HeartSaVioR commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1961616963


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -281,10 +285,20 @@ object StateStoreMetrics {
       customMetric -> sumOfMetrics
     }.toMap
 
+    val distinctInstanceMetrics = allMetrics.flatMap(_.instanceMetrics.keys).distinct
+    val instanceMetrics = allMetrics.flatMap(_.instanceMetrics)
+    val combinedInstanceMetrics = distinctInstanceMetrics.map { instanceMetric =>
+      val sameMetrics = instanceMetrics.filter(_._1 == instanceMetric)
+      // Use the instance metric's custom ordering to select the combined metric value
+      val selectedMetrics = sameMetrics.map(_._2).min(instanceMetric.ordering)
+      instanceMetric -> selectedMetrics

Review Comment:
   Just to make this very clear: will the name be different for every state store instance
even though the metric here is the same? I don't see us doing top N, so I assume you're
just doing some accumulation in case there are "somehow" multiple values for the same
metric, which doesn't happen in normal operation.
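
   To spell out my reading with a rough, standalone sketch (the key names below are
illustrative only, not the actual metric keys):

       // For each distinct instance metric key, pick a single value using that
       // metric's ordering. With one value per key (the normal case), min(...) is
       // effectively a pass-through.
       val reported: Seq[(String, Long)] = Seq(
         ("SomeProvider.someMetric.partition_0_default", 3L),
         ("SomeProvider.someMetric.partition_1_default", 5L))
       val combined: Map[String, Long] =
         reported.groupBy(_._1).map { case (key, pairs) => key -> pairs.map(_._2).min }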



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -250,11 +250,15 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
  * @param customMetrics   Custom implementation-specific metrics
  *                        The metrics reported through this must have the same `name` as those
  *                        reported by `StateStoreProvider.customMetrics`.
+ * @param instanceMetrics Custom implementation-specific metrics that are specific to state stores
+ *                        The metrics reported through this must have the same `name` as those
+ *                        reported by `StateStoreProvider.customMetrics`.

Review Comment:
   nit: `supportedInstanceMetrics`? I guess you've just copied this from the paragraph above.
That said, the one above should also say `supportedCustomMetrics`, though that's beyond the
scope of this PR (so up to you).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String

Review Comment:
   nit: maybe this isn't needed, since we haven't had any need to add it for
StateStoreCustomMetric?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String
+  def initValue: Long
+
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  /** This determines the selection of instance metrics during progress reporting */
+  def ordering: Ordering[Long]
+
+  /** Should this instance metric be reported if it is unchanged from its initial value */
+  def ignoreIfUnchanged: Boolean
+
+  def name: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric name")
+    s"$stateStoreProvider.$metricPrefix.partition_${partitionId.get}_$storeName"
+  }
+
+  def desc: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric description")
+    s"$stateStoreProvider: $descPrefix " +
+    s"(partitionId = ${partitionId.get}, storeName = $storeName, notes = $descNotes)"
+  }
+
+  def withNewDescNotes(descNotes: String): StateStoreInstanceMetric

Review Comment:
   Do we also have notes in the description? I haven't seen this in existing metrics yet.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -441,13 +441,20 @@ class SymmetricHashJoinStateManager(
     val keyToNumValuesMetrics = keyToNumValues.metrics
     val keyWithIndexToValueMetrics = keyWithIndexToValue.metrics
     def newDesc(desc: String): String = s"${joinSide.toString.toUpperCase(Locale.ROOT)}: $desc"
+    def newDescNotes: String = joinSide.toString.toUpperCase(Locale.ROOT)
 
     StateStoreMetrics(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
         case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
-      }
+      },
+      // We want to collect instance metrics from both state stores
+      (keyWithIndexToValueMetrics.instanceMetrics ++ keyToNumValuesMetrics.instanceMetrics)
+        .collect {
+          case (metric, value) =>
+            (metric.withNewDescNotes(descNotes = newDescNotes), value)

Review Comment:
   The state store name contains the side information, so if the metric name includes the
state store name, we don't need the note.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +286,237 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
     assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
   }
+
+  private def snapshotLagMetricName(
+      partitionId: Long,
+      storeName: String = StateStoreId.DEFAULT_STORE_NAME): String = {
+    s"$SNAPSHOT_LAG_METRIC_PREFIX${partitionId}_$storeName"
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51097: Verify snapshot lag metrics are updated correctly with 
RocksDBStateStoreProvider"
+  ) {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "5"

Review Comment:
   nit: let's set this smaller; for unit tests, the default number of shuffle partitions is 5.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,

Review Comment:
   I have been confused by this; we set up each instance metric to have the storeName and
partition ID in its name. Once we do that, is there a case where we have multiple values for
the same metric? If that can ever happen, could you please describe the case and whether max
is normally the right fit for aggregation?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String
+  def initValue: Long
+
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  /** This determines the selection of instance metrics during progress reporting */

Review Comment:
   Probably good to mention the direction as well, e.g. if you want to do top N, whether the
ordering should be ascending or descending (I haven't read that part yet, so whatever fits).
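
   For example, something along these lines on the trait (a rough sketch of the wording only;
adjust to whatever the selection logic actually does):

       /**
        * Determines which instance metric values are selected when only N of them are
        * reported in progress. For example, `Ordering.Long` (ascending) would keep the
        * N smallest values, while `Ordering.Long.reverse` would keep the N largest.
        */
       def ordering: Ordering[Long]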



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -281,10 +285,20 @@ object StateStoreMetrics {
       customMetric -> sumOfMetrics
     }.toMap
 
+    val distinctInstanceMetrics = allMetrics.flatMap(_.instanceMetrics.keys).distinct

Review Comment:
   nit: probably `distinctInstanceMetricKeys`? Just to make it clearer to distinguish from the
one below.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -320,11 +327,48 @@ trait StateStoreWriter
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    // Still publish instance metrics that are marked with ignoreIfUnchanged,
+    // as those unchanged metric values may contain important information
+    val instanceMetricsToReport = stateStoreInstanceMetrics
+      .filter {
+        case (name, _) =>
+          val metric = longMetric(name)
+          val metricConfig = instanceMetricConfiguration(name)
+          // Keep instance metrics that are updated or aren't marked to be ignored,
+          // as their initial value could still be important.
+          !metric.isZero || !metricConfig.ignoreIfUnchanged
+      }
+      .groupBy {
+        // Group all instance metrics underneath their common metric prefix
+        // to ignore partition and store names.
+        case (name, _) =>
+          (
+            instanceMetricConfiguration(name).stateStoreProvider,
+            instanceMetricConfiguration(name).metricPrefix
+          )
+      }
+      .flatMap {
+        case (_, metrics) =>
+          // Select at most N metrics based on the metric's defined ordering
+          // to report to the driver. For example, ascending order would be taking the N smallest.

Review Comment:
   nit: Let's put this information on the interface. Let's assume developers of a new metric
would only see the interface to begin with.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
+            // which for snapshot versions is the largest version number.
+            Math.max(value, longMetric(metric.name).value)
+          }
+        )
+    }
+  }
+
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
       metric => (metric.name, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    stateStoreInstanceMetricObjects.map {
+      case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+    }
+  }
+
+  private def stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] = {
+    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+    val maxPartitions = conf.defaultNumShufflePartitions

Review Comment:
   There could be an extreme case where `conf.defaultNumShufflePartitions` and 
`getStateInfo.numPartitions` are different. The latter is the source of truth 
for stateful operators.
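
   e.g. something along these lines (untested sketch, only changing where the partition count
comes from):

       // Take the partition count from the operator's state info, which is the source
       // of truth for stateful operators, rather than from the shuffle partition conf.
       val maxPartitions = getStateInfo.numPartitions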



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
     }
   }
 
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
+            // which for snapshot versions is the largest version number.
+            Math.max(value, longMetric(metric.name).value)
+          }
+        )
+    }
+  }
+
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
       metric => (metric.name, metric.createSQLMetric(sparkContext))
     }.toMap
   }
 
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    stateStoreInstanceMetricObjects.map {
+      case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+    }
+  }
+
+  private def stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] = {

Review Comment:
   I see this method being called in multiple places; does the map need to be built separately
for each of them?
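
   If not, one option is to memoize it (rough, untested sketch; `buildInstanceMetricObjects()`
is just a stand-in for the existing method body):

       // Compute the metric objects once and share the map across call sites, instead of
       // re-creating the state store provider on every call.
       private lazy val stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] =
         buildInstanceMetricObjects()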



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

