频道栏目
首页 > 资讯 > 云计算 > 正文

Spark源码知识讲解之Master主备切换机制

17-11-09        来源:[db:作者]  
收藏   我要投稿

Master作为Spark standalone模式的核心,如果Master出现异常,那么集群就不能正常工作。所以Spark会从Standby中选择一个节点作为Master.

Spark支持以下几种策略,这种策略可以通过配置文件spark-env.sh配置spark.deploy.recoveryMode

# ZOOKEEPER: 集群元数据持久化到zookeeper,当master出现异常的时候,zookeeper会通过选举机制选举出新的Master,新的Master接管集群时需要从zookeeper获取持久化信息,并根据这些信息恢复集群状态

# FILESYSTEM: 集群的元数据持久化到文件系统,当Master出现异常的时候,只要在该机器上重启Master,启动后的Master获取持久化信息并根据持久化信息恢复集群状态

# CUSTOM: 自定义恢复模式,实现StandaloneRecoveryModeFactory抽象类进行实现,并把该类配置到配置文件,当Master出现异常,会根据用户自定义的方式进行恢复集群状况

# NONE: 不持久化集群元数据,当Master出现异常时,新启动的Master不进行恢复集群状态

我们知道Master继承了ThreadSafeRpcEndpoint,从而可以进行Rpc通信,而且它还继承了特质LeaderElectable,从而可以进行选举Leader和撤销Leader身份。

trait LeaderElectable {
def electedLeader(): Unit
def revokedLeadership(): Unit
}

一 HA相关的属性

state = RecoveryState.STANDBY: 初始状态设置为standby

PersistenceEngine persistenceEngine:持久化引擎,用于持久化集群元数据

LeaderElectionAgent leaderElectionAgent:用于Leader选举的代理

二 根据恢复模式策略,初始化对应的持久化对象和选举代理对象

