SparkBroadcast之TorrentBroadcast,Spark Broadcast概述中介绍了四种实现Broadcast的思路,这篇关注Spark中具体的实现TorrentBroadcast,其是Spark 2.0及以后唯一的实现,其他实现都被删除了。
TorrentBroadcast的实现和Spark Broadcast概述中介绍的SplitStream Broadcast (SSB) 基本一致,如下
如上图,当Spark程序使用到Broadcast变量时,driver端维护Broadcast变量及BlockLocations信息,具体如下
Broadcast | Broadcast变量按大小切分为Block(piece)块(默认为4M),如上图中的piece0、piece1 |
BlockLocations | 维护block块的存储位置信息,如上图中piece0:[driver、executor 1]表示piece0这个block在driver和executor1上有存储 |
executor2(如上图所示)首次执行使用了Broadcast变量的task时,需先将Broadcast从其他节点下载到本地并缓存,以后执行的task使用本地缓存的Broadcast变量,从远程下载Broadcast时以block为单位,以piece0的下载为例,流程如下
piece1的下载同理,下载完的piece0、piece1拼接为Broadcast变量,提供给task使用。
executor2本地会缓存两份Broadcast变量,一份为piece0、piece1拼接好的完整的Broadcast,提供给后续task,避免重复的拼接,另一份以block为单位,为其他节点提供远程下载服务。
Broadcast变量的存储及远程下载是基于storage模块的BlockManager实现的,BlockManager的实现细节后续介绍。
blockLocations维护在BlockManagerMasterEndpoint中,如下
上图中BlockId对应子类BroadcastBlockId,以及BlockManagerId存储的信息如下
BroadcastBlockId | broadcastId + blockId(pieceId) |
BlockManagerId | executorId + host + port |
其中broadcastId为自增Id,blockId(pieceId)为字符串”piece”加block的下标,下标为0的block对应的blockId(pieceId)为”piece0”。
Broadcast变量按照4M切分,如下
程序中创建broadcast的例子如下
val sc = new SparkContext("local","wordcount") val broadcast = sc.broadcast(List("a b"))
上述程序对应的Driver端代码调用如下
程序中使用broadcast只需调用其value方法
broadcast.value
对应的代码调用如下
介绍TorrentBroadcast的实现思路,底层基于BlockManager。