
Spark Operator Execution Flows Explained, Part 7

2017-03-03

31.union

Merges two RDDs into one.
def union(other: RDD[T]): RDD[T] = withScope {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    // Both RDDs are partitioned by the same partitioner
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    // The partitioners differ (or are absent)
    new UnionRDD(sc, Array(this, other))
  }
}
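As a rough illustration (a minimal sketch assuming a live SparkContext sc, e.g. in spark-shell): when both inputs share the same partitioner, the result keeps that number of partitions; otherwise the partition counts simply add up.

import org.apache.spark.HashPartitioner

val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(3))
val b = sc.parallelize(Seq(2 -> "c", 3 -> "d")).partitionBy(new HashPartitioner(3))
a.union(b).partitions.length   // 3: same partitioner on both sides -> PartitionerAwareUnionRDD

val c = sc.parallelize(Seq(4 -> "e"), 2)   // no partitioner
a.union(c).partitions.length   // 5 = 3 + 2: different partitioning -> UnionRDD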
First, let's look at how the case where both RDDs share the same partitioner is handled:
private[spark]
  class PartitionerAwareUnionRDD[T: ClassTag](
 sc: SparkContext,
 var rdds: Seq[RDD[T]]
 ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
 require(rdds.length > 0)
 require(rdds.forall(_.partitioner.isDefined))
 require(rdds.flatMap(_.partitioner).toSet.size == 1,
 "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
  
 override val partitioner = rdds.head.partitioner
  
  
 // Build one PartitionerAwareUnionRDDPartition per output partition; it records which partitions of the parent rdds the output partition with index `index` is built from
 override def getPartitions: Array[Partition] = {
 val numPartitions = partitioner.get.numPartitions
 (0 until numPartitions).map(index => {
 new PartitionerAwareUnionRDDPartition(rdds, index)
 }).toArray
 }
  
 // Get the location where most of the partitions of parent RDDs are located
 override def getPreferredLocations(s: Partition): Seq[String] = {
 logDebug("Finding preferred location for " + this + ", partition " + s.index)
 val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
 val locations = rdds.zip(parentPartitions).flatMap {
 case (rdd, part) => {
 val parentLocations = currPrefLocs(rdd, part)
 logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
 parentLocations
 }
 }
 val location = if (locations.isEmpty) {
 None
 } else {
 // Find the location that maximum number of parent partitions prefer
 Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
 }
 logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
 location.toSeq
 }
  
 override def compute(s: Partition, context: TaskContext): Iterator[T] = {
   // parents points to the partitions of the parent rdds that this output partition is built from
   val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
   // Iterate over those parent partitions and concatenate them into this single output partition
   rdds.zip(parentPartitions).iterator.flatMap {
     case (rdd, p) => rdd.iterator(p, context)
   }
 }
  
 override def clearDependencies() {
 super.clearDependencies()
 rdds = null
 }
  
 // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
 private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
 rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
 }
}
Its partition metadata is PartitionerAwareUnionRDDPartition:
class PartitionerAwareUnionRDDPartition(
 @transient val rdds: Seq[RDD[_]],
 val idx: Int
 ) extends Partition {
 // parents records, for partition index idx, the corresponding Partition of each rdd in rdds; the mapping is one-to-one, e.g. output partition 0 is built from partition 0 of every rdd in rdds
 var parents = rdds.map(_.partitions(idx)).toArray
  
 override val index = idx
 override def hashCode(): Int = idx
  
 @throws(classOf[IOException])
 private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
 // Update the reference to parent partition at the time of task serialization
 parents = rdds.map(_.partitions(index)).toArray
 oos.defaultWriteObject()
 }
}
So when the partitioners are identical, each partition of the resulting RDD is built from the corresponding partition of each of the two original RDDs, i.e.:

[Figure: partition mapping of PartitionerAwareUnionRDD when both inputs share the same partitioner]
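Continuing the sketch above (a and b are both hash-partitioned into 3 partitions), the result also keeps the parents' partitioner, which is exactly the `override val partitioner = rdds.head.partitioner` seen earlier:

a.union(b).partitioner   // e.g. Some(...) holding the parents' HashPartitioner (3 partitions), so later key-based operations need no shuffle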

Now let's look at how the case where the two partitioners differ is handled:
class UnionRDD[T: ClassTag](
 sc: SparkContext,
 var rdds: Seq[RDD[T]])
 extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
  
 override def getPartitions: Array[Partition] = {
   // The total partition count is the sum of the partition counts of all rdds in the union
   val array = new Array[Partition](rdds.map(_.partitions.length).sum)
   var pos = 0
   // Create one output partition per parent partition; each output partition maps to exactly one parent partition
   for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
     // pos: output partition index; rdd: the parent RDD; rddIndex: index of that RDD within rdds;
     // split.index: index of the parent partition within its own RDD
     array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
     pos += 1
   }
   array
 }
  
 override def getDependencies: Seq[Dependency[_]] = {
 val deps = new ArrayBuffer[Dependency[_]]
 var pos = 0
 for (rdd <- rdds) {
 deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
 pos += rdd.partitions.length
 }
 deps
 }
  
 override def compute(s: Partition, context: TaskContext): Iterator[T] = {
 val part = s.asInstanceOf[UnionPartition[T]]
// Read the data of the corresponding partition from the corresponding parent RDD in rdds
 parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
 }
  
 override def getPreferredLocations(s: Partition): Seq[String] =
 s.asInstanceOf[UnionPartition[T]].preferredLocations()
  
 override def clearDependencies() {
 super.clearDependencies()
 rdds = null
 }
}
Let's see what parentPartition in UnionPartition stands for:
/**
* Partition for UnionRDD.
*
* @param idx index of the partition
* @param rdd the parent RDD this partition refers to
* @param parentRddIndex index of the parent RDD this partition refers to
* @param parentRddPartitionIndex index of the partition within the parent RDD
* this partition refers to
*/
  private[spark] class UnionPartition[T: ClassTag](
 idx: Int,
 @transient rdd: RDD[T],
 val parentRddIndex: Int,
 @transient parentRddPartitionIndex: Int)
 extends Partition {
 // parentPartition is the Partition of the parent rdd whose index within that rdd is parentRddPartitionIndex
 var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
  
 def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
  
 override val index: Int = idx
  
 @throws(classOf[IOException])
 private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
 // Update the reference to parent split at the time of task serialization
 parentPartition = rdd.partitions(parentRddPartitionIndex)
 oos.defaultWriteObject()
 }
}
Therefore, when the two partitioners differ, the execution flow is as follows:

[Figure: partition mapping of UnionRDD when the input partitioners differ]
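A quick way to see this one-to-one copying (again a hedged spark-shell sketch; glom, covered in section 34 below, just materializes each partition as an array):

val u = sc.parallelize(1 to 4, 2)
val v = sc.parallelize(5 to 10, 3)
u.union(v).glom().collect().map(_.toSeq)
// e.g. Seq(1, 2), Seq(3, 4), Seq(5, 6), Seq(7, 8), Seq(9, 10):
// 2 + 3 output partitions, each copied unchanged from one parent partition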

32. The ++ operator

Its effect is simply union:
/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
  def ++(other: RDD[T]): RDD[T] = withScope {
 this.union(other)
}
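A minimal sketch (assuming a SparkContext sc): ++ behaves exactly like union, and duplicates are kept unless distinct() is applied, as the scaladoc above notes.

val x = sc.parallelize(Seq(1, 2, 3))
val y = sc.parallelize(Seq(3, 4))
(x ++ y).collect().sorted            // Array(1, 2, 3, 3, 4): duplicates kept
(x ++ y).distinct().collect().sorted // Array(1, 2, 3, 4)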

33.intersection

Returns the intersection of two RDDs; a value present in both is emitted only once.
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* Note that this method performs a shuffle internally.
*/
  def intersection(other: RDD[T]): RDD[T] = withScope {
 this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
 .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
 .keys
}
The rough flow: the two RDDs being intersected are first mapped to pair RDDs of (value, null); a CoGroupedRDD is built over them; its values are then converted into a pair of iterables, one per side; records where both iterables are non-empty (i.e. the value exists in both RDDs) are kept; finally only the keys are returned, which are the original elements of the two RDDs.
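A small sketch of that behaviour (assuming sc): duplicates on either side do not survive, and the cogroup step incurs a shuffle.

val p = sc.parallelize(Seq(1, 2, 2, 3))
val q = sc.parallelize(Seq(2, 3, 3, 4))
p.intersection(q).collect().sorted   // Array(2, 3): each common value appears exactly once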
The internal implementation of cogroup is as follows:
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
// defaultPartitioner picks the default partitioner for the two RDDs
 cogroup(other, defaultPartitioner(self, other))
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
 : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
 if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
 throw new SparkException("Default partitioner cannot partition array keys.")
 }
 val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
// Map the values of cg into a pair of iterables, one per input RDD
 cg.mapValues { case Array(vs, w1s) =>
 (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
 }
}
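As a quick reminder of what cogroup itself produces (spark-shell sketch; key order is not guaranteed): for every key the result carries one Iterable per input RDD, possibly empty.

val left  = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3))
val right = sc.parallelize(Seq("a" -> "x", "c" -> "y"))
left.cogroup(right).collect()
// e.g. Array((a,(CompactBuffer(1, 2),CompactBuffer(x))),
//            (b,(CompactBuffer(3),CompactBuffer())),
//            (c,(CompactBuffer(),CompactBuffer(y))))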
The implementation of CoGroupedRDD is as follows:
/**
* :: DeveloperApi ::
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*
* Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
* instantiating this directly.
  
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output
*/
  @DeveloperApi
  class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
 extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
  
 // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
 // Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
 // CoGroupValue is the intermediate state of each value before being merged in compute.
 private type CoGroup = CompactBuffer[Any]
 private type CoGroupValue = (Any, Int) // Int is dependency number
 private type CoGroupCombiner = Array[CoGroup]
  
 private var serializer: Option[Serializer] = None
  
 /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
 def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
 this.serializer = Option(serializer)
 this
 }
 
 override def getDependencies: Seq[Dependency[_]] = {
   rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
     if (rdd.partitioner == Some(part)) {
       // If the rdd's partitioner equals the CoGroupedRDD's partitioner, the dependency is narrow
       logDebug("Adding one-to-one dependency with " + rdd)
       new OneToOneDependency(rdd)
     } else {
       // Otherwise it is a shuffle (wide) dependency
       logDebug("Adding shuffle dependency with " + rdd)
       new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
     }
   }
 }
  
 /*
  * Build the partition metadata CoGroupPartition(idx: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]]),
  * where idx is the partition index and narrowDeps stores the array of (optional) narrow dependencies for that partition.
  */
 override def getPartitions: Array[Partition] = {
 val array = new Array[Partition](part.numPartitions)
 for (i <- 0 until array.length) {
 // Each CoGroupPartition will have a dependency per contributing RDD
 array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
 // Assume each RDD contributed a single dependency, and get it
 dependencies(j) match {
   // A shuffle dependency has no narrow split to record, so return None
   case s: ShuffleDependency[_, _, _] =>
     None
   case _ =>
     // Otherwise it is a narrow dependency on partition i of that parent rdd
     Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
 }
 }.toArray)
 }
 array
 }
  
 override val partitioner: Some[Partitioner] = Some(part)
  
  
// Compute each partition based on the CoGroupPartition handed in
 override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
 val sparkConf = SparkEnv.get.conf
// This flag decides whether intermediate aggregation runs purely in memory or may spill to disk
 val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 val split = s.asInstanceOf[CoGroupPartition]
 // Number of parent rdds; each rdd corresponds to one dependency, decided by its partitioner
 val numRdds = dependencies.length
  
 // A list of (rdd iterator, dependency number) pairs
// rddIterators is a list of pairs: an iterator over a parent's Product2 records plus that dependency's index
 val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
 for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
// For a narrow dependency, read the parent RDD's corresponding partition directly
 case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
 val dependencyPartition = split.narrowDeps(depNum).get.split
 // Read them from the parent
 val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
 rddIterators += ((it, depNum))
 // For a shuffle dependency, fetch the corresponding partition from the shuffle map output
 case shuffleDependency: ShuffleDependency[_, _, _] =>
 // Read map outputs of shuffle
 val it = SparkEnv.get.shuffleManager
 .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
 .read()
 rddIterators += ((it, depNum))
 }
 /*
  * At this point rddIterators looks like:
  * [(Iterator[Product2[K, Any]], 0), (Iterator[Product2[K, Any]], 1)]
  */
  
 if (!externalSorting) {
   // Aggregate the intermediate results purely in memory.
   // CoGroupCombiner is an array of buffers; each key keeps exactly one combiner.
   val map = new AppendOnlyMap[K, CoGroupCombiner]
   val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
     if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
   }
   val getCombiner: K => CoGroupCombiner = key => {
     map.changeValue(key, update)
   }
   // Walk every iterator and append each value to the combiner bucket of its key and dependency index
 rddIterators.foreach { case (it, depNum) =>
 while (it.hasNext) {
 val kv = it.next()
 getCombiner(kv._1)(depNum) += kv._2
 }
 }
 new InterruptibleIterator(context,
 map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
 } else {
   // Aggregate the intermediate results in memory, spilling to disk when necessary
   val map = createExternalMap(numRdds)
   // Insert every record into the ExternalAppendOnlyMap as (key, CoGroupValue(value, dependency index))
 for ((it, depNum) <- rddIterators) {
 map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
 }
 context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
 context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
 new InterruptibleIterator(context,
 map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
 }
 }
   ……
}
So suppose two RDDs are cogrouped, where one is hash-partitioned into 3 partitions and the other has no partitioner; its execution flow is as follows:

[Figure: cogroup of a hash-partitioned RDD (3 partitions) with an unpartitioned RDD: one narrow and one shuffle dependency]
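The dependency decision in getDependencies can be observed directly. Here is a hedged sketch that instantiates CoGroupedRDD only for inspection (it is a @DeveloperApi class; normal code should go through RDD.cogroup):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.CoGroupedRDD

val part = new HashPartitioner(3)
val pre  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part) // same partitioner as the cogroup
val raw  = sc.parallelize(Seq(1 -> "c", 3 -> "d"))                   // no partitioner
val cg = new CoGroupedRDD[Int](Seq(pre, raw), part)
cg.dependencies.map(_.getClass.getSimpleName)
// e.g. List(OneToOneDependency, ShuffleDependency): pre is a narrow parent, raw has to be shuffled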

34.glom

The glom function coalesces all elements within each partition into a single array, producing a new RDD whose elements are those arrays.
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
  def glom(): RDD[Array[T]] = withScope {
// Iterator(iter.toArray) turns the whole partition into a single array
 new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
Its execution flow is as follows:

[Figure: glom collapses each partition into one array element]
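A minimal sketch (assuming sc):

val r = sc.parallelize(1 to 6, 3)
r.glom().collect().map(_.toSeq)
// e.g. Seq(1, 2), Seq(3, 4), Seq(5, 6): one array per partition, partition count unchanged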

35.cartesian

This operation returns the Cartesian product of the two RDDs; it does not perform a shuffle.
/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
 new CartesianRDD(sc, this, other)
}
It is implemented by CartesianRDD; let's keep reading:
private[spark]
  class CartesianRDD[T: ClassTag, U: ClassTag](
 sc: SparkContext,
 var rdd1 : RDD[T],
 var rdd2 : RDD[U])
 extends RDD[Pair[T, U]](sc, Nil)
 with Serializable {
  
 val numPartitionsInRdd2 = rdd2.partitions.length
  
  
// The number of partitions is the product of the two rdds' partition counts
 override def getPartitions: Array[Partition] = {
 // create the cross product split
 val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
 for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
 val idx = s1.index * numPartitionsInRdd2 + s2.index
// The data of partition idx comes from partition s1.index of rdd1 and partition s2.index of rdd2
 array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
 }
 array
 }
  
 override def getPreferredLocations(split: Partition): Seq[String] = {
 val currSplit = split.asInstanceOf[CartesianPartition]
 (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
 }
// Build this CartesianPartition's data by iterating over partition s1 of rdd1 and partition s2 of rdd2
 override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
 val currSplit = split.asInstanceOf[CartesianPartition]
 for (x <- rdd1.iterator(currSplit.s1, context);
 y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
 }
  
  
// Both dependencies returned here are narrow dependencies
 override def getDependencies: Seq[Dependency[_]] = List(
 new NarrowDependency(rdd1) {
 def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
 },
 new NarrowDependency(rdd2) {
 def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
 }
 )
  
 override def clearDependencies() {
 super.clearDependencies()
 rdd1 = null
 rdd2 = null
 }
}
Its concrete execution flow is as follows:

[Figure: Cartesian product partitioning: output partition idx = s1.index * numPartitionsInRdd2 + s2.index]
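A minimal sketch (assuming sc) showing that the partition count is the product of the parents' counts:

val m = sc.parallelize(Seq(1, 2), 2)
val n = sc.parallelize(Seq("a", "b", "c"), 3)
val cart = m.cartesian(n)
cart.partitions.length   // 6 = 2 * 3
cart.collect().length    // 6 pairs: (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), in some order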

