HeartSaVioR commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1962839216


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,

Review Comment:
   Metrics from failed/cancelled tasks would be ignored by default, which is why 
I doubt this case can actually occur, but it's OK to be prepared.
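
   For reference, the "first value wins, then take the max" merge described in the quoted code comments can be sketched without any Spark dependency. This is only an illustration under assumptions: `InstanceMetricMerge`, `mergeMax`, and `mergeAll` are hypothetical names, and an `Option[Long]` stands in for the "metric not yet set" state that the PR checks via `isZero`. It is not the PR's implementation.

```scala
// Hypothetical, Spark-free sketch of merging per-instance metric reports.
// None models "no value reported yet"; this is an assumption, not the PR's code.
object InstanceMetricMerge {
  // The first report is stored verbatim (so an initial -1 survives);
  // later reports keep the larger of the stored and the reported value.
  def mergeMax(current: Option[Long], reported: Long): Option[Long] =
    current match {
      case None       => Some(reported)
      case Some(prev) => Some(math.max(prev, reported))
    }

  // Fold a sequence of reports (e.g. one per task) into a single value.
  def mergeAll(reports: Seq[Long]): Option[Long] =
    reports.foldLeft(Option.empty[Long])(mergeMax)
}
```

   With this shape, reports of `-1` from every task stay at `-1` rather than being clamped to 0, and a later positive report replaces it.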



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

