85

Spark Streaming 是如何提交任务的

 5 years ago
source link: http://www.10tiao.com/html/240/201806/2649260341/1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

赵法宪



大数据爱好者,对部分大数据源码有过研究。熟悉Hdfs, HBase, Spark等相关开发。


Spark Streaming是如何提交任务的?

Spark Streaming 是现在实时消息处理的解决方案之一,本文是简单介绍一下 Spark Streaming 的任务是如何提交的。 默认读者知道什么是 RDD, 以及 SparkContext 是如何提交 RDD 任务的。 Spark 版本 2.2.x


Spark Streaming Example

首先,我们先看一个 Spark Streaming 程序的例子 (取自 Spark Streaming Example, 删除了部分无关代码和注释)

  1. objectNetworkWordCount {

  2.  def main(args: Array[String]) {

  3.    // 通过指定的host和ip,以socket的形式读取每一行数据,以1秒为批次,计算每批数据的word count并打印

  4.    val host = args(0)

  5.    val port = args(1).toInt

  6.    val sparkConf = newSparkConf().setAppName("NetworkWordCount")

  7.    val ssc = newStreamingContext(sparkConf, Seconds(1))

  8.    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)

  9.    val words = lines.flatMap(_.split(" "))

  10.    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

  11.    wordCounts.print()

  12.    ssc.start()

  13.    ssc.awaitTermination()

  14.  }

  15. }


Streaming Context

通过例子,我们可以看到,整个 Spark Streaming 的入口是 StreamingContext 这个类。 下图为 StreamingContext 这个类中比较重要的两个对象。

DStreamGraph 会持有该 StreamingContext 的所有输入流,以及输出流。 JobScheduler 则分别通过 ReceiverTracker 来管理所有的 Receiver 以及处理 Receiver 接收的数据; 通过 JobGenerator 定时生成每个批次的 RDD 任务,提交给 Executor 执行。

这里我们看一下 DStreamGraph 这个类,通过它引入了几个新的类 DStream, InputDStream, Receiver。

DStream

DStream 是 Spark 对 RDD 在时间维度上的一个封装,代表了一个 RDD 的流。 (个人理解是为了在 Spark 工作栈方面的统一,但也正因为这个问题,Spark Streaming 暂时不支持实时消息处理)

如果 Spark Core 的处理方式是 RDD 组成的 DAG, 那么 Spark Streaming 就是 DStream 组成的 DAG(Streaming 中没有明确的 DAG 概念)。

DStream 的 dependencies 对象的作用和 RDD 的 deps 类似,都是为了把这个图描述出来。 DStream 的 generatedRDDs 对象是用于保存时间戳和 RDD 的映射关系,也就是上面提到的 DStream 是一个 RDD 的流。 这个时间戳就和我们初始化 StreamingContext 时的 batchDuration 参数有关。而对应的 RDD 可以理解为这段时间内的数据。 其实我们对 Streaming 的操作最后都会映射到对这个对应的 RDD 进行操作。

插句题外话,我们去翻看 WindowedDStream 的 compute()方法的话,会发现它是对一段时间内的 RDD 做了一个 union 操作 , 把它们当做同一个 RDD 来看待。

Output Operator

对于 RDD 具备 transform 和 action 两类操作 , DStream 呢? DStream 有一种叫 output operator 的操作,核心是调用了 foreachRDD()这个方法。(print, saveAsxxx 也都是间接使用了这个方法) 而在 foreachRDD()起到关键性作用的就是,它会调用 register()方法 (当然先转化为 ForEachDStream),这个方法会把我们最终的 DStream 加入到 StreamingContext 对象 的 graph(DStreamGraph)的 outputStreams 中。这样,我们一个 Stream 的图 (类似于 RDD 的 DAG)就组装完成了。

拼图的工作暂时告一段落。

DStream的子类们

