主要负责整个应用程序运行期间 block 元数据的管理和维护，以及向从节点发送指令以执行相应的命令。
driverEndpoint: RpcEndpointRef，与 BlockManagerMasterEndpoint 通信的终端引用
isDriver: Boolean，表示当前是否运行在 Driver 端
/**
 * Asks the master to remove the given executor.
 *
 * Sends a RemoveExecutor message to BlockManagerMasterEndpoint via `tell`, then logs success.
 *
 * @param execId id of the executor to remove
 */
def removeExecutor(execId: String): Unit = {
  // Send a RemoveExecutor message to BlockManagerMasterEndpoint.
  tell(RemoveExecutor(execId))
  logInfo("Removed " + execId + " successfully in removeExecutor")
}
/**
 * Registers a BlockManager with the master.
 *
 * Sends a RegisterBlockManager message to BlockManagerMasterEndpoint via `tell`, logging before
 * and after.
 *
 * @param blockManagerId id of the BlockManager being registered
 * @param maxMemSize     maximum memory size reported for this BlockManager (unit presumably
 *                       bytes — confirm against the caller)
 * @param slaveEndpoint  presumably the endpoint the master uses to send commands back to this
 *                       BlockManager — confirm
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxMemSize: Long,
    slaveEndpoint: RpcEndpointRef): Unit = {
  logInfo("Trying to register BlockManager")
  // Send a RegisterBlockManager message to BlockManagerMasterEndpoint.
  tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
  logInfo("Registered BlockManager")
}
/**
 * Reports updated information about a block to the master.
 *
 * Asks BlockManagerMasterEndpoint (with retries) using an UpdateBlockInfo message and returns
 * the Boolean reply.
 *
 * @param blockManagerId         id of the BlockManager that holds the block
 * @param blockId                id of the block being reported
 * @param storageLevel           storage level of the block
 * @param memSize                size of the block in memory
 * @param diskSize               size of the block on disk
 * @param externalBlockStoreSize size of the block in the external block store
 * @return the Boolean reply from BlockManagerMasterEndpoint
 */
def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  // Send an UpdateBlockInfo message to BlockManagerMasterEndpoint and return its reply.
  val res = driverEndpoint.askWithRetry[Boolean](
    UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, memSize, diskSize, externalBlockStoreSize))
  logDebug(s"Updated info of block $blockId")
  res
}
/**
 * Returns the BlockManagers the master reports as holding the given block.
 *
 * Asks BlockManagerMasterEndpoint (with retries) using a GetLocations message.
 *
 * @param blockId id of the block to look up
 * @return ids of the BlockManagers holding the block, as replied by the master
 */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  // Send a GetLocations message to BlockManagerMasterEndpoint.
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
/**
 * Returns, for each of the given blocks, the BlockManagers the master reports as holding it.
 *
 * Asks BlockManagerMasterEndpoint (with retries) using a single GetLocationsMultipleBlockIds
 * message rather than one request per block.
 *
 * @param blockIds ids of the blocks to look up
 * @return one sequence of BlockManager ids per requested block, as replied by the master
 *         (presumably in the same order as `blockIds` — confirm against the endpoint)
 */
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  // Send a GetLocationsMultipleBlockIds message to BlockManagerMasterEndpoint.
  driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
    GetLocationsMultipleBlockIds(blockIds))
}
/**
 * Checks whether the master knows at least one location for the given block.
 *
 * @param blockId id of the block to check
 * @return true if `getLocations(blockId)` is non-empty
 */
def contains(blockId: BlockId): Boolean =
  getLocations(blockId).nonEmpty
/**
 * Asks the master (with retries) for the peers of the given BlockManager.
 *
 * @param blockManagerId id of the BlockManager whose peers are requested
 * @return peer BlockManager ids, as replied by BlockManagerMasterEndpoint
 */
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val peersRequest = GetPeers(blockManagerId)
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](peersRequest)
}
/**
 * Asks the master (with retries) to remove the given block.
 *
 * The Boolean reply from BlockManagerMasterEndpoint is discarded.
 *
 * @param blockId id of the block to remove
 */
def removeBlock(blockId: BlockId): Unit = {
  val removeRequest = RemoveBlock(blockId)
  driverEndpoint.askWithRetry[Boolean](removeRequest)
}
/**
 * Asks the master to remove all blocks belonging to the given RDD.
 *
 * The master replies (via `askWithRetry`) with a `Future[Seq[Int]]`. A failure of that future
 * is logged as a warning.
 *
 * @param rddId    id of the RDD whose blocks should be removed
 * @param blocking if true, wait on the reply future with `timeout.awaitResult` (presumably
 *                 bounded by the configured RPC timeout, and rethrowing on failure — confirm)
 */
def removeRdd(rddId: Int, blocking: Boolean): Unit = {
  val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
  // Log failures without blocking; the callback runs on the completing thread.
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
/**
 * Asks the master to remove all blocks belonging to the given shuffle.
 *
 * The master replies (via `askWithRetry`) with a `Future[Seq[Boolean]]`. A failure of that
 * future is logged as a warning.
 *
 * @param shuffleId id of the shuffle whose blocks should be removed
 * @param blocking  if true, wait on the reply future with `timeout.awaitResult` (presumably
 *                  bounded by the configured RPC timeout, and rethrowing on failure — confirm)
 */
def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  // Log failures without blocking; the callback runs on the completing thread.
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
/**
 * Returns the memory status of every block manager.
 *
 * Asks BlockManagerMasterEndpoint (with retries) using a GetMemoryStatus message.
 *
 * @return map from block manager id to a pair of Longs — presumably (max memory, remaining
 *         memory); confirm against BlockManagerMasterEndpoint
 */
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
  driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
/**
 * Asks the master (with retries) for the storage status of the block managers.
 *
 * @return the StorageStatus array replied by BlockManagerMasterEndpoint
 */
def getStorageStatus: Array[StorageStatus] =
  driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
/**
 * Returns the status of the given block on every block manager that reports it.
 *
 * The master replies with one `Future[Option[BlockStatus]]` per block manager. This method waits
 * for all of them with `timeout.awaitResult` and keeps only the block managers whose answer is
 * `Some`.
 *
 * @param blockId   id of the block to query
 * @param askSlaves forwarded in the GetBlockStatus message; presumably makes the master query the
 *                  slave block managers directly instead of relying only on its own records —
 *                  confirm against BlockManagerMasterEndpoint
 * @return map from block manager id to that manager's status for the block
 * @throws SparkException if the awaited result is null
 */
def getBlockStatus(
    blockId: BlockId,
    askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
  val msg = GetBlockStatus(blockId, askSlaves)
  val response = driverEndpoint.
    askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
  // Both sides come from the same unzip, so their iteration orders line up for the zip below.
  val (blockManagerIds, futures) = response.unzip
  // Explicit builder so Future.sequence yields an Iterable matching `blockManagerIds`.
  val cbf =
    implicitly[
      CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
      Option[BlockStatus],
      Iterable[Option[BlockStatus]]]]
  val blockStatus = timeout.awaitResult(
    Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
  if (blockStatus == null) {
    throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
  }
  // Pair each block manager with its answer and drop the ones that reported no status.
  blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
    status.map { s => (blockManagerId, s) }
  }.toMap
}
/**
 * Asks the master (with retries) whether the given executor has cached blocks.
 *
 * @param executorId id of the executor to check
 * @return the Boolean reply from BlockManagerMasterEndpoint
 */
def hasCachedBlocks(executorId: String): Boolean = {
  val cachedQuery = HasCachedBlocks(executorId)
  driverEndpoint.askWithRetry[Boolean](cachedQuery)
}