BlockManagerMasterEndpoint主要用于向BlockManagerSlaveEndpoint发送消息。下面主要分析它们各自接收哪些消息,以及接收到消息之后如何处理。
首先它维护了3个重要映射:
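维护一个 `<BlockManagerId, BlockManagerInfo>` 映射(blockManagerInfo)
维护一个 `<ExecutorId, BlockManagerId>` 映射(blockManagerIdByExecutor)
维护一个 `<BlockId, Set<BlockManagerId>>` 映射(blockLocations)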
//接收消息并返回结果
/**
 * Dispatches every request message handled by this endpoint and answers
 * the caller through `context`.
 *
 * @param context call context used to send the reply for the current message
 * @return the partial function matching the supported message types
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Register a block manager, then acknowledge.
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    register(blockManagerId, maxMemSize, slaveEndpoint)
    context.reply(true)

  // Update the bookkeeping for one block; the reply is sent before the
  // listener bus is notified.
  case update @ UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
    val updated = updateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)
    context.reply(updated)
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(update)))

  // Look up the block managers holding a single block.
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  // Look up the block managers holding each of several blocks.
  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  // Other block managers that can act as peers of the given one.
  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  // RPC host and port of the given executor.
  case GetRpcHostPortForExecutor(executorId) =>
    context.reply(getRpcHostPortForExecutor(executorId))

  // Memory status.
  case GetMemoryStatus =>
    context.reply(memoryStatus)

  // Storage status.
  case GetStorageStatus =>
    context.reply(storageStatus)

  // Status of one block across the block managers.
  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  // Block ids accepted by the supplied filter.
  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  // Remove all blocks of an RDD.
  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  // Remove all blocks of a shuffle.
  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  // Remove the blocks of a broadcast variable.
  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  // Ask the slaves holding the block to drop it, then acknowledge.
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  // Forget an executor, then acknowledge.
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  // Acknowledge first, then stop this endpoint.
  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  // Heartbeat from a block manager.
  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  // True only when the executor maps to a known block manager whose
  // cachedBlocks collection is non-empty; false in every other case.
  case HasCachedBlocks(executorId) =>
    val hasCached = blockManagerIdByExecutor
      .get(executorId)
      .flatMap(id => blockManagerInfo.get(id))
      .exists(_.cachedBlocks.nonEmpty)
    context.reply(hasCached)
}
首先删除和该rdd相关的元数据信息;然后再向BlockManager从节点发送RemoveRdd进行具体的删除
/**
 * Removes all blocks belonging to the given RDD.
 *
 * The master-side metadata is dropped first; afterwards a `RemoveRdd`
 * message is sent to every registered slave endpoint.
 *
 * @param rddId id of the RDD whose blocks are removed
 * @return a future holding the `Int` reply of every slave endpoint
 */
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // Keep only the block ids that convert to an RDD block id of this RDD.
  val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)

  // Drop each block from the info of every block manager listed for it,
  // then forget the block's locations.
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }

  // Ask every registered slave to remove the RDD's blocks.
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}
只是向slave发送RemoveShuffle消息,让slave去删除shuffle相关的block
/**
 * Asks every registered slave endpoint to remove the given shuffle.
 *
 * No master-side metadata is touched here; the method only forwards a
 * `RemoveShuffle` message to each slave.
 *
 * @param shuffleId id of the shuffle to remove
 * @return a future holding the `Boolean` reply of every slave endpoint
 */
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
  val removeMsg = RemoveShuffle(shuffleId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Boolean](removeMsg)
    }.toSeq
  )
}
/**
 * Removes a block manager and every reference to it from the master's maps.
 *
 * @param blockManagerId id of the block manager to remove; it must be present
 *                       in `blockManagerInfo`, otherwise the lookup throws
 */
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
  // Look the info up before it is removed below; its block list is still needed.
  val info = blockManagerInfo(blockManagerId)

  // Drop the executor -> block manager entry.
  blockManagerIdByExecutor -= blockManagerId.executorId

  // Drop the block manager itself.
  blockManagerInfo.remove(blockManagerId)

  // Walk every block this block manager held.
  val iterator = info.blocks.keySet.iterator
  while (iterator.hasNext) {
    val blockId = iterator.next
    // All block managers currently listed for this block.
    val locations = blockLocations.get(blockId)
    locations -= blockManagerId
    // When no block manager holds the block any more, remove the block id
    // from the <BlockId, Set[BlockManagerId]> mapping.
    if (locations.isEmpty) {
      blockLocations.remove(blockId)
    }
  }

  listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
  logInfo(s"Removing block manager $blockManagerId")
}
/**
 * Sends a `RemoveBlock` message to every slave that is listed as holding
 * the given block. Nothing happens when the block has no known locations.
 * The replies of the slaves are not awaited.
 *
 * @param blockId id of the block to remove
 */
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
  // blockLocations.get yields null for an unknown block; Option(...) maps that to None.
  Option(blockLocations.get(blockId)).foreach { locations =>
    locations.foreach { blockManagerId: BlockManagerId =>
      // Only block managers that are still registered are contacted.
      blockManagerInfo.get(blockManagerId).foreach { blockManager =>
        blockManager.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
      }
    }
  }
}
/**
 * Collects the status of a block from every registered block manager.
 *
 * @param blockId   id of the block to query
 * @param askSlaves when true, each slave endpoint is asked with a
 *                  `GetBlockStatus` message; when false, the status recorded
 *                  in the master-side info is wrapped in a future instead
 * @return a map from block manager id to the future of its block status
 */
private def blockStatus(
    blockId: BlockId,
    askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
  val getBlockStatus = GetBlockStatus(blockId)
  blockManagerInfo.values.map { info =>
    val blockStatusFuture =
      if (askSlaves) {
        info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
      } else {
        Future { info.getStatus(blockId) }
      }
    (info.blockManagerId, blockStatusFuture)
  }.toMap
}
/**
 * Registers a block manager with the master.
 *
 * When the id is not yet known: any block manager previously registered
 * for the same executor is removed first, then the new id is recorded in
 * `blockManagerIdByExecutor` and `blockManagerInfo`. The
 * `SparkListenerBlockManagerAdded` event is posted in every case, including
 * when the id was already registered.
 *
 * @param id            id of the block manager being registered
 * @param maxMemSize    memory size reported by the block manager
 * @param slaveEndpoint endpoint reference used to reach the block manager
 */
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
  val time = System.currentTimeMillis()
  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        // The executor already has a different block manager registered:
        // treat the old one as stale and remove the executor.
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))

    // Record the executor -> block manager mapping.
    blockManagerIdByExecutor(id.executorId) = id

    // Record the block manager -> info mapping.
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
/**
 * Updates the master's bookkeeping for one block on one block manager.
 *
 * @param blockManagerId         block manager reporting the update
 * @param blockId                block being updated; null means "no block",
 *                               in which case only the last-seen time is refreshed
 * @param storageLevel           new storage level of the block
 * @param memSize                size passed through to `BlockManagerInfo.updateBlockInfo`
 * @param diskSize               size passed through to `BlockManagerInfo.updateBlockInfo`
 * @param externalBlockStoreSize size passed through to `BlockManagerInfo.updateBlockInfo`
 * @return false only when the block manager is unregistered and is not a
 *         non-local driver; true otherwise
 */
private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    // We intentionally do not register the master (except in local mode),
    // so an unregistered driver outside local mode is not a failure.
    blockManagerId.isDriver && !isLocal
  } else if (blockId == null) {
    // Nothing to update for a block; just refresh the last-seen timestamp.
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  } else {
    blockManagerInfo(blockManagerId).updateBlockInfo(
      blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)

    // Fetch the location set for this block, creating and storing an empty
    // one when the block is not tracked yet.
    val locations: mutable.HashSet[BlockManagerId] =
      if (blockLocations.containsKey(blockId)) {
        blockLocations.get(blockId)
      } else {
        val fresh = new mutable.HashSet[BlockManagerId]
        blockLocations.put(blockId, fresh)
        fresh
      }

    // A valid storage level adds this block manager as a location;
    // an invalid one removes it.
    if (storageLevel.isValid) {
      locations.add(blockManagerId)
    } else {
      locations.remove(blockManagerId)
    }

    // Stop tracking the block once no block manager is listed for it.
    if (locations.isEmpty) {
      blockLocations.remove(blockId)
    }
    true
  }
}
1.9 getPeers 获取除driver和指定blockManagerId之外的所有BlockManagerId;如果指定的blockManagerId尚未注册,则返回空集合
/**
 * Returns the peers of the given block manager: every registered block
 * manager id that is not a driver and is not the given id itself.
 *
 * @param blockManagerId id of the block manager asking for its peers
 * @return the peer ids, or an empty sequence when `blockManagerId` is not registered
 */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val blockManagerIds = blockManagerInfo.keySet
  if (blockManagerIds.contains(blockManagerId)) {
    blockManagerIds.filter(id => !id.isDriver && id != blockManagerId).toSeq
  } else {
    Seq.empty
  }
}
BlockManagerSlaveEndpoint:接收BlockManagerMasterEndpoint发送过来的指令,然后执行该指令
/**
 * Handles the commands sent by the master endpoint and executes them
 * against the local `blockManager`.
 *
 * The remove operations run through `doAsync`; the two query messages
 * reply directly through `context`.
 *
 * @param context call context used to send the reply for the current message
 * @return the partial function matching the supported message types
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remove a single block.
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, context) {
      blockManager.removeBlock(blockId)
      true
    }

  // Remove the blocks of an RDD.
  case RemoveRdd(rddId) =>
    doAsync[Int]("removing RDD " + rddId, context) {
      blockManager.removeRdd(rddId)
    }

  // Remove a shuffle: unregister it from the map output tracker (when one
  // is set), then from the shuffle manager.
  case RemoveShuffle(shuffleId) =>
    doAsync[Boolean]("removing shuffle " + shuffleId, context) {
      if (mapOutputTracker != null) {
        mapOutputTracker.unregisterShuffle(shuffleId)
      }
      SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
    }

  // Remove the blocks of a broadcast variable; the second message field is ignored.
  case RemoveBroadcast(broadcastId, _) =>
    doAsync[Int]("removing broadcast " + broadcastId, context) {
      blockManager.removeBroadcast(broadcastId, tellMaster = true)
    }

  // Reply with the local status of a block.
  case GetBlockStatus(blockId, _) =>
    context.reply(blockManager.getStatus(blockId))

  // Reply with the local block ids accepted by the filter.
  case GetMatchingBlockIds(filter, _) =>
    context.reply(blockManager.getMatchingBlockIds(filter))
}