HeartSaVioR commented on code in PR #45109:
URL: https://github.com/apache/spark/pull/45109#discussion_r1495255965


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query 
plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
+    assert(lastExecution != null, "lastExecution is not available")
+    lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreWriter] =>
+        p.asInstanceOf[StateStoreWriter].getProgress()
     }
-    lastExecution.observedMetrics
   }
 
-  /** Records the duration of running `body` for the next query progress 
update. */
-  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query 
plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
+    assert(lastExecution != null, "lastExecution is not available")
+    lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreWriter] =>
+        p.asInstanceOf[StateStoreWriter].getProgress()
     }
-    lastExecution.observedMetrics
   }
 
-  /** Records the duration of running `body` for the next query progress 
update. */
-  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats(
+    hasNewData: Boolean,
+    sourceToNumInputRows: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution): ExecutionStats = {
+    val hasEventTime = progressReporter.logicalPlan().collect {
+      case e: EventTimeWatermark => e
+    }.nonEmpty
 
-    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
-    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
-    logDebug(s"$triggerDetailKey took $timeTaken ms")
-    result
+    val watermarkTimestamp =
+      if (hasEventTime) {
+        Map("watermark" -> 
progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+      } else Map.empty[String, String]
+
+    // SPARK-19378: Still report metrics even though no data was processed 
while reporting progress.
+    val stateOperators = extractStateOperatorMetrics(lastExecution)
+
+    val sinkOutput = sinkCommitProgress.map(_.numOutputRows)
+
+    if (!hasNewData) {
+      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp, 
sinkOutput)
+    }
+
+    val eventTimeStats = lastExecution.executedPlan
+      .collect {
+        case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          val stats = e.eventTimeStats.value
+          Map(
+            "max" -> stats.max,
+            "min" -> stats.min,
+            "avg" -> stats.avg.toLong).transform((_, v) => 
progressReporter.formatTimestamp(v))
+      }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(sourceToNumInputRows, stateOperators, eventTimeStats.toMap, 
sinkOutput)
   }
 
