# Decide whether map-side aggregation is needed: check whether map-side combine is allowed; if so, combine the records first and then do the subsequent writes, otherwise return records as-is
# Iterate over the records and apply the partitioner to each key to obtain a partition number (bucketId); use the bucketId to pick the corresponding writer from the ShuffleWriterGroup and write the data to its file
# Write the data to the per-bucket files through the ShuffleWriterGroup
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Decide whether map-side aggregation is needed: if a combiner has to be applied,
  // build the combined iterator first and then do the subsequent writes
  val iter = if (dep.aggregator.isDefined) {
    // If map-side combine is allowed, combine the records first; otherwise return records as-is
    if (dep.mapSideCombine) {
      dep.aggregator.get.combineValuesByKey(records, context)
    } else {
      records
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    records
  }

  // Iterate over the records and apply the partitioner to each key to obtain a bucketId,
  // then use the bucketId to pick the corresponding writer from the ShuffleWriterGroup
  // and write the record to its file
  for (elem <- iter) {
    val bucketId = dep.partitioner.getPartition(elem._1)
    // Write the record through the ShuffleWriterGroup's writer for this bucket
    shuffle.writers(bucketId).write(elem._1, elem._2)
  }
}
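For reference, `shuffle` above is the ShuffleWriterGroup obtained from FileShuffleBlockResolver.forMapTask (shown below). A minimal sketch of its contract, paraphrased from the Spark 1.x hash shuffle code rather than copied verbatim:

private[spark] trait ShuffleWriterGroup {
  // One DiskBlockObjectWriter per reduce bucket (i.e. per reducer) of this map task
  val writers: Array[DiskBlockObjectWriter]

  // success indicates whether all writes succeeded; with consolidation enabled this decides
  // whether the FileSegment offsets and lengths of this map task are recorded
  def releaseWriters(success: Boolean)
}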
FileShuffleBlockResolver is mainly responsible for managing block writers; each reducer task corresponds to one file.
forMapTask: obtains a ShuffleWriterGroup for the given map task, in which each reducer corresponds to one writer.
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
    writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
  new ShuffleWriterGroup {
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
    private val shuffleState = shuffleStates(shuffleId)
    private var fileGroup: ShuffleFileGroup = null

    val openStartTime = System.nanoTime
    val serializerInstance = serializer.newInstance()
    // If consolidation is enabled (spark.shuffle.consolidateFiles is set to true)
    val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) {
      // Obtain a file group that is not currently in use
      fileGroup = getUnusedFileGroup()
      // Return one DiskBlockObjectWriter per reducer (e.g. 10 reducers -> 10 writers); each
      // reducer corresponds to one bucketId of this ShuffleMapTask. For every bucket the writer
      // targets the ShuffleFileGroup rather than an independent shuffle block file, which is how
      // the output of multiple map tasks gets merged into shared files
      Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
          writeMetrics)
      }
    } else { // Consolidation is not enabled
      // Return one DiskBlockObjectWriter per reducer
      Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
        // Build the block id
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        // Resolve the block file for this block id
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        // If the file already exists (e.g. created by a previously failed task), delete it
        if (blockFile.exists) {
          if (blockFile.delete()) {
            logInfo(s"Removed existing shuffle file $blockFile")
          } else {
            logWarning(s"Failed to remove existing shuffle file $blockFile")
          }
        }
        // Create a writer for each block file
        blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
          writeMetrics)
      }
    }
    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

    // Release the writers
    override def releaseWriters(success: Boolean) {
      // With consolidation enabled, record each FileSegment's offset and length on success
      if (consolidateShuffleFiles) {
        if (success) {
          val offsets = writers.map(_.fileSegment().offset)
          val lengths = writers.map(_.fileSegment().length)
          fileGroup.recordMapOutput(mapId, offsets, lengths)
        }
        // Recycle the file group
        recycleFileGroup(fileGroup)
      } else {
        // Without consolidation, just add the completed map task's id to completedMapTasks
        shuffleState.completedMapTasks.add(mapId)
      }
    }

    // Obtain an unused file group
    private def getUnusedFileGroup(): ShuffleFileGroup = {
      val fileGroup = shuffleState.unusedFileGroups.poll()
      if (fileGroup != null) fileGroup else newFileGroup()
    }

    // Create a new file group
    private def newFileGroup(): ShuffleFileGroup = {
      val fileId = shuffleState.nextFileId.getAndIncrement()
      val files = Array.tabulate[File](numBuckets) { bucketId =>
        val filename = physicalFileName(shuffleId, bucketId, fileId)
        blockManager.diskBlockManager.getFile(filename)
      }
      val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
      shuffleState.allFileGroups.add(fileGroup)
      fileGroup
    }

    // Recycle a file group
    private def recycleFileGroup(group: ShuffleFileGroup) {
      shuffleState.unusedFileGroups.add(group)
    }
  }
}
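A rough illustration of why consolidation matters; the numbers are hypothetical and only meant to show orders of magnitude. With M map tasks, R reduce buckets, E executors and C concurrently running map tasks per executor, hash shuffle produces approximately:

without consolidation: M * R files,          e.g. 1000 * 200   = 200,000 shuffle files
with consolidation:    about E * C * R files, e.g. 10 * 8 * 200 =  16,000 physical files

Consolidation reduces only the number of physical files, not the number of logical shuffle blocks: each file in a ShuffleFileGroup holds one FileSegment per map task that reused that group, which is exactly what recordMapOutput tracks.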
When a ResultTask or ShuffleMapTask reaches a ShuffledRDD, its compute method is invoked to compute the partition, and the shuffled data is read through a ShuffleReader.
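As a reference for that entry point, a sketch of ShuffledRDD.compute along the lines of the Spark 1.5 sources (paraphrased; exact signatures may differ between versions):

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // Obtain a ShuffleReader for this partition from the ShuffleManager and read the data
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

The read() call above is the method whose steps are outlined next.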
# Create a ShuffleBlockFetcherIterator to fetch the data
# Process the fetched data as streams (decompression and deserialization)
# Aggregate the fetched data
# For sort-based shuffle, handle the secondary sort of the data within each partition
In the sort-based shuffle implementation, data is by default sorted only by partition id; records inside a partition are not sorted. The keyOrdering variable was therefore added to indicate whether the data within each partition also needs to be sorted.
To reduce memory pressure and avoid GC overhead, an external sorter is introduced to sort the data. When memory cannot hold all of the data being sorted, the spark.shuffle.spill property decides whether to spill to disk. It is enabled by default; if it is disabled, large data volumes can cause out-of-memory errors.
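A minimal sketch of the two configuration knobs mentioned above, using their Spark 1.x names and defaults (property names may differ in other versions):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cap on the total size of in-flight remote fetch requests (48m is the default)
  .set("spark.reducer.maxSizeInFlight", "48m")
  // Whether the ExternalSorter may spill to disk when memory is insufficient (default: true)
  .set("spark.shuffle.spill", "true")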
override def read(): Iterator[Product2[K, C]] = {
  // Create the ShuffleBlockFetcherIterator
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition),
    // Maximum total bytes of in-flight fetch requests, 48 MB by default
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

  // Wrap the fetched streams, e.g. for decompression
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    blockManager.wrapForCompression(blockId, inputStream)
  }

  val ser = Serializer.getSerializer(dep.serializer)
  val serializerInstance = ser.newInstance()

  val recordIter = wrappedStreams.flatMap { wrappedStream =>
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }

  val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map(record => {
      readMetrics.incRecordsRead(1)
      record
    }),
    context.taskMetrics().updateShuffleReadMetrics())

  // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

  // Aggregate the fetched data
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // If combining is required: when the map side has already combined, the partially
    // aggregated results fetched here are merged again
    if (dep.mapSideCombine) {
      // Merge the per-map-partition combined results once more; map-side combining
      // greatly reduces the amount of data transferred over the network
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else {
      // Combine key-value pairs that were not combined on the map side
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

  // In sort-based shuffle, by default data is sorted only by partition id and records inside
  // a partition are not sorted, so the keyOrdering variable indicates whether the data
  // within each partition should be sorted as well
  dep.keyOrdering match {
    /*
     * To reduce memory pressure and avoid GC overhead, an external sorter is used to sort the
     * data. When memory cannot hold all of the data being sorted, the spark.shuffle.spill
     * property decides whether to spill to disk. It is enabled by default; if disabled,
     * large data volumes can cause out-of-memory errors.
     */
    case Some(keyOrd: Ordering[K]) =>
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.internalMetricsToAccumulators(
        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
      sorter.iterator
    // No sorting needed, return the aggregated iterator directly
    case None =>
      aggregatedIter
  }
}
ShuffleBlockFetcherIterator: fetches data from multiple blocks.
# Split blocks into local and remote ones, decide the fetch strategy, and return the set of requests for blocks that must be fetched remotely
# Add the remote requests to the fetch queue
# Send remote fetch requests until the in-flight bytes threshold is reached
# Start fetching the local blocks
private[this] def initialize(): Unit = {
  // Add a task-completion callback used for cleanup
  context.addTaskCompletionListener(_ => cleanup())

  // Split blocks into local and remote ones, decide the fetch strategy,
  // and return the set of requests for remotely fetched blocks
  val remoteRequests = splitLocalRemoteBlocks()
  // Add the remote requests to the fetch queue in random order
  fetchRequests ++= Utils.randomize(remoteRequests)

  // Send out initial requests for blocks, up to our maxBytesInFlight
  // Keep sending remote requests until the in-flight bytes threshold is reached
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }

  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

  // Start fetching the local blocks
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}
splitLocalRemoteBlocks: splits blocks into local and remote ones, decides the fetch strategy, and returns the set of requests for blocks that must be fetched remotely.
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // Remote requests target at most 5 nodes at a time; how much is fetched per node is bounded
  // by spark.reducer.maxSizeInFlight, i.e. the maxBytesInFlight parameter. For example, if the
  // cluster allows at most 5 GB in flight spread over 5 nodes, each node serves at most 1 GB,
  // so data is fetched from 5 nodes in parallel instead of from a single node
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

  // Queue of FetchRequests for the data to pull; each request may contain several blocks,
  // depending on whether their total size exceeds the target threshold
  val remoteRequests = new ArrayBuffer[FetchRequest]

  // Tracks total number of blocks (including zero sized blocks)
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    // Update the total number of blocks
    totalBlocks += blockInfos.size
    // The data to fetch is local
    if (address.executorId == blockManager.blockManagerId.executorId) {
      // Record the non-empty blocks that will be read locally
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
      // Update the number of blocks to fetch
      numBlocksToFetch += localBlocks.size
    } else { // The data is not local
      val iterator = blockInfos.iterator
      var curRequestSize = 0L // Size of the current request
      // Blocks accumulated for the current remote request
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      // Iterate over every block
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // Skip empty blocks
        if (size > 0) {
          curBlocks += ((blockId, size))
          // Record the remote block id to fetch
          remoteBlocks += blockId
          // Update the number of blocks to fetch
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        // If the current request has reached the threshold
        if (curRequestSize >= targetRequestSize) {
          // Create a new FetchRequest and add it to the request queue
          remoteRequests += new FetchRequest(address, curBlocks)
          // Start a new block list
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          // Reset the current request size to 0
          curRequestSize = 0
        }
      }
      // Add a final request for the remaining blocks
      if (curBlocks.nonEmpty) {
        remoteRequests += new FetchRequest(address, curBlocks)
      }
    }
  }
  logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
  remoteRequests
}
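As a worked example of the sizing logic above, with the default maxBytesInFlight of 48 MB:

targetRequestSize = max(maxBytesInFlight / 5, 1) = max(48 MB / 5, 1) ≈ 9.6 MB

So the blocks of one remote executor are grouped into FetchRequests of roughly 9.6 MB each, and initialize() keeps about five such requests (up to 48 MB in total) in flight at a time.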
private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  // Update the number of bytes currently in flight
  bytesInFlight += req.size

  // Build a map from blockId to block size
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  // The block ids contained in this request
  val blockIds = req.blocks.map(_._1.toString)
  // The remote address of this request
  val address = req.address
  // Fetch the blocks from the remote node through the ShuffleClient
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }

      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
        logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
        results.put(new FailureFetchResult(BlockId(blockId), address, e))
      }
    }
  )
}
private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  // Iterate over the local blocks
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      // Read the local block data
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      // Put the result into the results queue; the size is passed as 0 so that
      // local reads do not count toward bytesInFlight
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf))
    } catch {
      case e: Exception =>
        // If we see an exception, stop immediately.
        logError(s"Error occurred while fetching local blocks", e)
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}