zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1962540158


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics.
+        // Check for cases where value < 0 and `.value` converts the metric to 0.
+        // Some metrics, like the last uploaded snapshot version, can have an initial
+        // value of -1, which needs special handling to avoid setting the metric to 0
+        // using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
+            // which for snapshot versions is the largest version number.
+            Math.max(value, longMetric(metric.name).value)
+          }
+        )
+    }
+  }
+
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
       metric => (metric.name, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    stateStoreInstanceMetricObjects.map {
+      case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+    }
+  }
+
+  private def stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] = {
+    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+    val maxPartitions = conf.defaultNumShufflePartitions

Review Comment:
   It looks like `stateInfo` may not be defined at the time this method is called, so I've changed this to use `conf.defaultNumShufflePartitions` as a fallback when the former is not defined.
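   For reference, the fallback described above could look roughly like the sketch below, assuming `stateInfo` is the operator's `Option[StatefulOperatorStateInfo]` exposing `numPartitions`; the actual change in the PR may differ:
   ```scala
   // Hypothetical sketch of the fallback: prefer the partition count recorded
   // in the operator's state info when it is available, otherwise fall back to
   // the session's default number of shuffle partitions.
   val maxPartitions = stateInfo
     .map(_.numPartitions)
     .getOrElse(conf.defaultNumShufflePartitions)
   ```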



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


