Categories
程式開發

Spark Streaming源碼分析:DStream的內置和運行


貝殼實時計算平台已承接公司各種實時數據任務100多個,日處理消費總量接近250億,採用的底層引擎包括Spark-Streaming和Flink。為了提升數據處理速度和穩定性,我們對各種引擎的處理特性進行了深入研究。在Spark-Streaming中,對流的抽像是使用DStream來定義的,所以想要理解Spark-Streaming的流處理模型,理解DStream的內部實現以及其如何構建和運行是很有必要的。

DStream的構建

我們在定義一個流的處理邏輯時,首先從一個數據的流入源開始,這個數據源使用InputDStream定義,它是DStream的一個子類,之後我們會在其上調用一些tranform類型算子,像map, reduce,filter等等,每調用一個算子,都會創建一個新的DStream,每一個新創建的DStream都保留著當前節點所依賴的上一個節點和當前節點的執行邏輯這兩個部分,這樣,多個DStream節點就構成了一個邏輯執行鏈。

比如如下代碼會生成圖1的執行鏈

stream.map(...).filter(...).foreachRDD(...)

Spark Streaming源碼分析:DStream的內置和運行 1

虛線箭頭描述的是依賴關係

最後當調用action類型的算子時,所有action類型的算子,底層都是通過ForEachDStream實現的。

我們來看ForEachDStream的源碼

private(streaming) class ForEachDStream(T: ClassTag) (
    parent: DStream(T),
    foreachFunc: (RDD(T), Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStreamUnit {

  override def dependencies: List(DStream(_)) = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option(RDD(Unit)) = None

  override def generateJob(time: Time): Option(Job) = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

這裡我們關注generateJob方法,這裡調用了它依賴的父DStream的getOrCompute來生成一個需要它來處理的RDD,然後對該RDD做該節點本身需要做的一些操作,即foreachFunc閉包,其實所有DStream的getOrCompute方法底層都會調用compute方法,該方法會返回RDD,即所有的DStream的compute方法中,要么其本身能夠從外部拉取數據,即InputDStream作為DStream鏈的第一個節點,要么其本身調用依賴的上游DStream的compute方法,再對生成的RDD做其本節點所定義的一些操作作為其返回值。

如此,當DStream鏈的最後一個節點被調用了compute方法時,它能夠依次遞歸的調用逐節點的compute方法,最後調用第一個InputDStream節點的compute方法生成一個能夠拉取外部數據的RDD。

其調用的時序圖如下

Spark Streaming源碼分析:DStream的內置和運行 2

以上只是為了直觀的理解DStream鍊是如何工作的,具體體現在分佈式環境上時,是由RDD來定義操作,切分成task後由Executor來執行。

另外需要說的是如果我們在單個流上定義一系列除window外的操作,其和我們直接調用InputDStream的foreachRDD後,在rdd上定義操作是等效的。

DStream的運行

除了上面介紹的DStream之外,在Spark-Streaming內部還有一些保存作業處理邏輯的模塊和用於根據時間生成和管理每個批次數據的模塊。下面是在SparkStreaming中一些比較核心的類,他們是構成一個流式作業,和使其運行起來的框架。

  1. InputDStream 管理流的數據源的通用抽像類
  2. JobGenerator 作業生成器
  3. JobScheduler 作業調度器,用於提交作業到集群運行
  4. DStreamGraph 管理創建的所有InputDStream的初始化和啟動,但不負責InputDStream間依賴關係的管理,InputDStream間依賴關係由其子類實現管理

首先看一下這四個類交互的時序圖

Spark Streaming源碼分析:DStream的內置和運行 3

圖中只畫了一些比較重要和核心的類和邏輯。 JobGenerator每隔我們設定的時間間隔會生成一個JobGeneratorEvent事件用於觸發生成一個作業。其內部是通過RecurringTimer類和EventLoop實現的。

代碼如下:初始化timer和eventLoop。

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoopJobGeneratorEvent {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

這裡processEvent方法用來做消息分發,根據消息的不同類型調用不同的函數進行處理。

  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

startFirstTime中調用了timer和DStreamGraph的start方法,將二者內部的事件循環線程啟動起來。

到這裡我們知道當timer根據時間生成GenerateJobs事件時,會觸發generateJobs函數的調用。

我們來看generateJobs的代碼

  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

其內部又調用了DStreamGraph的generateJobs方法用來生成多個Job,之後通過JobScheduler對這些Job進行提交。

DStreamGraph底層生成作業的過程是DStreamGraph實現的,它會遍歷所有註冊過的ForEachDStream,並分別調用他們的generateJob方法,返回一個Job對象,這就跟我們上面講過的ForEachDStream部分關聯上了。

Job裡麵包含了一個需要在其上執行計算的RDD,包括所有計算邏輯的閉包,而這個閉包真正執行,是在JobScheduler將這個Job對象提交到一個線程池之後,其會在線程池內執行這個Job對象內的閉包邏輯,將其轉換成分佈式計算的task分發到不同的節點上去執行。

JobScheduler.submitJobSet的如下代碼

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

jobExecutor就是那個線程池,其隊列長度可通過spark.streaming.concurrentJobs進行配置,默認等於1。該參數決定了能夠並發執行的Job數量。

JobHandler是Job的封裝,會在執行Job的邏輯前後分別發布JobStarted和JobCompleted事件。而Job對象真正執行的邏輯就是在ForEachDStream類中的創建Job時foreachFunc閉包。

Spark-Streaming在貝殼的使用

目前貝殼的實時計算平台上運行著143個實時任務,其中121個為Spark任務,62個為Spark-Streaming任務,59個為Structured Streaming任務。為了實現組內任務的快速開發,我們在Spark實時技術棧的基礎上抽像出了Chronus框架,將流計算抽像出Source,Sink和Pipeline,其中Source和Sink可通過配置直接定義,開發時能夠將精力集中在Pipeline中的業務邏輯上。

框架對監控,offset管理都做了封裝。同時,我們基於Chronus框架也開發除了很多任務模板,能夠讓用戶通過一些簡單的可視化配置直接進行流式任務的開發,其中最典型的要屬SQL模板:

Spark Streaming源碼分析:DStream的內置和運行 4

上述的那59個Structured Streaming就是基於該SQL模板開發的實時任務。

作者介紹

顧淵離,目前負責貝殼大數據部實時計算平台底層引擎和實時場景模板相關開發工作。

本文轉載自公眾號貝殼產品技術。

原文鏈接

https://mp.weixin.qq.com/s/rkzIo8sweI2UOOnmOj6SQQ