我们来看一下 DStream 的子类,一部分是通过内部操作生成的子类例如MappedDStream,FilteredDStream 等,另一部分是 InputDStream 也就是我们所处理的数据的数据源的抽象类。

  1. abstractclassInputDStream[T: ClassTag](_ssc: StreamingContext)

  2.  extendsDStream[T](_ssc) {

  3.  ssc.graph.addInputStream(this)

  4.  /** This is an unique identifier for the input stream. */

  5.  val id = ssc.getNewInputStreamId()

这个抽象类里面,我们关注两个点:

1.类对象初始化时,会自动把该对象加入到StreamingContext对象的graph(DStreamGraph)的inputStreams中。 2.每个对象都有从StreamingContext拿到的唯一id。

关于 InputDStream 的子类又分为两种:

  • 从Driver端接收数据的话,直接继承InputDStream即可。

  • 需要分布式接收数据的话,则需要继承InputDStream的子类ReceiverInputDStream。

Driver 端接收数据的方式我们这次不研究。主要看 ReceiverInputDStream。 有些同学可能已经猜到,Spark Streaming 想接收不同的数据源,只需要分别实现 Receiver 类、ReceiverInputDStream 类即可。

这里,对于不同数据源的不同实现我们不做解读。我们主要来看一下这些 Receiver 都是怎么运作的。

StreamingContext的具体执行逻辑

退回到最初的例子,我们的代码中有如下步骤。

1.初始化 StreamingContext;

2.从StreamingContenxt拿到ReceiverInputDStream的子类SocketInputDStream;

3.进过各种 DStream 的转化,最终调用 print()方法(内部调用 foreachRDD()),把最终的 F orEachDStream 加入到 graph 的 outputStreams 中;

4.调用 StreamingContext 的 start()方法,触发整个逻辑的执行。

那么在 StreamingContext 的 start()方法做了什么呢? 主要逻辑在一个叫 streaming-start 的线程中,核心逻辑只有一行代码

scheduler.start()

这个对象在我们第一个图里是有体现的。类型是 JobScheduler。


JobScheduler.start()

我们进入 JobScheduler 的 start()方法看一下。 有以下几个主要动作:

1.初始化内部消息队列eventLoop 2.初始化并启动receiverTracker 3.启动jobGenerator 4.初始化并启动executorAllocationManager

我们重点关注一下在第一个图中有体现的 receiverTracker 和 jobGenerator

ReceiverTracker.start()

ReceiverTracker 的 start()方法

1.注册一个RPC endpoint对象用于RPC通信 2.调用launchReceivers()方法

launchReceivers()方法做了什么呢?

  1. privatedef launchReceivers(): Unit = {

  2.    // 拿到graph中inputDStreams里面所有是ReceiverInputDStream子类的对象

  3.    val receivers = receiverInputStreams.map { nis =>

  4.      // 对这些对象调用getReceiver()方法,拿到对应的Receiver对象,并把上文的InputDStream的id塞给对应的receiver。

  5.      val rcvr = nis.getReceiver()

  6.      rcvr.setReceiverId(nis.id)

  7.      rcvr

  8.    }

  9.    // 调用runDummySparkJob()方法,该方法是为了保证所有的slave节点都已经注册,防止所有的receiver任务都发布在同一个节点。

  10.    runDummySparkJob()

  11.    logInfo("Starting " + receivers.length + " receivers")

  12.    // 通过RPC endpoint对象发送StartAllReceivers消息。

  13.    endpoint.send(StartAllReceivers(receivers))

  14.  }

StartAllReceivers 消息传到哪了?怎么处理? 翻到 ReceiverTrackerEndpoint的receive()方法

  1. overridedef receive: PartialFunction[Any, Unit] = {

  2.      // Local messages

  3.      caseStartAllReceivers(receivers) =>

  4.        // 根据receivers和executors的信息,分配receivers应该分发到哪些节点处理。

  5.        // 主要原则是如果receiver对象重写了preferredLocation方法,就按该方法进行分配。

  6.        // 否则就尽量分散分配。

  7.        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

  8.        for (receiver <- receivers) {

  9.          // 拿到上面处理好的executor对象

  10.          val executors = scheduledLocations(receiver.streamId)

  11.          // 更新内部的receiver和executor对应关系表(hashMap)

  12.          updateReceiverScheduledExecutors(receiver.streamId, executors)

  13.          // 更新内部的receiver preferedLocations对象(hashMap)

  14.          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation

  15.          // 发布receiver任务

  16.          startReceiver(receiver, executors)

  17.        }

  18.        ...

  19.        }

startReceiver 方法的内容我精简一下 , 并适当调换顺序

  1. privatedef startReceiver(

  2.        receiver: Receiver[_],

  3.        scheduledLocations: Seq[TaskLocation]): Unit = {

  4.      // 我们自己构造一个RDD,这个makeRDD方法可以把数据生成到指定的location。

  5.      val receiverRDD: RDD[Receiver[_]] ={

  6.          val preferredLocations = scheduledLocations.map(_.toString).distinct

  7.          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))

  8.        }

  9.      val checkpointDirOption = Option(ssc.checkpointDir)

  10.      val serializableHadoopConf =

  11.        newSerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  12.      // 构造一个方法,主要内容是如何让receiver在worker上执行。

  13.      val startReceiverFunc: Iterator[Receiver[_]] => Unit =

  14.        (iterator: Iterator[Receiver[_]]) => {

  15.            val receiver = iterator.next()

  16.            // 初始化ReceiverSupervisorImpl的对象,把我们的receiver作为构造参数传入。

  17.            val supervisor = newReceiverSupervisorImpl(

  18.              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

  19.            // 调用该对象的start()方法

  20.            supervisor.start()

  21.            supervisor.awaitTermination()

  22.        }

  23.      // 以提交rdd任务的形式,对上述rdd执行我们构造的startReceiverFunc方法。

  24.      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](

  25.        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

  26.    }