-  protected def formatTimestamp(millis: Long): String = {
-    timestampFormat.format(Instant.ofEpochMilli(millis))
+  /**
+   * Reset values in the execution stats to removes the values which are 
specific to the batch.
+   * New execution stats will only retain the values as a snapshot of the 
query status.
+   * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, 
whereas
+   * numRowsUpdated is bound to the batch.)
+   * TODO: We do not seem to clear up all values in StateOperatorProgress 
which are bound to the
+   * batch. Fix this.
+   */
+  private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): 
ExecutionStats = {
+    val newStatefulOperators = originExecStats.stateOperators.map { so =>
+      so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
+    }
+    val newEventTimeStats = if 
(originExecStats.eventTimeStats.contains("watermark")) {
+      Map("watermark" -> 
progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+    } else {
+      Map.empty[String, String]
+    }
+    val newOutputRows = originExecStats.outputRows.map(_ => 0L)
+    originExecStats.copy(
+      inputRows = Map.empty[SparkDataStream, Long],
+      stateOperators = newStatefulOperators,
+      eventTimeStats = newEventTimeStats,
+      outputRows = newOutputRows)
   }
 
-  /** Updates the message returned in `status`. */
-  protected def updateStatusMessage(message: String): Unit = {
-    currentStatus = currentStatus.copy(message = message)
+  /** Extracts observed metrics from the most recent query execution. */
+  private def extractObservedMetrics(
+    lastExecution: QueryExecution): Map[String, Row] = {

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before 
updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct 
range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before 
updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct 
range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
+    currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - 
currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = 
execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, 
lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) 
{
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more 
than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, 
newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, 
processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for 
this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. 
Streaming queries that
-   *                    perform stateful aggregations with timeouts can still 
run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit 
= {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != 
null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,
+    sourceToNumInputRowsMap: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution,
+    lastEpochId: Long): Unit = {
+    assert(
+      currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
+        currentTriggerLatestOffsets != null
+    )
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
-
-    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - 
currentTriggerStartTimestamp
+    assert(lastExecution != null, "executed batch should provide the 
information for execution.")
+    val execStats = extractExecutionStats(hasNewData, sourceToNumInputRowsMap, 
lastExecution)
+    logDebug(s"Execution stats: $execStats")
+
+    val observedMetrics = extractObservedMetrics(lastExecution)
+    val newProgress = constructNewProgress(processingTimeMills, lastEpochId, 
Some(execStats),
+      observedMetrics)
+
+    progressReporter.lastNoExecutionProgressEventTime = 
triggerClock.getTimeMillis()
+    progressReporter.updateProgress(newProgress)
+
+    // Update the value since this trigger executes a batch successfully.
+    this.execStatsOnLatestExecutedBatch = Some(execStats)
+
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, 
processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  private def constructNewProgress(
+    processingTimeMills: Long,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query 
plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -220,94 +350,47 @@ trait ProgressReporter extends Logging {
         metrics = sourceMetrics
       )
     }
+  }
 
-    val sinkOutput = if (hasExecuted) {
-      sinkCommitProgress.map(_.numOutputRows)
-    } else {
-      sinkCommitProgress.map(_ => 0L)
-    }
-
+  private def extractSinkProgress(execStats: Option[ExecutionStats]): 
SinkProgress = {
+    val sinkOutput = execStats.flatMap(_.outputRows)
     val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics =>
-        withMetrics.metrics()
+      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
       case _ => Map[String, String]().asJava
     }
 
-    val sinkProgress = SinkProgress(
-      sink.toString, sinkOutput, sinkMetrics)
-
-    val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
-    val newProgress = new StreamingQueryProgress(
-      id = id,
-      runId = runId,
-      name = name,
-      timestamp = formatTimestamp(currentTriggerStartTimestamp),
-      batchId = currentBatchId,
-      batchDuration = processingTimeMills,
-      durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => 
long2Long(v)).asJava),
-      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
-      stateOperators = executionStats.stateOperators.toArray,
-      sources = sourceProgress.toArray,
-      sink = sinkProgress,
-      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
-    if (hasExecuted) {
-      updateProgress(newProgress)
-    } else {
-      updateIdleness(newProgress)
-    }
-
-    currentStatus = currentStatus.copy(isTriggerActive = false)
+    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
   }
 
-  /** Extract statistics about stateful operators from the executed query 
plan. */
-  private def extractStateOperatorMetrics(hasExecuted: Boolean): 
Seq[StateOperatorProgress] = {
-    if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if 
`!hasExecuted`.
-    // Walking the plan again should be inexpensive.
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreWriter] =>
-        val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) {
-          progress
-        } else {
-          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 
0)
-        }
-    }
+  /**
+   * Override of finishTrigger to extract the map from IncrementalExecution.
+   */
+  def finishTrigger(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -220,94 +350,47 @@ trait ProgressReporter extends Logging {
         metrics = sourceMetrics
       )
     }
+  }
 
-    val sinkOutput = if (hasExecuted) {
-      sinkCommitProgress.map(_.numOutputRows)
-    } else {
-      sinkCommitProgress.map(_ => 0L)
-    }
-
+  private def extractSinkProgress(execStats: Option[ExecutionStats]): 
SinkProgress = {
+    val sinkOutput = execStats.flatMap(_.outputRows)
     val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics =>
-        withMetrics.metrics()
+      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
       case _ => Map[String, String]().asJava
     }
 
-    val sinkProgress = SinkProgress(
-      sink.toString, sinkOutput, sinkMetrics)
-
-    val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
-    val newProgress = new StreamingQueryProgress(
-      id = id,
-      runId = runId,
-      name = name,
-      timestamp = formatTimestamp(currentTriggerStartTimestamp),
-      batchId = currentBatchId,
-      batchDuration = processingTimeMills,
-      durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => 
long2Long(v)).asJava),
-      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
-      stateOperators = executionStats.stateOperators.toArray,
-      sources = sourceProgress.toArray,
-      sink = sinkProgress,
-      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
-    if (hasExecuted) {
-      updateProgress(newProgress)
-    } else {
-      updateIdleness(newProgress)
-    }
-
-    currentStatus = currentStatus.copy(isTriggerActive = false)
+    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
   }
 
