1.1 从数据源加载数据,数据源可以是本地文件、HDFS文件,也可以是内存里的数据结构或者HBase等,以此创建初始的RDD
1.2 对RDD进行一系列的transformation操作,每一个transformation可能产生一个或者多个RDD
1.3 对最终的RDD执行action操作,触发job的提交与执行,计算得到每一个分区的结果
1.4 将每一个分区的结果返回到Driver端,进行最后的计算。比如count实际上包含两个步骤:先通过action计算出每个分区的元素个数,再对这些结果求和(sum)。RDD可以被cache到内存,也可以checkpoint到磁盘。
我们以count这个action操作为例子,它首先会调用SparkContext的runJob方法
/**
 * Returns the number of elements in this RDD.
 *
 * Launches a job through `sc.runJob` in which every partition is measured
 * with `Utils.getIteratorSize`; the per-partition counts that come back are
 * then added together to form the total.
 */
def count(): Long = {
  val perPartitionCounts = sc.runJob(this, Utils.getIteratorSize _)
  perPartitionCounts.sum
}
下面的runJob在RDD的指定分区(由partitions参数给出)上运行job,并将每个分区的计算结果交给resultHandler处理(方法本身返回Unit)
/**
 * Runs a job over the given partitions of an RDD, handing each partition's
 * result to `resultHandler`.
 *
 * The closure is cleaned, the job is delegated to `dagScheduler.runJob`, and
 * after that call returns the progress bar (if present) is finished and
 * `rdd.doCheckpoint()` is invoked.
 *
 * @param rdd           the target RDD
 * @param func          function applied to each partition's iterator
 * @param partitions    indices of the partitions to compute
 * @param resultHandler callback receiving an Int index and that partition's
 *                      result (presumably the position within `partitions` —
 *                      confirm against the scheduler)
 * @throws IllegalStateException if this SparkContext has already been stopped
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  // Refuse to schedule work once the SparkContext has been stopped / shut down.
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  // Clean the closure before handing it to the scheduler.
  val cleanedFunc = clean(func)
  logInfo(s"Starting job: ${callSite.shortForm}")
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo(s"RDD's recursive dependencies:\n${rdd.toDebugString}")
  }
  // Delegate to the DAGScheduler, which submits the job.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  // Give the RDD the chance to perform its checkpoint now that the job has run.
  rdd.doCheckpoint()
}
/**
 * Submits a job by posting a `JobSubmitted` event to `eventProcessLoop` and
 * returns the `JobWaiter` created for it.
 *
 * The partitions are validated first. A job id is then allocated, even when
 * there is nothing to run. A job with zero partitions gets a zero-task
 * `JobWaiter` back immediately, and no event is posted for it.
 *
 * NOTE(review): `this` is passed to `JobWaiter`, so this is presumably a member
 * of the DAGScheduler — confirm.
 *
 * @param rdd           the target RDD
 * @param func          function applied to each partition's iterator
 * @param partitions    indices of the partitions to compute; each must be in
 *                      `[0, rdd.partitions.length)`
 * @param callSite      call site recorded with the job
 * @param resultHandler callback passed through to the `JobWaiter`
 * @param properties    job properties; a `SerializationUtils.clone` copy is
 *                      what gets posted with the event
 * @return the `JobWaiter` for the submitted (or empty) job
 * @throws IllegalArgumentException if any requested partition index is out of range
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Every requested partition must be a valid index into rdd.partitions.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      s"Attempting to access a non-existent partition: $p. " +
        s"Total number of partitions: $maxPartitions")
  }

  // The job id is taken before the empty check, so empty jobs also consume one.
  val jobId = nextJobId.getAndIncrement()
  // `partitions` is a generic Seq (size may be O(n), e.g. for a List): measure once.
  val numTasks = partitions.size
  if (numTasks == 0) {
    // Nothing to compute: hand back a waiter for a zero-task job without posting an event.
    new JobWaiter[U](this, jobId, 0, resultHandler)
  } else {
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, numTasks, resultHandler)
    // Enqueue the submission on the event loop.
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
}