频道栏目
首页 > 资讯 > 云计算 > 正文

Spark-1.6.0中的SortBasedShuffle源码解读

16-05-26        来源:[db:作者]  
收藏   我要投稿

从Spark-1.2.0开始,Spark的Shuffle由Hash Based Shuffle升级成了Sort Based Shuffle。即Spark.shuffle.manager从Hash换成了Sort。不同形式是Shuffle逻辑主要是ShuffleManager的实现类不同。

在org.apache.spark.SparkEnv类中:

// Let the user specify short names for shuffle managers

val shortShuffleMgrNames = Map(

"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",

"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",

"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

val shuffleMgrName = conf.get( "spark.shuffle.manager" , "sort")

val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)

val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode" , false)

val memoryManager: MemoryManager =

if (useLegacyMemoryManager) {

new StaticMemoryManager(conf , numUsableCores)

} else {

UnifiedMemoryManager(conf , numUsableCores)

}

可以看出,在Spark-1.6.0中可以支持三种模式的Shuffle,分别是HashShuffle,SortShuffle以及UnsafeShuffle。默认的是Sort Based Shuffle。

如果需要更改Shuffle类型,在需要设置的参数是:

spark.shuffle.manager,可选的参数有hash,sort(default),tungsten-sort,如果自定义了ShuffleManager类型,比如com.xx.yy.AbcShuffleManager,也可以将该参数设置为com.xx.yy.AbcShuffleManager。SparkEnv会根据配置的参数去查找该类。

同时,从Spark-1.6.0版本开始,引入了一个动态内存分配的功能,该功能默认是开启的,用户可以自己选择是否使用1.6.0之前版本Spark中的内存分配策略,通过配置参数spark.memory.useLegacyMode来决定,该参数默认为false。

一、Sort Based Shuffle原理

简单来说,Shuffle过程类似于MR程序中的Map-Reduce过程。可以分为Write和Read两个阶段。

在org.apache.spark.shuffle.sort.SortShuffleManager类的描述中写道:

在sort-based shuffle中,输入的records会根据它们key对应的partition ids进行排序,属于同一partition的记录不排序。然后将这些记录输出到一个map output文件中。Reducers从该输出文件的一个连续文件片段中读取属于它的分区的记录。当map输出文件太大内存无法装下时,这些排好序的文件块会spill到磁盘上,磁盘上的文件会最终会合并成一个按分区排好序的最终输出文件。

Sort-based shuffle的map输出文件有两种输出方式:

当同时满足如下三个条件时,以序列化的方式进行排序

1、shuffle过程不需要进行aggregation或者输出不需要排序

2、shuffle的序列化支持序列化值重新排序(比如KryoSerializer和Spark SQL常用的序列化器)

3、shuffle产生的分区数小于16777216个

在上面三个条件之外的情况,都以非序列化的方式进行排序

序列化的排序方式:

在shuffle过程中,当records进入到shuffle writer同时就会被序列化,在整个排序过程中以序列化的形式缓存,这种方式的好处是:

1、它的sort操作是在序列化的二进制数据上完成,而不是Java对象,这样减少了reduce时的内存消耗和GC压力。但是它会要求序列化器能够允许序列化的数据在不进行反序列化的操作情况下移动数据位置。

2、它使用了一个ShuffleExternalSorter来对partition id和record pointer进行排序,在ShuffleExternalSorter中对排好序的每一个记录仅仅只用到8个字节,所以可以在内存中缓存更多的记录。

3、spill过程中会对同一分区的序列化好了的记录在不进行反序列化的情况下进行合并。

4、如果spill过程中的压缩器支持压缩数据的合并操作时,spill操作的最后将压缩好的spill文件进行合并生成最终spill文件时就仅仅只需要将每个输出文件进行简单的合并即可。可以避免在merge过程中进行解压缩和copy操作。

Sort-based Shuffle是在Shuffle过程中有排序的操作,但是这个排序是部分排序。即只根据partition id对每个partition进行排序,但是同一个partition中的记录并不会被排序。但是如果是sortByKey操作需要对每条记录进行排序的话的话,各个partition中Record间的排序则在Reducer中完成。也就是说,假如有100条记录需要进行处理,并且处理后这100条记录会输出到10个partition中,假设编号为1~10,那么只会对1~10这10个输出分区进行排序,同属于分区1的记录并不会排序。对应下图中的FileSegment之间会进行排序,但是FileSegment中的记录不排序。

 

