Spark 2.0 Structred Streaming之Streaming Query分析

12/28 编程语言 阅读 526 views 次 人气 0
摘要:

Structred Streaming之Streaming Query分析

  1. 在用户的应用程序中,用户会调用DataStreamWriter.start()方法发起一个Streaming query。
  2. 在DataStreamWriter中,会调用df.sparkSession.sessionState.streamingQueryManager.startQuery方法开始查询。
  3. StreamingQueryManager调用createQuery私有方法创建Query。

    private def createQuery(
          userSpecifiedName: Option[String],
          userSpecifiedCheckpointLocation: Option[String],
          df: DataFrame,
          sink: Sink,
          outputMode: OutputMode,
          useTempCheckpointLocation: Boolean,
          recoverFromCheckpointLocation: Boolean,
          trigger: Trigger,
          triggerClock: Clock): StreamingQueryWrapper

    StreamingQueryWrapper是StreamExecution的一个Wrapper:

    class StreamExecution( override val sparkSession: SparkSession, override val name: String, checkpointRoot: String, analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, val outputMode: OutputMode)
      extends StreamingQuery with ProgressReporter with Logging
  4. StreamingQueryManager调用query.streamingQuery.start()方法,在StreamExecution中启动microBatchThread线程,在该线程中执行runBatches()核心方法。
    • 在triggerExecutor.execute()方法中触发事件处理:
      • 从故障中恢复:populateStartOffsets(),从Sink中恢复,避免重复处理
      • 正常情况:constructNextBatch() ,轮训所有数据源,看是否有新数据需要处理

评论

该文章不支持评论!

分享到: