zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1962425010
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
}
}
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics.
+        // Check for cases where value < 0 and .value converts the metric to 0.
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
Review Comment:
There's a rare case where two different executors upload a snapshot for the same
state store instance, due to the provider getting unloaded or speculative
execution.
For the snapshot version metric, this means we should take whichever executor
uploaded the higher / more recent snapshot version. But future instance metrics
we may want to add could need different merge semantics, so I've added a combine
method to the interface to generalize this for other metrics.
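To illustrate the idea, here is a minimal sketch of what such a combine method could look like. The trait and class names below (`StateStoreInstanceMetric`, `SnapshotVersionMetric`) are hypothetical stand-ins, not the actual Spark API:

```scala
// Hypothetical sketch: each instance metric defines how to merge values
// reported by different executors for the same state store instance.
trait StateStoreInstanceMetric {
  def name: String

  // Merge two values reported for the same store instance.
  def combine(a: Long, b: Long): Long
}

// For the snapshot version metric, keep the highest (most recent) version.
// The initial value of -1 (no snapshot yet) is naturally dominated by any
// real version under max.
case class SnapshotVersionMetric(name: String) extends StateStoreInstanceMetric {
  override def combine(a: Long, b: Long): Long = math.max(a, b)
}
```

A metric with different semantics (e.g. a latency where the minimum or a sum is wanted) would simply override `combine` differently, so the aggregation code stays generic.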
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]