Sort Based Shuffle

 

上图简单描述了Sort Based Shuffle的过程。每个Shuffle Map Task不会为每个Reducer单独产生一个文件,而是一个Map Task只生产一个最终文件,这个文件中根据不同partition id进行排序,然后有一个Index引导文件使得每个Reducer能很快的定位到其需要处理的FileSegment。

二、Sort Based Shuffle Write

先找到Shuffle过程的入口。

1、ShuffleMapTask类

在Scheduler模块中,一个DAG中除了最后一个Stage是FinalStage外,中间依赖的Stage都是ShuffleMapStage,在这个Stage中对应的Task类型都是ShuffleMapTask。ShuffleMapTask在Executor上允许时,最终调用的方法是ShuffleMapTask#runTask。

其中主要的代码如下

val manager = SparkEnv.get.shuffleManager

writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

writer.stop(success = true).get

从SparkEnv中获得manager,即上文中提到的org.apache.spark.shuffle.SortShuffleManager。然后由该manager为当前ShuffleMapTask所对应的分区生成一个writer对象,这个writer是SortShuffleWriter类型。最后调用writer.write方法,将该分区循环写出。

2、SortShuffleWriter

SortShuffleWriter#write方法的源码如下:

override def write(records: Iterator[Product2[K, V]]): Unit = {

sorter = if (dep.mapSideCombine) {

require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")

new ExternalSorter[K, V, C](

context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)

} else {

// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't

// care whether the keys get sorted in each partition; that will be done on the reduce side

// if the operation being run is sortByKey.

new ExternalSorter[K, V, V](

context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)

}

sorter.insertAll(records)

// Don't bother including the time to open the merged output file in the shuffle write time,

// because it just opens a single file, so is typically too fast to measure accurately

// (see SPARK-3570).

val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

val tmp = Utils.tempFileWith(output)

val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)

val partitionLengths = sorter.writePartitionedFile(blockId, tmp)

shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)

}

这里面前半段主要是生成一个sorter对象,根据是否有mapSideCombine来生成不同的ExternalSorter对象。不做mapSideCombine的话,在构造ExternalSorter时不会传入聚合函数,也不对这个partition中的记录进行排序。如果该map task是由sortByKey操作触发的,那么根据key的排序会在reduce端进行。
[page]

得到sorter对象后,调用insertAll方法对records做进一步处理。

Map端Combine

在PairRDDFunctions#combineByKey中我们可以看到:

def combineByKey [C](

createCombiner: V => C,

mergeValue: ( C, V ) => C,

mergeCombiners: ( C, C ) => C,

partitioner: Partitioner ,

mapSideCombine: Boolean = true,

serializer: Serializer = null): RDD[(K , C)] = self.withScope {

combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners ,

partitioner , mapSideCombine, serializer)(null)

}

该方法有一个Boolean的传入值mapSideCombine,并且默认为true。也就是说,在默认情况下调用该方法时就会执行mapSideCombine操作了。

mapSideCombine会使一个maptask输出的值在进行reduce操作之前先进行一定的合并。相当于先对一个分区的数据根据传入参数进行一次reduce操作,这样数据量会缩减,提高后续shuffle操作从性能。

3、ExternalSorter

(1)map和buffer对象

在ExternalSorter中有两个比较重要的属性,map和buffer,这两个属性在后面有很重要的作用。定义如下:

private var map = new PartitionedAppendOnlyMap[K, C]

private var buffer = new PartitionedPairBuffer[K, C]

虽然分别实现的类是PartitionedAppendOnlyMap和PartitionedPairBuffre,但是在这两个类的源码中还是能够看出,这两个类的特性在类名中得到了很好的体现。

对map来说,只会存储key不同的值,如果遇到相同的key,会把key对应的value进行更新。其底层保存数据的结构还是一个Array类型的data变量,偶数位存的是key的值,基数位存的是value。当需要更新时,可以看下面这段逻辑,根据k找到其在Array中的位置,然后更新key和value的值。从0位开始,偶数位保存的是key值,紧随其后的那位上保存的是value值。

data(2 * pos) = k

data(2 * pos + 1) = value.asInstanceOf[AnyRef]

