频道栏目
首页 > 资讯 > 云计算 > 正文

Spark源码知识讲解之Job触发原理

17-11-09        来源:[db:作者]  
收藏   我要投稿

一 Job的执行流程

1.1 从数据源加载数据,数据源可以是本地数据文件和HDFS文件,也可以你是内存里的数据结构或者HBase等,创建初始的RDD

1.2 对RDD进行一系列的transformation操作,每一个transformation可能产生一个或者多个RDD

1.3 对最后的final RDD进行action操作,触发job操作,将最后每一个分区计算后得到结果

1.4 对每一个分区的结果返回到Driver端,进行最后的计算。比如count实际上包含了action和sum两个步骤的计算。RDD可以被cache到内存,也可以checkpoint到磁盘。

二 Job触发流程源码

2.1 调用action操作,运行job

我们以count这个action操作为例子,它首先会调用SparkContext的runJob方法

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

2.2runJob

在一个RDD的所有分区上运行job,并且返回结果

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
 partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {
 // 判断SparkContext是否停止或者关闭
 if (stopped.get()) {
 throw new IllegalStateException("SparkContext has been shutdown")
 }
 val callSite = getCallSite
 // 清除闭包
 val cleanedFunc = clean(func)
 logInfo("Starting job: " + callSite.shortForm)
 if (conf.getBoolean("spark.logLineage", false)) {
 logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
 }
 // 委托给DAGScheduler的runJob方法,提交Job
 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
 progressBar.foreach(_.finishAll())
 // RDD执行checkpoint操作
 rdd.doCheckpoint()
}

2.3submitJob

def submitJob[T, U](rdd: RDD[T],
 func: (TaskContext, Iterator[T]) => U,
 partitions: Seq[Int],
 callSite: CallSite,
 resultHandler: (Int, U) => Unit,
 properties: Properties): JobWaiter[U] = {
 // 判断任务处理的分区是否存在
 val maxPartitions = rdd.partitions.length
 partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
 throw new IllegalArgumentException(
 "Attempting to access a non-existent partition: " + p + ". " +
 "Total number of partitions: " + maxPartitions)
 }
 // 获取jobId,如果作业只包含0个任务,则立即返回JobWaiter
 val jobId = nextJobId.getAndIncrement()
 if (partitions.size == 0) {
 return new JobWaiter[U](this, jobId, 0, resultHandler)
 }

 assert(partitions.size > 0)
 val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
 // 创建JobWaiter对象
 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
 // 创建JobSubmitted对象,放入队列eventProcessLoop
 eventProcessLoop.post(JobSubmitted(
 jobId, rdd, func2, partitions.toArray, callSite, waiter,
 SerializationUtils.clone(properties)))
 waiter
}

相关TAG标签
上一篇:Spark源码知识讲解之DAGScheduler以及stage的划分
下一篇:数据模型、数据库范式讲解
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站