zecookiez commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1962433179
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
}
}
+ protected def setStoreCustomMetrics(customMetrics:
Map[StateStoreCustomMetric, Long]): Unit = {
+ customMetrics.foreach {
+ case (metric, value) =>
+ longMetric(metric.name) += value
+ }
+ }
+
+ protected def setStoreInstanceMetrics(
+ instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+ instanceMetrics.foreach {
+ case (metric, value) =>
+ // Set the max for instance metrics
+ // Check for cases where value < 0 and .value converts metric to 0
+ // Some metrics like last uploaded snapshot version can have an initial value of -1,
+ // which need special handling to avoid setting the metric to 0 using `.value`.
+ longMetric(metric.name).set(
+ if (longMetric(metric.name).isZero) {
+ value
+ } else {
+ // Use max to grab the most updated value across all state store instances,
+ // which for snapshot versions is the largest version number.
+ Math.max(value, longMetric(metric.name).value)
+ }
+ )
+ }
+ }
+
private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
provider.supportedCustomMetrics.map {
metric => (metric.name, metric.createSQLMetric(sparkContext))
}.toMap
}
+ private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+ stateStoreInstanceMetricObjects.map {
+ case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+ }
+ }
+
+ private def stateStoreInstanceMetricObjects: Map[String,
StateStoreInstanceMetric] = {
Review Comment:
It does not, good catch. There should be just one call to this method now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]