Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21469#discussion_r194585044
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
         val storeMetrics = store.metrics
         longMetric("numTotalStateRows") += storeMetrics.numKeys
         longMetric("stateMemory") += storeMetrics.memoryUsedBytes
    -    storeMetrics.customMetrics.foreach { case (metric, value) =>
    -      longMetric(metric.name) += value
    +    storeMetrics.customMetrics.foreach {
    +      case (metric: StateStoreCustomAverageMetric, value) =>
    +        longMetric(metric.name).set(value * 1.0d)
    --- End diff --
    
    If my understanding is right, the metric object (the return value of `longMetric()`) is different for each task, so the object will differ across batches and across tasks. (The task metrics are serialized and deserialized, so they can't be shared between tasks.)
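    
    To illustrate the per-task copy point, here is a minimal sketch against the public accumulator API. (`SQLMetric` extends `AccumulatorV2` under the hood; the `LongAccumulator` here is just an illustrative stand-in, not the code under review.)
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Each task deserializes its own copy of the accumulator; updates are
    // merged back on the driver only after the task finishes, so mutating
    // the accumulator inside a task only touches that task-local copy.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("metric-demo")
      .getOrCreate()
    
    val acc = spark.sparkContext.longAccumulator("demo")
    
    spark.sparkContext.parallelize(1 to 4, numSlices = 2).foreach { _ =>
      acc.add(1L) // task-local update
    }
    
    println(acc.value) // 4 on the driver, after the per-task copies are merged
    ```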
    
    And the metric values are actually not aggregated into an SQLMetric object; the per-task values are aggregated and rendered for display in SQLAppStatusListener:
    
    
https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L162-L174
    
    
https://github.com/apache/spark/blob/f5af86ea753c446df59a0a8c16c685224690d633/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala#L147-L160
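    
    A hedged sketch of the aggregation idea in the linked code (the function name and the `"average"` type tag are illustrative, not Spark's actual internals): the listener keeps the raw per-task longs and only combines them at display time, so an average-style metric is averaged over task updates while plain counters are summed.
    
    ```scala
    // Illustrative only: mirrors the SQLAppStatusListener / SQLMetrics.stringValue
    // flow, where per-task values are combined for display based on metric type.
    def aggregateForDisplay(metricType: String, taskValues: Seq[Long]): String =
      metricType match {
        case "average" =>
          // Average-style metrics divide by the number of task updates.
          if (taskValues.isEmpty) "0"
          else (taskValues.sum.toDouble / taskValues.size).toString
        case _ =>
          // Plain counters are summed across all tasks.
          taskValues.sum.toString
      }
    
    aggregateForDisplay("average", Seq(10L, 20L, 30L)) // "20.0"
    aggregateForDisplay("sum", Seq(10L, 20L, 30L))     // "60"
    ```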
    
    <img width="489" alt="screen shot 2018-06-12 at 9 17 14 am" src="https://user-images.githubusercontent.com/1317309/41263432-024efe4a-6e22-11e8-92f9-24d1f73776a9.png">