-  /** Extract statistics about stateful operators from the executed query 
plan. */
-  private def extractStateOperatorMetrics(hasExecuted: Boolean): 
Seq[StateOperatorProgress] = {
-    if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if 
`!hasExecuted`.
-    // Walking the plan again should be inexpensive.
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreWriter] =>
-        val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) {
-          progress
-        } else {
-          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 
0)
-        }
-    }
+  /**
+   * Override of finishTrigger to extract the map from IncrementalExecution.
+   */
+  def finishTrigger(
+    hasNewData: Boolean,
+    lastExecution: IncrementalExecution,
+    lastEpoch: Long): Unit = {
+    val map: Map[SparkDataStream, Long] =
+      if (hasNewData) extractSourceToNumInputRows(lastExecution) else Map.empty
+    finishTrigger(hasNewData, map, lastExecution, lastEpoch)
   }
 
-  /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats(hasNewData: Boolean, hasExecuted: 
Boolean): ExecutionStats = {
-    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e 
}.nonEmpty
-    val watermarkTimestamp =
-      if (hasEventTime) Map("watermark" -> 
formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
-      else Map.empty[String, String]
-
-    // SPARK-19378: Still report metrics even though no data was processed 
while reporting progress.
-    val stateOperators = extractStateOperatorMetrics(hasExecuted)
-
-    if (!hasNewData) {
-      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
+  private def warnIfFinishTriggerTakesTooLong(
+    triggerEndTimestamp: Long,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before 
updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct 
range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
+    currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - 
currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = 
execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, 
lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) 
{
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more 
than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, 
newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, 
processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for 
this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. 
Streaming queries that
-   *                    perform stateful aggregations with timeouts can still 
run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit 
= {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != 
null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -40,89 +41,172 @@ import org.apache.spark.util.Clock
 
 /**
  * Responsible for continually reporting statistics about the amount of data 
processed as well
- * as latency for a streaming query.  This trait is designed to be mixed into 
the
- * [[StreamExecution]], who is responsible for calling `startTrigger` and 
`finishTrigger`
- * at the appropriate times. Additionally, the status can updated with 
`updateStatusMessage` to
- * allow reporting on the streams current state (i.e. "Fetching more data").
+ * as latency for a streaming query.  This class is designed to hold 
information about
+ * a streaming query and contains methods that can be used on a streaming 
query,
+ * such as get the most recent progress of the query.
  */
-trait ProgressReporter extends Logging {
+class ProgressReporter(
+    private val sparkSession: SparkSession,
+    private val triggerClock: Clock,
+    val logicalPlan: () => LogicalPlan)
+  extends Logging {
 
-  case class ExecutionStats(
-    inputRows: Map[SparkDataStream, Long],
-    stateOperators: Seq[StateOperatorProgress],
-    eventTimeStats: Map[String, String])
-
-  // Internal state of the stream, required for computing metrics.
-  protected def id: UUID
-  protected def runId: UUID
-  protected def name: String
-  protected def triggerClock: Clock
-  protected def logicalPlan: LogicalPlan
-  protected def lastExecution: QueryExecution
-  protected def newData: Map[SparkDataStream, LogicalPlan]
-  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
-  protected def sources: Seq[SparkDataStream]
-  protected def sink: Table
+  // The timestamp we report an event that has not executed anything
+  var lastNoExecutionProgressEventTime = Long.MinValue
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the 
queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  val noDataProgressEventInterval: Long =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgress: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update or null if there were no 
progress updates. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+
+  def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    // Reset noDataEventTimestamp if we processed any data
+    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+    addNewProgress(newProgress)
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
+  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+  }
+
+  def updateIdleness(
+    id: UUID,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before 
updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct 
range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
+    currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - 
currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = 
execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, 
lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) 
{
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more 
than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, 
newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, 
processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for 
this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. 
Streaming queries that
-   *                    perform stateful aggregations with timeouts can still 
run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit 
= {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != 
null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,
+    sourceToNumInputRowsMap: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution,
+    lastEpochId: Long): Unit = {
+    assert(
+      currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
+        currentTriggerLatestOffsets != null
+    )
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
-
-    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - 
currentTriggerStartTimestamp
+    assert(lastExecution != null, "executed batch should provide the 
information for execution.")
+    val execStats = extractExecutionStats(hasNewData, sourceToNumInputRowsMap, 
lastExecution)
+    logDebug(s"Execution stats: $execStats")
+
+    val observedMetrics = extractObservedMetrics(lastExecution)
+    val newProgress = constructNewProgress(processingTimeMills, lastEpochId, 
Some(execStats),
+      observedMetrics)
+
+    progressReporter.lastNoExecutionProgressEventTime = 
triggerClock.getTimeMillis()
+    progressReporter.updateProgress(newProgress)
+
+    // Update the value since this trigger executes a batch successfully.
+    this.execStatsOnLatestExecutedBatch = Some(execStats)
+
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, 
processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  private def constructNewProgress(
+    processingTimeMills: Long,
+    batchId: Long,
+    execStats: Option[ExecutionStats],
+    observedMetrics: Map[String, Row]): StreamingQueryProgress = {
     val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / 
MILLIS_PER_SECOND
 
     val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
       (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 
MILLIS_PER_SECOND
     } else {
       Double.PositiveInfinity
     }
-    logDebug(s"Execution stats: $executionStats")
+    val sourceProgress = extractSourceProgress(execStats, inputTimeSec, 
processingTimeSec)
+    val sinkProgress = extractSinkProgress(execStats)
+
+    val eventTime = execStats.map { stats =>
+      stats.eventTimeStats.asJava
+    }.getOrElse(new java.util.HashMap)
 
-    val sourceProgress = sources.distinct.map { source =>
-      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+    val stateOperators = execStats.map { stats =>
+      stats.stateOperators.toArray
+    }.getOrElse(Array[StateOperatorProgress]())
+
+    new StreamingQueryProgress(
+      id = id,
+      runId = runId,
+      name = name,
+      timestamp = 
progressReporter.formatTimestamp(currentTriggerStartTimestamp),
+      batchId = batchId,
+      batchDuration = processingTimeMills,
+      durationMs =
+        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => 
long2Long(v)).asJava),
+      eventTime = new java.util.HashMap(eventTime),
+      stateOperators = stateOperators,
+      sources = sourceProgress.toArray,
+      sink = sinkProgress,
+      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
+  }
+
+  private def extractSourceProgress(
+    execStats: Option[ExecutionStats],

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -40,89 +41,172 @@ import org.apache.spark.util.Clock
 
 /**
  * Responsible for continually reporting statistics about the amount of data 
processed as well
- * as latency for a streaming query.  This trait is designed to be mixed into 
the
- * [[StreamExecution]], who is responsible for calling `startTrigger` and 
`finishTrigger`
- * at the appropriate times. Additionally, the status can updated with 
`updateStatusMessage` to
- * allow reporting on the streams current state (i.e. "Fetching more data").
+ * as latency for a streaming query.  This class is designed to hold 
information about
+ * a streaming query and contains methods that can be used on a streaming 
query,
+ * such as get the most recent progress of the query.
  */
-trait ProgressReporter extends Logging {
+class ProgressReporter(
+    private val sparkSession: SparkSession,
+    private val triggerClock: Clock,
+    val logicalPlan: () => LogicalPlan)
+  extends Logging {
 
-  case class ExecutionStats(
-    inputRows: Map[SparkDataStream, Long],
-    stateOperators: Seq[StateOperatorProgress],
-    eventTimeStats: Map[String, String])
-
-  // Internal state of the stream, required for computing metrics.
-  protected def id: UUID
-  protected def runId: UUID
-  protected def name: String
-  protected def triggerClock: Clock
-  protected def logicalPlan: LogicalPlan
-  protected def lastExecution: QueryExecution
-  protected def newData: Map[SparkDataStream, LogicalPlan]
-  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
-  protected def sources: Seq[SparkDataStream]
-  protected def sink: Table
+  // The timestamp we report an event that has not executed anything
+  var lastNoExecutionProgressEventTime = Long.MinValue
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the 
queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  val noDataProgressEventInterval: Long =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgress: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update or null if there were no 
progress updates. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+
+  def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    // Reset noDataEventTimestamp if we processed any data
+    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+    addNewProgress(newProgress)
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
+  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+  }
+
+  def updateIdleness(
+    id: UUID,
+    runId: UUID,
+    currentTriggerStartTimestamp: Long,
+    newProgress: StreamingQueryProgress): Unit = {
+    val now = triggerClock.getTimeMillis()
+    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) 
{
+      addNewProgress(newProgress)
+      if (lastNoExecutionProgressEventTime > Long.MinValue) {
+        postEvent(new QueryIdleEvent(id, runId, 
formatTimestamp(currentTriggerStartTimestamp)))
+        logInfo(s"Streaming query has been idle and waiting for new data more 
than " +
+          s"${noDataProgressEventInterval} ms.")
+      }
+
+      lastNoExecutionProgressEventTime = now
+    }
+  }
+
+  private def postEvent(event: StreamingQueryListener.Event): Unit = {
+    sparkSession.streams.postListenerEvent(event)
+  }
+
+  def formatTimestamp(millis: Long): String = {
+    Instant.ofEpochMilli(millis)
+      .atZone(ZoneId.of("Z")).format(timestampFormat)
+  }
+}
+
+/**
+ * This class holds variables and methods that are used track metrics and 
progress
+ * during the execution lifecycle of a batch that is being processed by the 
streaming query
+ */
+abstract class ProgressContext(
+  id: UUID,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -398,24 +447,28 @@ class MicroBatchExecution(
    *    Identify a brand new batch
    *  DONE
    */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): 
Unit = {
-    sinkCommitProgress = None
+  protected def populateStartOffsets(
+    execCtx: MicroBatchExecutionContext,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -463,34 +516,38 @@ class MicroBatchExecution(
             }
           case None => logInfo("no commit log present")
         }
-        logInfo(s"Resuming at batch $currentBatchId with committed offsets " +
-          s"$committedOffsets and available offsets $availableOffsets")
+        // initialize committed offsets to start offsets of the most recent 
committed batch
+        committedOffsets = execCtx.startOffsets
+        logInfo(s"Resuming at batch ${execCtx.batchId} with committed offsets 
" +
+          s"${execCtx.startOffsets} and available offsets 
${execCtx.endOffsets}")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
-        currentBatchId = 0
+        execCtx.batchId = 0
         watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
     }
   }
 
   /**
    * Returns true if there is any new data available to be processed.
    */
-  private def isNewDataAvailable: Boolean = {
-    availableOffsets.exists {
+  private def isNewDataAvailable(execCtx: MicroBatchExecutionContext): Boolean 
= {
+    execCtx.endOffsets.exists {
       case (source, available) =>
-        committedOffsets
+        execCtx.startOffsets
           .get(source)
           .map(committed => committed != available)
           .getOrElse(true)
     }
   }
 
   /**
-   * Get the startOffset from availableOffsets. This is to be used in
+   * Get the startOffset from endOffsets. This is to be used in
    * latestOffset(startOffset, readLimit)
    */
-  private def getStartOffset(dataStream: SparkDataStream): OffsetV2 = {
-    val startOffsetOpt = availableOffsets.get(dataStream)
+  private def getStartOffset(
+    execCtx: MicroBatchExecutionContext,

Review Comment:
   nit: 2 more spaces



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to