Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21469#discussion_r194592510
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator {
self: SparkPlan =>
val storeMetrics = store.metrics
longMetric("numTotalStateRows") += storeMetrics.numKeys
longMetric("stateMemory") += storeMetrics.memoryUsedBytes
- storeMetrics.customMetrics.foreach { case (metric, value) =>
- longMetric(metric.name) += value
+ storeMetrics.customMetrics.foreach {
+ case (metric: StateStoreCustomAverageMetric, value) =>
+ longMetric(metric.name).set(value * 1.0d)
--- End diff --
I am not sure if SQLAppStatusListener comes into play for reporting query
progress (e.g. StreamingQueryWrapper.lastProgress):
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L193
Based on my understanding, a SQLMetric is an Accumulator, so the merged
value of the accumulators across all the tasks is what gets returned. The
merge operation in SQLMetric just adds the values, so it only makes sense
for count or size metrics. For now we would be able to display the (min,
med, max) values in the UI, but not in the "query status". I was thinking
that if we make it a count metric, it may work (similar to the number of
state rows). I am fine with either way.
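The merge concern above can be sketched as a toy model. This is not Spark's actual SQLMetric class (which lives in org.apache.spark.sql.execution.metric and extends AccumulatorV2); it is a minimal standalone illustration, assuming only that driver-side merging adds task values, which is correct for counts and sizes but distorts averages:

```scala
// Toy stand-in for SQLMetric's merge semantics: merge simply adds values.
// (Hypothetical class for illustration; not the real Spark API.)
final case class ToyMetric(var value: Long) {
  def merge(other: ToyMetric): Unit = value += other.value
}

// A count metric merges correctly: 3 rows + 4 rows = 7 rows.
val count1 = ToyMetric(3)
count1.merge(ToyMetric(4))
assert(count1.value == 7)

// An "average" metric does not: two tasks each reporting an average
// of 10 merge to 20, not the true average 10.
val avg1 = ToyMetric(10)
avg1.merge(ToyMetric(10))
assert(avg1.value == 20)

println(s"count=${count1.value}, mergedAvg=${avg1.value}")
```

This is why reporting the value as a count (like the number of state rows) composes cleanly under the existing merge, while an average would need min/med/max-style handling in the UI.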
---