ReceiverSupervisorImpl 具体内容,我们因为篇幅有限,暂不扩展。有兴趣的同学可以自己去研究。 主要是怎么定时把接收到的数据存为 Spark 的 Block,并告知 blockManager。当然还有 WAL 的逻辑。

所以为什么我们要在提交 Streaming 任务申请资源的时候,给 receiver 预留资源。 这里我们就找到了答案:receiver 其实是被当做 rdd 的 job 发到 executor 去执行的。

到这里,我们的 ReceiverTracker 也就告一段落了。

JobGenerator.start()

接下来是 jobGenerator JobGenerator 的 start()方法

1.初始化内部消息队列eventLoop 2.调用startFirstTime()方法

我们看一下 startFirstTime 方法的内容,非常简单

  1. privatedef startFirstTime() {

  2.    val startTime = newTime(timer.getStartTime())

  3.    // 调用ssc.graph的start方法,主要是调用graph中每个inputDStreams对象的start()方法

  4.    // 目前除了0.10的DirectKafkaInputDStream,其他子类都是空实现,所以我们不去研究。

  5.    graph.start(startTime - graph.batchDuration)

  6.    // 启动一个定时触发器,主要是每间隔batchDuration就往自身的消息队列里面发一条GenerateJobs消息。

  7.    timer.start(startTime.milliseconds)

  8.    logInfo("Started JobGenerator at " + startTime)

  9.  }

timer 定时发送的消息的逻辑如下:

  1. private val timer = newRecurringTimer(clock, ssc.graph.batchDuration.milliseconds,

  2.    longTime => eventLoop.post(GenerateJobs(newTime(longTime))), "JobGenerator")

我们关注一下定时消息 GenerateJobs(new Time(longTime))的后续处理。 我这里把非核心逻辑的代码剔除掉。

  1.  privatedef generateJobs(time: Time) {

  2.    Try {

  3.      // 通过我们前面提到的receiverTracker关联batchTime和block信息,这里不展开里面的内容。

  4.      jobScheduler.receiverTracker.allocateBlocksToBatch(time)

  5.      // 对graph的outputDStreams中每个对象调用generateJob方法,组成一个Job数组(Seq)

  6.      graph.generateJobs(time)

  7.    } match {

  8.      caseSuccess(jobs) =>

  9.        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

  10.        // 这里提交任务

  11.        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

  12.    }

  13.  }

这里会把任务封装为 JobSet, 然后调用 jobScheduler.submitJobSet()方法。我们看一下这个方法的核心调用

  1. def submitJobSet(jobSet: JobSet) {

  2.    // jobExecutor是一个线程池,pool的大小通过spark.streaming.concurrentJobs配置,默认为1

  3.    jobSet.jobs.foreach(job => jobExecutor.execute(newJobHandler(job)))

  4.  }