在Master启动的时候,会根据当前配置的spark.deploy.recoveryMode策略,如果没有默认是None得到对应的持久化引擎和用于选举Leader的持久化代理

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
 // 如果恢复模式是ZOOKEEPER,那么通过zookeeper来持久化恢复状态
 case "ZOOKEEPER" =>
 logInfo("Persisting recovery state to ZooKeeper")
 val zkFactory =
 new ZooKeeperRecoveryModeFactory(conf, serializer)
 (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
 // 如果恢复模式是文件系统,那么通过文件系统来持久化恢复状态
 case "FILESYSTEM" =>
 val fsFactory =
 new FileSystemRecoveryModeFactory(conf, serializer)
 (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
 // 如果恢复模式是定制的,那么指定你定制的全路径类名,然后产生相关操作来持久化恢复状态
 case "CUSTOM" =>
 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
 .newInstance(conf, serializer)
 .asInstanceOf[StandaloneRecoveryModeFactory]
 (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
 case _ =>
 (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

三 Master异常进行恢复

3.1 开始进行选举

case ElectedLeader =>
 // 根据配置的spark.deploy.recoveryMode,决定使用哪一种recovery 模式,然后决定采用什么持久化engine
 // 然后根据持久化engine,读取持久化数据,得到一个已经存储的(applicationInfo,driverInfo,workerInfo)
 // 的一个三元组
 val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
 // 根据读取的持久化数据是否都为空,判断RecoveryState状态是否是alive还是recovering
 state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
 RecoveryState.ALIVE
 } else {
 RecoveryState.RECOVERING
 }
 logInfo("I have been elected leader! New state: " + state)
 // 如果处于recovering状态
 if (state == RecoveryState.RECOVERING) {
 // 开始恢复数据
 beginRecovery(storedApps, storedDrivers, storedWorkers)
 // 后台线程调度一个线程去定期检查master完成了恢复工作
 recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
 override def run(): Unit = Utils.tryLogNonFatalError {
 self.send(CompleteRecovery)
 }
 }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
 }
// 如果是CompleteRecovery,则调用completeRecovery
case CompleteRecovery => completeRecovery()

3.2 开始进行恢复

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
 storedWorkers: Seq[WorkerInfo]) {
 // 遍历每一个存储的application,注册该application,并且发送MasterChanged请求
 for (app <- storedApps) {
 logInfo("Trying to recover app: " + app.id)
 try {
 registerApplication(app)
 // 将该application状态置为UNKNOWN状态
 app.state = ApplicationState.UNKNOWN
 // 然后这个app向master发送MasterChanged请求
 app.driver.send(MasterChanged(self, masterWebUiUrl))
 } catch {
 case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
 }
 }
 // 遍历每一个存储的driver, 更新master所维护的driver集合
 for (driver <- storedDrivers) {
 drivers += driver
 }
 // 遍历每一个存储的wroker,然后向master注册worker
 for (worker <- storedWorkers) {
 logInfo("Trying to recover worker: " + worker.id)
 try {
 // 注册worker,就是更新master的woker集合,和worker相关的映射列表
 registerWorker(worker)
 // 将该worker状态置为UNKNOWN状态
 worker.state = WorkerState.UNKNOWN
 // 然后改worker向master发送MasterChanged请求
 worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
 } catch {
 case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
 }
 }
}

3.3 worker接受到MasterChange消息,向master发送消息:

WorkerSchedulerStateResponse

case MasterChanged(masterRef, masterWebUiUrl) =>
 logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
 // 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
 changeMaster(masterRef, masterWebUiUrl)
 // 创建当前节点executors的简单描述对象ExecutorDescription
 val execs = executors.values.
 map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
 // 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作
 masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

3.4 Application接受到MasterChange消息,向Master发送消息:MasterChangeAcknowledged

case MasterChanged(masterRef, masterWebUiUrl) =>
 logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
 master = Some(masterRef)
 alreadyDisconnected = false
 masterRef.send(MasterChangeAcknowledged(appId.get))

3.5 Master接收到WorkerSchedulerStateResponse和MasterChangeAcknowledged消息后,调用completeRecovery操作,kill掉那些不响应但是状态不是UNKNOWN的worker和application

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
 // 根据workerId获取worker
 idToWorker.get(workerId) match {
 case Some(worker) =>
 logInfo("Worker has been re-registered: " + workerId)
 // worker状态置为alive
 worker.state = WorkerState.ALIVE
 // 从指定的executor中过滤出哪些是有效的executor
 val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
 // 遍历有效的executors
 for (exec <- validExecutors) {
 // 获取executor所对应的app
 val app = idToApp.get(exec.appId).get
 // 为app设置executor,比如哪一个worker,多少核数等资源
 val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
 // 将该executor添加到该wokerworker.addExecutor(execInfo)
 execInfo.copyState(exec)
 }
 // 将所有的driver设置为RUNNING然后加入到worker中
 for (driverId <- driverIds) {
 drivers.find(_.id == driverId).foreach { driver =>
 driver.worker = Some(worker)
 driver.state = DriverState.RUNNING
 worker.drivers(driverId) = driver
 }
 }
 case None =>
 logWarning("Scheduler state from unknown worker: " + workerId)
 }
 // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
 if (canCompleteRecovery) { completeRecovery() }
case MasterChangeAcknowledged(workerId, executors, driverIds) =>
 // 根据workerId获取worker
 idToWorker.get(workerId) match {
 case Some(worker) =>
 logInfo("Worker has been re-registered: " + workerId)
 // worker状态置为alive
 worker.state = WorkerState.ALIVE
 // 从指定的executor中过滤出哪些是有效的executor
 val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
 // 遍历有效的executors
 for (exec <- validExecutors) {
 // 获取executor所对应的app
 val app = idToApp.get(exec.appId).get
 // 为app设置executor,比如哪一个worker,多少核数等资源
 val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
 // 将该executor添加到该wokerworker.addExecutor(execInfo)
 execInfo.copyState(exec)
 }
 // 将所有的driver设置为RUNNING然后加入到worker中
 for (driverId <- driverIds) {
 drivers.find(_.id == driverId).foreach { driver =>
 driver.worker = Some(worker)
 driver.state = DriverState.RUNNING
 worker.drivers(driverId) = driver
 }
 }
 case None =>
 logWarning("Scheduler state from unknown worker: " + workerId)
 }
 // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
 if (canCompleteRecovery) { completeRecovery() }


3.6 完成恢复

private def completeRecovery() {
 // 如果状态不是recovering则返回
 if (state != RecoveryState.RECOVERING) { return }
 // 然后状态置为completing_recovery(正处于恢复中)
 state = RecoveryState.COMPLETING_RECOVERY

 // 杀掉那些不响应但是状态不是UNKNOWN的worker和application
 workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
 apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

 // 重新调度未被任何worker声称的driver,即还没有worker来运行
 drivers.filter(_.worker.isEmpty).foreach { d =>
 logWarning(s"Driver ${d.id} was not found after master recovery")
 // 如果是driver是监管者,则重新发起driver,否则删除driver
 if (d.desc.supervise) {
 logWarning(s"Re-launching ${d.id}")
 relaunchDriver(d)
 } else {
 removeDriver(d.id, DriverState.ERROR, None)
 logWarning(s"Did not re-launch ${d.id} because it was not supervised")
 }
 }
 // 然后状态置为alive
 state = RecoveryState.ALIVE
 // 重新分配资源,调度driver和application
 schedule()
 logInfo("Recovery complete - resuming operations!")
}

四 触发选举流程

我们这里主要以zookeeper为例子:

当节点启动的时候,会调用onstart方法,然后个根据恢复模式zookeeper,会初始化ZooKeeperPersistenceEngine和ZooKeeperLeaderElectionAgent。ZooKeeperPersistenceEngine主要负责持久化app,driver,woker等信息,ZooKeeperLeaderElectionAgent主要负责监听master状态变化和选举Leader。

ZooKeeperLeaderElectionAgent在初始化的时候,就会启动,它在启动的时候会创建LeaderLatch,Leader启动就会启动一个后台线程去检测Leader状态,然后ZooKeeperLeaderElectionAgent根据当前节点状态判断是不是Leader,向Master发送ElectedLeader和RevokedLeadership消息

相关TAG标签
上一篇:mysql与canal,实现数据同步的方法
下一篇:Mysql数据库无法启动问题的解决方案
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站