对buffer来说,遇到的每一个值都会写入其中。其底层也是维持了一个Array类型的数据结构,但是其插入逻辑如下,在当前Array最后保存新增的key和value:

data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])

data(2 * curSize + 1) = value.asInstanceOf[AnyRef]

(2)insertAll(records: Iterator[Product2[K, V]])方法

在该方法中,对有mapSideCombine和没有mapSideCombine采取了不同的处理方法。

- 有mapSideCombine

// Combine values in-memory first using our AppendOnlyMap

val mergeValue = aggregator.get.mergeValue

val createCombiner = aggregator.get.createCombiner

var kv: Product2[K, V] = null

val update = (hadValue: Boolean, oldValue: C) => {

if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)

}

while (records.hasNext) {

addElementsRead()

kv = records.next()

map.changeValue((getPartition(kv._1), kv._1), update)

maybeSpillCollection(usingMap = true)

}

循环处理records中的每一条记录,处理一条记录就在addElementsRead()中将_elementsRead加1,记录处理的记录数,然后更新map中的值。这里传入的update是一个方法,进行map端的combine操作,如果遇到记录过的相同key,就将value使用传入的aggregator进行聚合,如果遇到一个新key,就将该key对应的value计入一个新的combiner中。

PartitionedAppendOnlyMap的类继承关系,及如下图changeValue的调用如下图:

 

这里写图片描述

 

该方法最终进入AppendOnlyMap#changeValue方法中,按照在3.(1)提到的data对象进行更新。每次更新完一条记录后,会对该记录进行判断,满足抽样条件的话就会进行一次抽样。这里的抽样过程主要是为了后续判断该data使用的内存大小所用,在后面会有详细介绍。

- 没有mapSideCombine

这里的主要逻辑是:

while (records.hasNext) {

addElementsRead()

val kv = records.next()

buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])

maybeSpillCollection(usingMap = false)

}

仍然是循环取出records中的每一条记录。取出一条便将_elementsRead加1,然后将数据存入上面的buffer变量中。buffer.insert方法处理过程如下:

/** Add an element into the buffer */

def insert(partition: Int, key: K, value: V): Unit = {

if (curSize == capacity) {

growArray()

}

data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])

data(2 * curSize + 1) = value.asInstanceOf[AnyRef]

curSize += 1

afterUpdate()

}

curSize记录当前data对象中存储记录的个数,插入一条记录就会加1。capacity是当前data中能够存储的记录总个数。data的初始长度是2 * 64,即能存储64个record的key-value对。data中存储record的上限是2^30 -1个,当不超过该上限时,growArray方法会以当前capacity两倍(但是最多达到上限)容量创建一个新的data数组将原来data中的数据copy到新数组中,同时会对新的data进行采样。有关采样的过程及用途,在后面内存分析时会讲到。

然后直接在data最后新增两位保存新的key和value值,更新curSize。每插入一条记录最后调用afterUpdate方法,对当前data中的记录进行一次判断是否需要进行一次采样。

(3)maybeSpillCollection判断是否需要spill

在(2)中insertAll方法对map和buffer进行更新后,接下来就会调用maybeSpillCollection方法决定map和buffer是否需要进行spill。

/**

* Spill the current in-memory collection to disk if needed.

* * @param usingMap whether we're using a map or buffer as our current in-memory collection

*/

private def maybeSpillCollection(usingMap: Boolean): Unit = {

var estimatedSize = 0L

if (usingMap) {

estimatedSize = map.estimateSize()

if (maybeSpill(map, estimatedSize)) {

map = new PartitionedAppendOnlyMap[K, C]

}

} else {

estimatedSize = buffer.estimateSize()

if (maybeSpill(buffer, estimatedSize)) {

buffer = new PartitionedPairBuffer[K, C]

}

}

if (estimatedSize > _peakMemoryUsedBytes) {

_peakMemoryUsedBytes = estimatedSize

}

}

传入参数usingMap是用来标识使用的是map还是buffer对象,对前面有印象的话应该知道使用map对象还是buffer对象是由有没有mapSideCombine来决定的,由于map和buffer对象底层还是一个Array类型的data对象,只是对数据更新和插入的处理方式有些不同。所以接下来都以buffer来作进一步的分析。

