micheal-o commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1950201910


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in state store progress reporting. " +
+        "The default limit is 20% of partitions clamped between 1 and 10 (please refer to " +
+        "numStateStoreInstanceMetricsToReport for details)."

Review Comment:
   This doc comment is user-facing; you shouldn't refer users to a function in the code.
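
   For example (just a sketch; the exact wording and the default formula are discussed in the other comments on this conf), the doc could describe the default inline instead of pointing at a function:

   ```scala
   .doc(
     "Number of state store instance metrics to include in streaming progress reporting. " +
     "If not set, the limit defaults to 20% of the number of shuffle partitions, " +
     "clamped between 1 and 10.")
   ```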



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in state store progress reporting. " +

Review Comment:
   nit: streaming progress report, not state store progress report



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -504,10 +536,19 @@ trait StateStoreProvider {
    * (specifically, same names) through `StateStore.metrics`.
    */
   def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Nil
+
+  /**
+   * Optional custom state store instance metrics that the implementation may want to report.
+   * @note The StateStore objects created by this provider must report the same custom metrics
+   * (specifically, same names) through `StateStore.metrics`.
+   */
+  def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = Seq.empty
 }
 
 object StateStoreProvider {
 
+  val INSTANCE_METRIC_SUFFIX = ".partition_"

Review Comment:
   nit: this is only used in one place right now. No need to have a separate const for it; just use it there.
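
   For example (sketch), the name can just inline the suffix where it is built:

   ```scala
   s"$metricPrefix.partition_${id}_${storeName}"
   ```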



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -447,6 +447,9 @@ class SymmetricHashJoinStateManager(
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
         case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
+      } ++ keyToNumValuesMetrics.customMetrics.collect {
+        case (metric: StateStoreInstanceMetric, value) =>

Review Comment:
   nit: add a comment, e.g. `For instance metrics, we want to collect metrics from both state stores, unlike other custom metrics which are only collected for keyWithIndexToValue`
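
   For example (sketch, keeping the existing case body unchanged):

   ```scala
   } ++ keyToNumValuesMetrics.customMetrics.collect {
     // For instance metrics, we want to collect metrics from both state stores,
     // unlike other custom metrics which are only collected for keyWithIndexToValue.
     case (metric: StateStoreInstanceMetric, value) =>
       // ... (existing body unchanged)
   }
   ```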



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -837,8 +842,16 @@ object RocksDBStateStoreProvider {
     CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
     CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES,
     CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
+
+  val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreInstanceMetric(
+    "rocksdbSnapshotLastUploaded",
+    "RocksDB: the last uploaded version of the snapshot for a specific state 
store instance"
+  )
+
+  val ALL_INSTANCE_METRICS = Seq(CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED)
 }
 
+

Review Comment:
   nit: why?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -125,6 +126,10 @@ trait StatefulOperator extends SparkPlan {
   def validateAndMaybeEvolveStateSchema(
       hadoopConf: Configuration, batchId: Long, stateSchemaVersion: Int):
     List[StateSchemaValidationResult]
+
+  // Used to determine state store names used when allocating metric names,
+  // since join operations use multiple state stores with non-default names.
+  def isJoinOperator: Boolean = false

Review Comment:
   ditto



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+      .internal()
+      .doc(
+        "Number of state store instance metrics to include in state store progress reporting. " +
+        "The default limit is 20% of partitions clamped between 1 and 10 (please refer to " +

Review Comment:
   see my other comment below about removing this 20%



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -5733,6 +5746,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
 
+  def numStateStoreInstanceMetricsToReport: Int = {
+    getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT).getOrElse(
+      Math.min(10, Math.max(1, defaultNumShufflePartitions / 5))

Review Comment:
   Why are we doing this 20% check thing? I don't see a need for it. Why not just `min(10, defaultNumShufflePartitions)`?
   
   Actually, why not just make 10 the default conf value?
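
   For example (sketch), the conf could carry the default directly:

   ```scala
   val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
     buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
       .internal()
       .doc("Number of state store instance metrics to include in streaming progress reporting.")
       .intConf
       .createWithDefault(10)
   ```

   and the accessor becomes a plain lookup:

   ```scala
   def numStateStoreInstanceMetricsToReport: Int =
     getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
   ```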



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+case class StateStoreInstanceMetric(
+    metricPrefix: String,
+    descPrefix: String,
+    partitionId: Option[Int] = None,
+    storeName: String = StateStoreId.DEFAULT_STORE_NAME)
+    extends StateStoreCustomMetric {
+  override def name: String = {
+    partitionId
+      .map { id =>
+        s"$metricPrefix${StateStoreProvider.INSTANCE_METRIC_SUFFIX}${id}_${storeName}"
+      }
+      .getOrElse(metricPrefix)
+  }
+
+  override def desc: String = {
+    partitionId
+      .map { id =>
+        s"$descPrefix (partitionId = $id, storeName = ${storeName})"
+      }
+      .getOrElse(descPrefix)
+  }
+
+  override def withNewDesc(desc: String): StateStoreInstanceMetric = copy(descPrefix = desc)
+
+  override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+    SQLMetrics.createSizeMetric(sparkContext, desc)
+
+  def withNewPartition(partitionId: Int, storeName: String): StateStoreInstanceMetric = {

Review Comment:
   nit: rename to `withNewId` instead? Since we are not setting only partitionId
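
   For example (sketch, assuming the body is just a `copy` of the case class fields):

   ```scala
   def withNewId(partitionId: Int, storeName: String): StateStoreInstanceMetric =
     copy(partitionId = Some(partitionId), storeName = storeName)
   ```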



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -812,4 +825,8 @@ case class StreamingSymmetricHashJoinExec(
 
     Some((leftStateWatermark ++ rightStateWatermark ++ Some(inputWatermarkMs)).min)
   }
+
+  // Used to determine state store names used when allocating metric names,
+  // since join operations use multiple state stores with non-default names.
+  override def isJoinOperator: Boolean = true

Review Comment:
   why?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -398,6 +427,25 @@ trait StateStoreWriter
     }.toMap
   }
 
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+    val maxPartitions = conf.defaultNumShufflePartitions
+
+    (0 until maxPartitions).flatMap { partitionId =>
+      provider.supportedInstanceMetrics.flatMap { metric =>
+        val storeNames = if (isJoinOperator) {
+          SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)

Review Comment:
   There is a better way to do this. Let's not expose symmetric hash join implementation details here; this is at a higher level.
   
   One option is to add a func to `StateStoreWriter` for implementers to provide their state store names, e.g. a `stateStoreNames` func that returns `DEFAULT_STORE_NAME` by default, but join can override it to provide its own names. That is just one option.
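
   Sketch of that option (exact shape is just illustrative):

   ```scala
   // In StateStoreWriter, with a default for single-store operators:
   def stateStoreNames: Seq[String] = Seq(StateStoreId.DEFAULT_STORE_NAME)

   // In StreamingSymmetricHashJoinExec:
   override def stateStoreNames: Seq[String] =
     SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
   ```

   Then this code can just iterate over `stateStoreNames` without knowing anything about joins, and `isJoinOperator` is no longer needed.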



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -373,8 +389,21 @@ trait StateStoreWriter
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-    storeMetrics.customMetrics.foreach { case (metric, value) =>
-      longMetric(metric.name) += value
+    storeMetrics.customMetrics.foreach {
+      // Set the max for instance metrics
+      case (metric: StateStoreInstanceMetric, value) =>

Review Comment:
   This code is a duplicate of the same code in `StreamingSymmetricHashJoinExec`. Should we just have a common func for this logic, instead of duplicating?
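
   For example (sketch; the helper name is just illustrative), a shared func in `StateStoreWriter` that both call sites use:

   ```scala
   protected def addStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
     customMetrics.foreach {
       case (metric: StateStoreInstanceMetric, value) =>
         // Instance metrics take the max value seen so far instead of summing.
         // The isZero check avoids `.value` turning an unset metric with an
         // initial value like -1 into 0.
         val sqlMetric = longMetric(metric.name)
         if (sqlMetric.isZero) {
           sqlMetric.set(value)
         } else {
           sqlMetric.set(Math.max(value, sqlMetric.value))
         }
       case (metric, value) =>
         longMetric(metric.name) += value
     }
   }
   ```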



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -527,8 +527,21 @@ case class StreamingSymmetricHashJoinExec(
          (leftSideJoiner.numUpdatedStateRows + rightSideJoiner.numUpdatedStateRows)
         numTotalStateRows += combinedMetrics.numKeys
         stateMemory += combinedMetrics.memoryUsedBytes
-        combinedMetrics.customMetrics.foreach { case (metric, value) =>
-          longMetric(metric.name) += value
+        combinedMetrics.customMetrics.foreach {
+          // Set the max for instance metrics
+          case (metric: StateStoreInstanceMetric, value) =>
+            // Check for cases where value < 0 and .value converts metric to 0.
+            // Metrics like last uploaded snapshot version can have an init value of -1,
+            // which need special handling to avoid setting the metric to 0 using `.value`.
+            longMetric(metric.name).set(
+              if (longMetric(metric.name).isZero) {
+                value
+              } else {
+                Math.max(value, longMetric(metric.name).value)

Review Comment:
   nit: let's add a comment here for why we are using max
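
   e.g. something along these lines (just a sketch; the author should confirm the exact reasoning):

   ```scala
   // Use max so a later report for the same instance metric (e.g. from a retried
   // task) never lowers the value; we keep the highest value seen rather than
   // summing or overwriting it.
   ```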



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
+case class StateStoreInstanceMetric(
+    metricPrefix: String,
+    descPrefix: String,
+    partitionId: Option[Int] = None,

Review Comment:
   why is `partitionId` optional?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -812,4 +825,8 @@ case class StreamingSymmetricHashJoinExec(
 
     Some((leftStateWatermark ++ rightStateWatermark ++ Some(inputWatermarkMs)).min)
   }
+
+  // Used to determine state store names used when allocating metric names,
+  // since join operations use multiple state stores with non-default names.
+  override def isJoinOperator: Boolean = true

Review Comment:
   I explained why we shouldn't do this in my comment below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
