当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo], storedWorkers: Seq[WorkerInfo]) {
  // Re-register each persisted application, mark it UNKNOWN, and tell its
  // driver about the new master; the driver answers with MasterChangeAcknowledged.
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // UNKNOWN until the driver acknowledges the master change.
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  // Restore the master's set of persisted drivers.
  for (driver <- storedDrivers) {
    drivers += driver
  }

  // Re-register each persisted worker (updates the master's worker set and the
  // related lookup maps), mark it UNKNOWN, and tell it about the new master;
  // the worker answers with WorkerSchedulerStateResponse.
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker)
      // UNKNOWN until the worker reports its scheduler state back.
      worker.state = WorkerState.UNKNOWN
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
# 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册
# 向新的master发送WorkerSchedulerStateResponse消息,新master据此恢复该worker上的executor和driver状态
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Switch to the new master: record its URL and reference, mark the
  // connection as live, and cancel any earlier re-registration attempts.
  changeMaster(masterRef, masterWebUiUrl)
  // Build a lightweight ExecutorDescription for each executor on this worker.
  val executorDescriptions = executors.values.map { e =>
    new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
  }
  // Report this worker's scheduler state so the new master can rebuild
  // its view of executors and drivers.
  masterRef.send(WorkerSchedulerStateResponse(
    workerId, executorDescriptions.toList, drivers.keys.toSeq))
# 更新master
# 向新的master发送MasterChangeAcknowledged消息
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Point at the new master and clear the disconnected flag.
  master = Some(masterRef)
  alreadyDisconnected = false
  // Acknowledge the change so the new master can move this application
  // out of the UNKNOWN state.
  masterRef.send(MasterChangeAcknowledged(appId.get))
由于这是新的master,所以worker需要重新注册,然后新的master再次把之前相关的应用程序在worker上进行恢复
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  // Look up the worker that is reporting back after the master change.
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      // The worker responded, so it is alive again.
      worker.state = WorkerState.ALIVE

      // Keep only executors whose application is still known to this master.
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      for (exec <- validExecutors) {
        val app = idToApp(exec.appId)
        // Re-attach the executor to its application on this worker
        // (which worker, how many cores, etc.) and restore its state.
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }

      // Re-attach each reported driver to this worker and mark it RUNNING.
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }
  // Finish recovery once every app and worker has reported back.
  if (canCompleteRecovery) { completeRecovery() }
# 更新application状态为WAITING
# 判断当前是否可以进行completeRecovery操作,如果可以则执行completeRecovery
case MasterChangeAcknowledged(appId) =>
  // The application's driver has acknowledged the new master.
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      // Move the app from UNKNOWN back into the schedulable WAITING state.
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  // Finish recovery once every app and worker has reported back.
  if (canCompleteRecovery) { completeRecovery() }