这里的主要逻辑是,首先调用buffer的estimateSize方法,计算当前buffer对象的内存大小estimatedSize,然后根据该大小调用方法maybeSpill判断是否需要进行spill操作,spill后会把buffer清空,重新进行下一轮的操作。那么接下来就有两个重点:计算buffer的大小和判定是否需要spill。

- buffer.estimateSize方法计算内存

PartitionedPairBuffer和PartitionedAppendOnlyMap都继承了trait SizeTracker。所以,不管是map还是buffer调用的estimateSize都是相同的。

应该还记得前面提到过对buffer插入数据时,会有一个采样的操作。有关采样的相关过程也在SizeTracker中。

SizeTracker中的属性:

private val samples = new mutable.Queue[Sample]//一个队列,用于存储对数据的采样样本

private val SAMPLE_GROWTH_RATE = 1.1//采样间隔次数增长率

private var bytesPerUpdate: Double = _//根据samples中最后两个样本计算出的记录内存平均增长率

private var numUpdates: Long = _//buffer的更新次数

private var nextSampleNum: Long = _//下一次采样操作的次数

......

case class Sample(size: Long, numUpdates: Long)

在上面的代码片段中有一些属性和Sample类结构。这些在estimateSize方法中都有用到。在Spark中,对shuffle过程数据的内存大小,是根据采样样本的大小来估算的。

从前面我们知道,buffer中每插入一条记录都会判断是否需要采样,在SizeTracker#afterUpdate方法中采样的依据是当前buffer的更新次数numUpdates是否等于下一次进行采样操作的次数nextSampleNum,如果等于,则调用SizeTracker#takeSample方法进行一次采样,然后nextSampleNum变成ceil(numUpdates * SAMPLE_GROWTH_RATE)。然后根据samples中最后两个样本计算出每次buffer每次更新的内存平均增长率bytesPerUpdate = (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)。从这里可以看出,当buffer中记录比较少时,采样非常频繁,但是如果该buffer中容纳的记录越多,到后面进行一次采样的间隔次数就会越多。

估算的内存大小为:

def estimateSize(): Long = {

assert(samples.nonEmpty)

val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)

(samples.last.size + extrapolatedDelta).toLong

}

由samples中最后一个样本的大小,加上buffer记录距离上次采样的次数numUpdates - samples.last.numUpdates,乘以buffer每次新增一个record时的内存平均增长率bytesPerUpdate。这样可以在最短的时间内对存储了大量record的buffer内存占用大小进行计算,但是由于是基于采样的方法估算的内存大小,有时候会由于数据本身的问题导致计算不准确等问题。有可能偶尔出现OOM的情况。
[page]

- maybeSpill方法判断是否进行spill

在Spark-1.6之前,可以由参数spark.shuffle.spill设置为true或者false来选择打开或者关闭内存数据spill到磁盘的功能,但是在1.6版本中,该参数默认为true,并且即使设置为false,也不会起作用了,spark在需要时会把内存数据spill到磁盘。

根据上一步估算到的当前map或buffer的内存大小estimatedSize,如果达到spill的条件,将该map或者buffer中的数据spill到磁盘,然后重新初始化一个新的map或buffer,在下面maybeSpill方法中传入参数c就是当前的buffer对象,currentMemory是上一步估算的buffer占用的内存大小estimateSize。

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {

var shouldSpill = false

//buffer中每插入32条记录,并且当前估算的buffer内存达到了spill的内存阈值

if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {

// Claim up to double our current memory from the shuffle memory pool

val amountToRequest = 2 * currentMemory - myMemoryThreshold

val granted =

taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)

myMemoryThreshold += granted

// If we were granted too little memory to grow further (either tryToAcquire returned 0,

// or we already had more memory than myMemoryThreshold), spill the current collection

shouldSpill = currentMemory >= myMemoryThreshold

}

shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold

// Actually spill

if (shouldSpill) {

_spillCount += 1

logSpillage(currentMemory)

//ExternalSorter#spill方法

spill(collection)

_elementsRead = 0

_memoryBytesSpilled += currentMemory

releaseMemory()

}

shouldSpill

}

