# Spark算子执行流程详解之一

2017-03-03

# 1.take

 def take(num: Int): Array[T] = withScope { if (num == 0) { new Array[T](0) } else { val buf = newArrayBuffer[T] val totalParts = this.partitions.length var partsScanned = 0 while (buf.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry =1 if (partsScanned > 0) { // If we didn't find any rows after the previous iteration, quadruple and retry. // Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end. if (buf.size ==0) {//截止目前为止buf为空的话，则扩大4倍范围 numPartsToTry = partsScanned * 4 } else {//截止目前为止还有部分值没取到的话，则扩大至Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)，但是不超过当前已扫描过分区的4倍 // the left side of max is >=1 whenever partsScanned >= 2 numPartsToTry = Math.max((1.5* num * partsScanned / buf.size).toInt - partsScanned, 1) numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) } } val left = num - buf.size val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal =true) res.foreach(buf ++= _.take(num - buf.size)) partsScanned += numPartsToTry } buf.toArray } }

 /** * Run a job on a given set of partitions of an RDD, but take a function of type * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. */ def runJob[T,U: ClassTag]( rdd: RDD[T], func: Iterator[T] =>U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal) }

Take可以避免全量计算，执行时间比较短。但可能会多次触发action。

# 2.first

 /** * Return the first element in this RDD. */ def first(): T = withScope { take(1) match { case Array(t) => t case _ => throw newUnsupportedOperationException("empty collection") } }

# 3.sortByKey

 def sortByKey(ascending: Boolean =true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = newRangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K,V, V](self, part) .setKeyOrdering(if (ascending)ordering elseordering.reverse) }

sortByKey其实就是根据父RDD生成ShuffledRDD的过程，其分区函数为范围分区RangePartitioner，执行过程如下：

 class RangePartitioner[K: Ordering : ClassTag, V]( @transient partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], private var ascending: Boolean =true) extends Partitioner { // We allow partitions = 0, which happens when sorting an empty RDD under the default settings. require(partitions >= 0, s"Number of partitions cannot be negative but found\$partitions.") private var ordering= implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions前（partitions - 1）的分区边界 private var rangeBounds: Array[K] = { if (partitions <= 1) { Array.empty } else { // This is the sample size we need to have roughly balanced output partitions, capped at 1M. val sampleSize = math.min(20.0* partitions, 1e6) // Assume the input partitions are roughly balanced and over-sample a little bit. val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt // numItems相当于记录rdd元素的总数 // sketched的类型是Array[(Int, Int, Array[K])]，记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据 val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { Array.empty } else { // If a partition contains much more than the average number of items, we re-sample from it // to ensure that enough items are collected from that partition. val fraction = math.min(sampleSize / math.max(numItems,1L), 1.0) val candidates = ArrayBuffer.empty[(K, Float)] val imbalancedPartitions = mutable.Set.empty[Int] sketched.foreach { case (idx,n, sample) => if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } else { // The weight is 1 over the sampling probability. val weight = (n.toDouble / sample.size).toFloat for (key<- sample) { candidates += ((key, weight)) } } } if (imbalancedPartitions.nonEmpty) { // Re-sample imbalanced partitions with the desired sampling probability. val imbalanced =new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id- 1) val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect() val weight = (1.0/ fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) } RangePartitioner.determineBounds(candidates, partitions) } } } def numPartitions: Int = rangeBounds.length +1 private var binarySearch: ((Array[K],K) => Int) = CollectionsUtils.makeBinarySearch[K] def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <=128) { // If we have less than 128 partitions naive search while (partition rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } } private[spark] objectRangePartitioner { /** * Sketches the input RDD via reservoir sampling on each partition. * * @param rdd the input RDD to sketch * @param sampleSizePerPartition max sample size per partition * @return (total number of items, an array of (partitionId, number of items, sample)) */ def sketch[K: ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { val shift = rdd.id // val classTagK = classTag[K] // to avoid serializing the entire partitioner object val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift <<16)) //Reservoir:水塘抽样 val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() val numItems = sketched.map(_._2.toLong).sum (numItems, sketched) } /** * Determines the bounds for range partitioning from candidates with weights indicating how many * items each represents. Usually this is 1 over the probability used to sample this candidate. * * @param candidates unordered candidates with weights * @param partitions number of partitions * @return selected bounds */ def determineBounds[K: Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] val ordered = candidates.sortBy(_._1) val numCandidates = ordered.size val sumWeights = ordered.map(_._2.toDouble).sum val step = sumWeights / partitions var cumWeight = 0.0 var target = step val bounds = ArrayBuffer.empty[K] var i = 0 var j = 0 var previousBound = Option.empty[K] while ((i < numCandidates) && (j < partitions -1)) { val (key, weight) = ordered(i) cumWeight += weight if (cumWeight > target) { // Skip duplicate values. if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key target += step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray } }

 //stream代表数据流 //reservoir代表返回长度为k的池塘 //从stream中取前k个放入reservoir； for ( int i = 1; i < k; i++) reservoir[i] = stream[i]; for (i = k; stream != null; i++) { p = random(0, i); if (p < k) reservoir[p] = stream[i]; return reservoir;

 private var rangeBounds: Array[K] = { if (partitions <= 1) { Array.empty } else { // This is the sample size we need to have roughly balanced output partitions, capped at 1M. val sampleSize = math.min(20.0* partitions, 1e6) // Assume the input partitions are roughly balanced and over-sample a little bit. val sampleSizePerPartition = math.ceil(3.0* sampleSize / rdd.partitions.size).toInt // numItems相当于记录rdd元素的总数 // sketched的类型是Array[(Int, Int, Array[K])]，记录的是分区的编号、该分区中总元素的个数以及从父RDD中每个分区采样的数据 // sampleSizePerPartition代表的是每个分区抽样的值，然后针对待排序的key值进行抽样，即sketch函数 val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { Array.empty } else { // If a partition contains much more than the average number of items, we re-sample from it // to ensure that enough items are collected from that partition. //如果存在数据倾斜的情况，则某些分区包含数据量多的情况下，抽样的值偏少，需要增加抽样的数目 val fraction = math.min(sampleSize / math.max(numItems,1L), 1.0) val candidates = ArrayBuffer.empty[(K, Float)] val imbalancedPartitions = mutable.Set.empty[Int] sketched.foreach { case (idx,n, sample) => if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } else { // The weight is 1 over the sampling probability. val weight = (n.toDouble / sample.size).toFloat for (key<- sample) { candidates += ((key, weight)) } } } //针对不平衡的分区继续抽样 if (imbalancedPartitions.nonEmpty) { // Re-sample imbalanced partitions with the desired sampling probability. val imbalanced =new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id- 1) val reSampled = imbalanced.sample(withReplacement =false, fraction, seed).collect() val weight = (1.0/ fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) } //根据各个分区抽样的值来划分边界，其中weight值反应某个key的权重，权重越大，说明该key值越多 RangePartitioner.determineBounds(candidates, partitions) } } }

 def sketch[K: ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { val shift = rdd.id // val classTagK = classTag[K] // to avoid serializing the entire partitioner object val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift <<16)) //Reservoir:水塘抽样 val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() val numItems = sketched.map(_._2.toLong).sum (numItems, sketched) }   def reservoirSampleAndCount[T: ClassTag]( input: Iterator[T], k: Int, seed: Long = Random.nextLong()) : (Array[T], Int) = { val reservoir = newArray[T](k) // Put the first k elements in the reservoir. //先取前K个值 vari = 0 while (i < k && input.hasNext) { val item = input.next() reservoir(i) = item i += 1 } // If we have consumed all the elements, return them. Otherwise do the replacement. if (i < k) {//如果没有取到，说明该分区少于K个值，则直接返回 // If input size < k, trim the array to return only an array of input size. val trimReservoir =new Array[T](i) System.arraycopy(reservoir, 0, trimReservoir,0, i) (trimReservoir, i) } else {//否则按照水塘抽样遍历剩余的值 // If input size > k, continue the sampling process. val rand = new XORShiftRandom(seed) while (input.hasNext) { val item = input.next() val replacementIndex = rand.nextInt(i) if (replacementIndex < k) { reservoir(replacementIndex) = item } i += 1 } (reservoir, i) } }

 def determineBounds[K: Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] val ordered = candidates.sortBy(_._1) val numCandidates = ordered.size val sumWeights = ordered.map(_._2.toDouble).sum val step = sumWeights / partitions var cumWeight = 0.0 var target = step val bounds = ArrayBuffer.empty[K] var i = 0 var j = 0 var previousBound = Option.empty[K] while ((i < numCandidates) && (j < partitions -1)) { val (key, weight) = ordered(i) cumWeight += weight if (cumWeight > target) {//根据weight值来均衡划分分界 // Skip duplicate values. if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key target += step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray }