Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21469#discussion_r194585044
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-    storeMetrics.customMetrics.foreach { case (metric, value) =>
-      longMetric(metric.name) += value
+    storeMetrics.customMetrics.foreach {
+      case (metric: StateStoreCustomAverageMetric, value) =>
+        longMetric(metric.name).set(value * 1.0d)
--- End diff --
If my understanding is right, the metric object (the return value of
`longMetric()`) is different for each task, so there is a distinct object per
batch and per task. (TaskMetrics is serialized and deserialized, so it can't
be shared between tasks.)
Moreover, the metric values are never aggregated into a single SQLMetric
object: they are only aggregated and rendered in SQLAppStatusListener.
https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L162-L174
https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala#L147-L160
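To make that aggregation path concrete, here is a minimal, self-contained sketch of the behavior described above. All names in it (`MetricAggregationSketch`, `TaskMetricValue`, `aggregate`, the `isAverage` predicate) are hypothetical stand-ins, not Spark's actual classes: each task reports its own value for a metric id, and the driver-side listener combines them, averaging for average-style metrics instead of summing.

```scala
// Hypothetical sketch of driver-side metric aggregation; this is NOT
// Spark's real SQLAppStatusListener, just an illustration of the idea
// that per-task metric values are combined on the driver.
object MetricAggregationSketch {

  // Stand-in for one task's final value of one metric.
  final case class TaskMetricValue(taskId: Long, metricId: Long, value: Long)

  // Group the per-task values by metric id and reduce each group.
  // For an "average"-style metric we take the mean of the per-task
  // values; for everything else we sum them.
  def aggregate(
      updates: Seq[TaskMetricValue],
      isAverage: Long => Boolean): Map[Long, Double] = {
    updates.groupBy(_.metricId).map { case (id, vs) =>
      val sum = vs.map(_.value).sum.toDouble
      id -> (if (isAverage(id)) sum / vs.size else sum)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two tasks report metric id 1 (an average metric) with different
    // values; the aggregated result is their mean (20.0), not the sum.
    val updates = Seq(
      TaskMetricValue(taskId = 0L, metricId = 1L, value = 10L),
      TaskMetricValue(taskId = 1L, metricId = 1L, value = 30L))
    println(aggregate(updates, (id: Long) => id == 1L)) // Map(1 -> 20.0)
  }
}
```

This is why calling `set()` on the executor side only records a per-task value: the SQLMetric instance there is a deserialized copy, and any cross-task combination has to happen in the listener on the driver.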
Screenshot ("screen shot 2018-06-12 at 9 17 14 am"):
https://user-images.githubusercontent.com/1317309/41263432-024efe4a-6e22-11e8-92f9-24d1f73776a9.png
---