虽然前面内存估算是基于采样来进行的,但是如果对buffer中每新增一条记录就判断一次是否需要spill肯定是一个很耗费时间与资源的过程。从上面的代码可以看出,根据buffer中插入记录的数目elementRead,每32次并且如果estimateSize达到了spill的内存阈值myMemoryThreshold才会判断是否spill。myMemoryThreshold由参数spark.shuffle.spill.initialMemoryThreshold(默认值5 * 1024 * 1024,5MB)来确定。

如果同时满足上面两个条件,就会向taskMemoryManager申请,申请的是Execution部分的内存。这里的taskMemoryManager,以及内存申请的过程,如果内存不足,则分配到的内存granted为0。申请的内存总数是amountToRequest,计算公式是amountToRequest = 2 * currentMemory - myMemoryThreshold,因为此次currentMemory已经大于了myMemoryThreshold,spark应该是认为经过32次插入数据后,currentMemory会翻番,所以向Execution内存池申请能够存储2 * currentMemory内存的空间。申请到内存后,将granted累加到myMemoryThreshold上,如果分配到的内存太少,即使加上新分配的内存,myMemoryThreshold仍然不足currentMemory,就会触发spill操作。

触发spill操作的另一个条件是_elementsRead > numElementsForceSpillThreshold,,当前buffer中的记录数超过参数spark.shuffle.spill.numElementsForceSpillThreshold(默认值是Long.MaxValue)。

在触发spill操作后,spill次数_spillCount累加,并记录此次spill出去的数据大小。调用spill方法进行spill操作,然后通过releaseMemory方法把Execution内存池的ON_HEAP内存释放,充值myMemoryThreshold为参数设定值。继续进行下一轮。

4、Sort Based Shuffle Write内存分析

这个过程中最耗内存的对象是上面的map或者buffer,这部分内存再加上spill操作时的缓存内存基本上就构成了Shuffle Wtire过程中整个内存的使用情况。

buffer中的内存大小是上面提到的PartitionedAppendOnlyBuffer占用的实际内存大小,如果一个Executor有C个Core,则C个Core共享整个Executor的内存,并且同时处理Task,所以buffer部分所有内存大小为C * PartitionedAppendOnlyBuffer。

在spill过程中,调用的是ExternalSorter#spill方法,我们来看一下spill的过程,

首先由diskBlockManager创建一个shuffle临时文件,生成blockId和file,

val (blockId, file) = diskBlockManager.createTempShuffleBlock()

blockId和file如下图所示:

 

blockId和file

 

然后获取一个往磁盘写临时文件的DiskBlockObjectWriter类型的diskWriter对象,

writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

writer并不是接收到一条记录就往磁盘写一条记录,在里面有一个fileBuffer来缓存,每装满一次才会真正往磁盘spill一次,这个fileBufferSize的大小可以由参数spark.shuffle.fill.buffer(默认值为32K)来确定。

同时,在spill的过程中,每接收一条记录写入fileBuffer中的同时,也会记录fileBuffer中的记录数objectsWritten。每一个batch的数据写入磁盘时需要进行序列化,为了避免序列化过程中出现内存不足的情况,对每一个batch中的记录数也作了一个限制spark.shuffle.spill.batchSize(默认为10000),这个参数不宜设置为过小,太小的话频繁的序列化反序列化也是很耗费时间的。可以随着上面的batchSize一起增大或减小。每写满10000条记录,上面代码中的writer对象会调用flush方法往磁盘写入一次,然后重新生成一个writer对象。

最后在spills变量中记录每次spill的相关记录

private val spills = new ArrayBuffer[SpilledFile]

在本次调试过程中,spills中的内容如下:

 

spills变量

 

对应spill到磁盘上的文件:

 

spill磁盘文件

 

三、Sort Based Shuffle Read

Sort Based Shuffle Read类似于Write过程,从获取Reader对象开始。

1、ShuffledRDD

read过程从ShuffledRDD#compute方法开始,首先获取Reader对象。

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {

val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]

SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)

.read()

.asInstanceOf[Iterator[(K, C)]]

}

获取到Reader对象后调用read方法开始读取shuffle write过程中spill到磁盘的临时文件。

Reader对象是BlockStoreShuffleReader类型

2、BlockStoreShuffleReader

接下来进入BlockStoreShuffleReader#read方法中。

相关TAG标签
上一篇:技术培训|资源编排,人人都可以成为架构师
下一篇:小窥TeslaCrypt密钥设计
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站