我们看一下 JobHandler 的 run 方法同样把非核心逻辑剔除

  1. def run() {

  2.    // 避免JobScheduler调用stop(false)方法时把eventLoop置位null导致空指针异常。

  3.    var _eventLoop = eventLoop

  4.    if (_eventLoop != null) {

  5.      // 处理各种通知

  6.      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))

  7.      // Disable checks for existing output directories in jobs launched by the streaming

  8.      // scheduler, since we may need to write output to an existing directory during checkpoint

  9.      // recovery; see SPARK-4835 for more details.

  10.      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {

  11.        // 主要调用在此

  12.        job.run()

  13.      }

  14.      _eventLoop = eventLoop

  15.      if (_eventLoop != null) {

  16.        // 处理各种通知

  17.        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

  18.      }

  19.    }

  20. }

这里主要是调用了 job 的 run()方法,而 run()方法是调用我们初始化 Job 是传入的第二个构造参数 func,所以我们返回到这个 job 生成的地方。也就是 graph.generateJobs(time)这里。删去非核心逻辑

  1. def generateJobs(time: Time): Seq[Job] = {

  2.    val jobs = this.synchronized {

  3.      outputStreams.flatMap { outputStream =>

  4.        val jobOption = outputStream.generateJob(time)

  5.        jobOption

  6.      }

  7.    }

  8.    jobs

  9.  }

我们的 job 是对 graph 的 outputDStreams 中每个对象调用 generateJob()方法得来的。 这里要明确一点,目前版本为止,我们的这个 outputDStreams 只能是 ForEachDStream 这个类的实例, 具体原因参见上面 output operator 部分的 register()方法。 所以我们要看 ForEachDStream 的 generateJob()方法的实现。

  1. private[streaming]

  2. classForEachDStream[T: ClassTag] (

  3.    parent: DStream[T],

  4.    foreachFunc: (RDD[T], Time) => Unit,

  5.    displayInnerRDDOps: Boolean

  6.  ) extendsDStream[Unit](parent.ssc) {

  7.  overridedef dependencies: List[DStream[_]] = List(parent)

  8.  overridedef slideDuration: Duration = parent.slideDuration

  9.  overridedef compute(validTime: Time): Option[RDD[Unit]] = None

  10.  overridedef generateJob(time: Time): Option[Job] = {

  11.    parent.getOrCompute(time) match {

  12.      caseSome(rdd) =>

  13.        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {

  14.          foreachFunc(rdd, time)

  15.        }

  16.        Some(newJob(time, jobFunc))

  17.      caseNone => None

  18.    }

  19.  }

  20. }

这里似乎卡主了,因为目前是看不到任何通过 sparkContext 提交 RDD 任务的逻辑。 不过,再次看一下我们的例子。我们在例子最后,调用了 DStream 的 print()方法。这个 output operator,我们看一下这个方法的逻辑。

  1. defprint(num: Int): Unit = ssc.withScope {

  2.    def foreachFunc: (RDD[T], Time) => Unit = {

  3.      (rdd: RDD[T], time: Time) => {

  4.        val firstNum = rdd.take(num + 1)

  5.        // scalastyle:off println

  6.        println("-------------------------------------------")

  7.        println(s"Time: $time")

  8.        println("-------------------------------------------")

  9.        firstNum.take(num).foreach(println)

  10.        if (firstNum.length > num) println("...")

  11.        println()

  12.        // scalastyle:on println

  13.      }

  14.    }

  15.    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)

  16.  }

**val firstNum = rdd.take(num + 1) 这里调用了 rdd 的 take()方法,而 take()方法是一个 rdd 的 action 操作 (action 操作会调用 sparkContext 的 runJob()方法提交 rdd 任务到集群)。 这端代码经过两个函数的包装:

1.包装为 foreachFunc,传给 ForEachDStream 的构造参数。 2.在 ForEachDStream 的 generateJob()方法中,包装为 jobFunc 传给 Job 类的第二个构造方法。 所以我们的 DStream 中的 output operator 是会转化为 RDD 任务提交到集群处理。

这里,我们的主要流程就走完了。

总结:

主要是两点内容:

1.Receiver 会作为 RDD 任务提交到集群执行。 2.DStream 最终的执行形式也是转化为 RDD 任务进行提交。





About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK