The Master is the core of Spark's standalone mode: if the Master fails, the cluster can no longer work properly. Spark therefore promotes one of the standby nodes to become the new Master.
Spark supports the following recovery strategies, selected via spark.deploy.recoveryMode in the spark-env.sh configuration file (a configuration sketch follows the list):
# ZOOKEEPER: cluster metadata is persisted to ZooKeeper. When the Master fails, ZooKeeper elects a new Master through its election mechanism; the new Master takes over the cluster by reading the persisted metadata from ZooKeeper and restoring the cluster state from it.
# FILESYSTEM: cluster metadata is persisted to the file system. When the Master fails, it only needs to be restarted on the same machine; the restarted Master reads the persisted metadata and restores the cluster state from it.
# CUSTOM: a user-defined recovery mode. Implement the abstract class StandaloneRecoveryModeFactory and register the implementation in the configuration; when the Master fails, the cluster state is recovered in the user-defined way.
# NONE: cluster metadata is not persisted; when the Master fails, the newly started Master does not restore the cluster state.
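For example, enabling the ZOOKEEPER mode amounts to exporting the relevant properties before starting the master. A minimal spark-env.sh sketch, assuming the standard spark.deploy.zookeeper.url/dir properties (the ensemble addresses and the /spark directory below are placeholder values, not defaults):

# spark-env.sh -- point spark.deploy.zookeeper.url at your own ensemble
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"

For FILESYSTEM mode, the analogous property is spark.deploy.recoveryDirectory, which names the directory the metadata is written to.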
We know that Master extends ThreadSafeRpcEndpoint, which lets it take part in RPC communication; it also mixes in the trait LeaderElectable, so it can be elected leader and have its leadership revoked.
trait LeaderElectable {
  def electedLeader(): Unit     // called when this node is elected leader
  def revokedLeadership(): Unit // called when this node's leadership is revoked
}
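Spark's LeaderElectionAgent.scala defines the counterpart trait and its simplest implementation, MonarchyLeaderAgent, which is used when no real election is needed (the NONE and FILESYSTEM branches of the match shown below): it declares its master the leader as soon as it is constructed.

trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {} // to avoid noops in implementations
}

// Single-node implementation: this master is initially, and always, the leader
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  masterInstance.electedLeader()
}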
The Master keeps three fields that matter here:
# state = RecoveryState.STANDBY: the initial state is set to STANDBY
# persistenceEngine: PersistenceEngine: the persistence engine used to persist the cluster metadata
# leaderElectionAgent: LeaderElectionAgent: the agent that drives leader election
When the Master starts, it inspects the configured spark.deploy.recoveryMode (NONE if unset) and creates the matching persistence engine and leader election agent:
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // ZOOKEEPER: persist the recovery state to ZooKeeper
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  // FILESYSTEM: persist the recovery state to the file system
  case "FILESYSTEM" =>
    val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  // CUSTOM: instantiate the user-specified factory (configured by its full class
  // name) and let it create the persistence engine and election agent
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  // NONE: persist nothing and make this node the leader immediately
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
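To make the CUSTOM branch concrete, here is a minimal sketch of a factory that could be plugged in via spark.deploy.recoveryMode.factory. LoggingRecoveryModeFactory and its behavior are made up for illustration; only the three overridden methods are the contract that StandaloneRecoveryModeFactory actually requires (MonarchyLeaderAgent itself is private[spark], so a custom factory defines its own trivial agent):

import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master._
import org.apache.spark.serializer.Serializer

// Hypothetical CUSTOM factory: persists nothing and never runs a real election
class LoggingRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
    override def persist(name: String, obj: Object): Unit = println(s"persist($name)")
    override def unpersist(name: String): Unit = println(s"unpersist($name)")
    // Nothing is ever persisted, so there is never anything to read back
    override def read[T: ClassTag](prefix: String): Seq[T] = Nil
  }

  // Single-node "election": grant leadership to the master immediately
  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
    new LeaderElectionAgent {
      override val masterInstance: LeaderElectable = master
      masterInstance.electedLeader()
    }
}

Such a class would be registered with spark.deploy.recoveryMode=CUSTOM and spark.deploy.recoveryMode.factory set to its full class name, as the match above shows.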
3 Master failure recovery

When a standby node wins the election, its Master endpoint receives an ElectedLeader message and decides whether there is any state to recover:
case ElectedLeader =>
  // Read the persisted (applications, drivers, workers) triple back from the
  // persistence engine chosen by spark.deploy.recoveryMode
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  // If nothing was persisted the master is immediately ALIVE, otherwise it is RECOVERING
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    // Start recovering from the persisted data
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    // Schedule a background task that checks, after the worker timeout,
    // whether the master has finished its recovery work
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }

// CompleteRecovery simply delegates to completeRecovery()
case CompleteRecovery => completeRecovery()
beginRecovery re-registers everything that was persisted and asks every application and worker to acknowledge the new master:

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // Re-register every stored application and send its driver a MasterChanged message
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Mark the application UNKNOWN until it responds
      app.state = ApplicationState.UNKNOWN
      // Ask the application's driver to acknowledge the new master
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  // Add every stored driver to the driver set maintained by this master
  for (driver <- storedDrivers) {
    drivers += driver
  }

  // Re-register every stored worker and send it a MasterChanged message
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Registering the worker updates the master's worker set and related mappings
      registerWorker(worker)
      // Mark the worker UNKNOWN until it responds
      worker.state = WorkerState.UNKNOWN
      // Ask the worker to acknowledge the new master
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
On the Worker side, the MasterChanged handler records the new master and reports the worker's current executors and drivers back to it:

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Record the new master's reference and URL, mark the connection as up,
  // and cancel any earlier re-registration attempts
  changeMaster(masterRef, masterWebUiUrl)
  // Build a lightweight ExecutorDescription for every executor on this worker
  val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  // Report this worker's executors and drivers to the new master
  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
On the application (driver) side, the handler simply records the new master and acknowledges it:

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  master = Some(masterRef)
  alreadyDisconnected = false
  masterRef.send(MasterChangeAcknowledged(appId.get))
Back on the Master, WorkerSchedulerStateResponse rebuilds the executor and driver bookkeeping for that worker:

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  // Look up the worker by its id
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      // The worker responded, so it is ALIVE again
      worker.state = WorkerState.ALIVE
      // Keep only the executors whose application is still known to this master
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      for (exec <- validExecutors) {
        // Re-attach the executor to its application (which worker, how many cores, ...)
        val app = idToApp.get(exec.appId).get
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        // And record it on the worker
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }
      // Mark all reported drivers RUNNING and attach them to the worker
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }
  // If every worker and application has now responded, finish the recovery
  if (canCompleteRecovery) { completeRecovery() }
MasterChangeAcknowledged does the same on the application side; the app is put back into WAITING so it can be scheduled again:

case MasterChangeAcknowledged(appId) =>
  // Look up the application by its id
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      // The application responded, so it can be scheduled again
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  // If every worker and application has now responded, finish the recovery
  if (canCompleteRecovery) { completeRecovery() }
Once every worker and application has responded, or the scheduled CompleteRecovery message fires after the worker timeout, completeRecovery finishes the job:

private def completeRecovery() {
  // Recovery can only be completed once, and only from the RECOVERING state
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off the workers and applications that never responded and are still UNKNOWN
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Re-schedule drivers that were not claimed by any worker
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    // Relaunch supervised drivers; drop the rest
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  // The master is ALIVE again; resume scheduling drivers and applications
  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
Here we take ZooKeeper as the main example:
When the node starts, onStart is called; because the recovery mode is ZOOKEEPER, it initializes a ZooKeeperPersistenceEngine and a ZooKeeperLeaderElectionAgent. ZooKeeperPersistenceEngine is responsible for persisting the application, driver and worker metadata, while ZooKeeperLeaderElectionAgent is responsible for watching the master's leadership status and electing the leader.
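A minimal sketch of the idea behind ZooKeeperPersistenceEngine (not Spark's exact source; SketchZkPersistence and the /spark/master_status path are illustrative): every app/driver/worker is serialized into a child znode of a fixed working directory, and readPersistedData is just a prefix-filtered read of those children.

import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.curator.framework.CuratorFramework
import org.apache.spark.serializer.Serializer

// Hypothetical ZooKeeper-backed PersistenceEngine
class SketchZkPersistence(zk: CuratorFramework, serializer: Serializer)
  extends PersistenceEngine {

  private val workingDir = "/spark/master_status" // illustrative path

  // Serialize the object and store it as a child znode named after it
  override def persist(name: String, obj: Object): Unit = {
    val buf = serializer.newInstance().serialize(obj)
    val bytes = new Array[Byte](buf.remaining())
    buf.get(bytes)
    zk.create().creatingParentsIfNeeded().forPath(s"$workingDir/$name", bytes)
  }

  override def unpersist(name: String): Unit =
    zk.delete().forPath(s"$workingDir/$name")

  // Read back every znode whose name starts with the prefix ("app_", "driver_", "worker_")
  override def read[T: ClassTag](prefix: String): Seq[T] =
    zk.getChildren.forPath(workingDir).asScala
      .filter(_.startsWith(prefix))
      .map { child =>
        val bytes = zk.getData.forPath(s"$workingDir/$child")
        serializer.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
      }
}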
ZooKeeperLeaderElectionAgent starts as soon as it is constructed. On start it creates a LeaderLatch, which runs a background thread that watches the leadership status in ZooKeeper; whenever that status changes, ZooKeeperLeaderElectionAgent checks whether the current node holds leadership and sends the Master an ElectedLeader or RevokedLeadership message accordingly.
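A condensed sketch of that wiring (not Spark's exact source; SketchLeaderElectionAgent and the znode path are illustrative, while LeaderLatch and LeaderLatchListener are Curator's real API):

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}

class SketchLeaderElectionAgent(zk: CuratorFramework, master: LeaderElectable)
  extends LeaderLatchListener {

  private val latch = new LeaderLatch(zk, "/spark/leader_election") // illustrative path
  latch.addListener(this) // Curator invokes the callbacks below on status changes
  latch.start()           // join the election; a background thread watches ZooKeeper

  // Invoked by Curator when this node wins the election
  override def isLeader(): Unit = master.electedLeader()

  // Invoked when this node loses leadership (e.g. its ZooKeeper session expires)
  override def notLeader(): Unit = master.revokedLeadership()
}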