zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1953550630


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +402,48 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(storeMetrics: StateStoreMetrics): Unit = 
{
+    storeMetrics.customMetrics.foreach {
+      // Set the max for instance metrics
+      case (metric: StateStoreInstanceMetric, value) =>
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Metrics like last uploaded snapshot version can have an init value 
of -1,
+        // which need special handling to avoid setting the metric to 0 using 
`.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store 
instances,

Review Comment:
   I may be mistaken but because this is updated at the end of each batch, in 
most cases we should see it go straight to the first branch without taking the 
max. 
   
   But there is a hypothetical case where two snapshots with different versions 
are uploaded from two different executors under the same state store, the only 
way we know which one to keep is by taking the higher version number. 
   
   Though for all custom metrics, there is not enough information in the 
general case to figure out which value to keep. For the previous scenario we 
can take the max between the two, but I don't know if there's a nice way to 
tackle the general case for all custom metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to