zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1953550630
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +402,48 @@ trait StateStoreWriter
}
}
+  protected def setStoreCustomMetrics(storeMetrics: StateStoreMetrics): Unit = {
+    storeMetrics.customMetrics.foreach {
+      // Set the max for instance metrics
+      case (metric: StateStoreInstanceMetric, value) =>
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Metrics like last uploaded snapshot version can have an init value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
Review Comment:
I may be mistaken, but since this is updated at the end of each batch, in most
cases we should see it go straight to the first branch without taking the max.
There is, however, a hypothetical case where two snapshots with different
versions are uploaded from two different executors for the same state store;
the only way to know which one to keep is to take the higher version number.
That said, for custom metrics in general there is not enough information to
decide which value to keep. In this particular scenario we can take the max of
the two, but I don't know of a clean way to handle the general case for all
custom metrics.
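To make the merge behavior concrete, here is a minimal standalone sketch of the "keep the highest snapshot version" idea discussed above. It is not the PR's actual `SQLMetric` code: the object name `SnapshotMetricMerge`, the `-1` sentinel constant, and the `merge` helper are assumptions introduced for illustration (the real code instead checks `longMetric(...).isZero`), but the combine rule is the same: take the incoming value when the metric is still unset, otherwise take the max.

```scala
object SnapshotMetricMerge {
  // Sentinel meaning "no snapshot uploaded yet" (assumption: mirrors the
  // -1 init value mentioned in the review discussion).
  val Uninitialized: Long = -1L

  // Combine the currently recorded value with a newly reported instance
  // value: if nothing has been recorded yet, take the new value directly;
  // otherwise keep the highest snapshot version seen across instances.
  def merge(current: Long, incoming: Long): Long =
    if (current == Uninitialized) incoming
    else math.max(current, incoming)

  def main(args: Array[String]): Unit = {
    // Two executors report different snapshot versions for the same store;
    // folding with `merge` keeps the higher one.
    val merged = Seq(12L, 15L).foldLeft(Uninitialized)(merge)
    println(merged)
  }
}
```

As the comment notes, this max-based rule is specific to monotonically increasing values like snapshot versions; it is not a general combiner for arbitrary custom metrics.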
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]