Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/16716#discussion_r98127618
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
---
@@ -180,15 +180,59 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}
+ /**
+ * Extract statistics about stateful operators from the executed query plan.
+ * SPARK-19378: Still report stateOperator metrics even though no data was processed while
+ * reporting progress.
+ */
+ private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+ if (lastExecution == null) return Nil
+ // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+ // Walking the plan again should be inexpensive.
+ val stateNodes = lastExecution.executedPlan.collect {
+ case p if p.isInstanceOf[StateStoreSaveExec] => p
+ }
+ stateNodes.map { node =>
+ val numRowsUpdated = if (hasNewData) {
+ node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
+ } else {
+ 0L
+ }
+ new StateOperatorProgress(
+ numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+ numRowsUpdated = numRowsUpdated)
+ }
+ }
+
+ /**
+ * Extract statistics about event time from the executed query plan.
+ * SPARK-19378: Still report eventTime metrics even though no data was processed while
+ * reporting progress.
+ */
+ private def extractEventTimeStats(watermarkTs: Map[String, String]): Map[String, String] = {
--- End diff --
It does not make sense for this method to take watermarkTs as a param.
It's not extracting event time stats from watermarkTs, it's just appending it
to the result. Then why not just return an empty map when there are no stats,
and do the appending outside? Or do the extraction of the watermark inside the
function as well.
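
A minimal sketch of the first option, to make the suggestion concrete. This is not the actual Spark code: the `EventTimeStats` case class, the `eventTimeProgress` helper, and the plain-millisecond string formatting are hypothetical stand-ins, assumed only for illustration.

```scala
// Hypothetical, simplified stand-in for the logic in ProgressReporter.
object EventTimeStatsSketch {
  // Assumed shape of the raw event-time stats collected for a trigger.
  final case class EventTimeStats(max: Long, min: Long, avg: Long)

  // The extractor only knows about event-time stats. It returns an empty
  // map when no stats are available and never sees the watermark.
  def extractEventTimeStats(stats: Option[EventTimeStats]): Map[String, String] =
    stats match {
      case Some(s) =>
        Map("max" -> s.max.toString, "min" -> s.min.toString, "avg" -> s.avg.toString)
      case None =>
        Map.empty
    }

  // The caller appends the watermark entry, so it is still reported
  // even when no data was processed in the trigger.
  def eventTimeProgress(
      stats: Option[EventTimeStats],
      watermarkMs: Option[Long]): Map[String, String] =
    extractEventTimeStats(stats) ++ watermarkMs.map(ms => "watermark" -> ms.toString).toList
}
```

With this split, the extractor's signature says exactly what it does, and the watermark handling stays in one place at the call site.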