zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1962425010


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,

Review Comment:
   There's a rare case where two different executors upload a snapshot for the 
same state store instance, either because the provider was unloaded or because 
of speculative execution.
   
   For the snapshot version metric, this means we should take the value from 
whichever executor uploaded the higher (more recent) snapshot version. Future 
instance metrics may need a different rule, so I've added a combine method to 
the interface to generalize this for other metrics.
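
   As a rough illustration of the idea (not the code in this PR), a per-metric combine hook could look like the sketch below. The names `InstanceMetricSketch`, `SnapshotVersionMetricSketch`, `initValue`, and `CombineSketch.merge` are hypothetical and chosen only for this example; the real interface may differ.

```scala
// Hypothetical sketch: names and signatures are assumptions, not the Spark API.
trait InstanceMetricSketch {
  def name: String

  // Value a metric holds before any real report, e.g. -1 for "no snapshot uploaded yet".
  def initValue: Long

  // How to merge two values reported for the same state store instance.
  def combine(current: Long, reported: Long): Long
}

// For a snapshot version, the higher value is the more recent upload, so keep the max.
final case class SnapshotVersionMetricSketch(name: String) extends InstanceMetricSketch {
  val initValue: Long = -1L
  def combine(current: Long, reported: Long): Long = math.max(current, reported)
}

object CombineSketch {
  // Fold all values reported by different executors into a single metric value.
  def merge(metric: InstanceMetricSketch, reports: Seq[Long]): Long =
    reports.foldLeft(metric.initValue)((acc, v) => metric.combine(acc, v))
}
```

   With this shape, a future metric that needs a different rule (for example, keeping the minimum) would only override `combine`, and the operator-side merging code would stay unchanged.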



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

