liviazhu-db commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1945446788


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STATE_STORE_PARTITION_METRICS_REPORT_LIMIT =
+    buildConf("spark.sql.streaming.stateStore.numPartitionMetricsToReport")
+      .internal()
+      .doc(
+        "Maximum number of partition-level metrics to include in state store progress " +
+          "reporting. The default limit is 20% of the number of cores (with a minimum of 1 " +
+          "partition) and with a cap of 10. This limits the metrics to the N partitions with " +
+          "the smallest values to prevent the progress report from becoming too large."
+      )
+      .version("4.0.0")
+      .intConf
+      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+      .createWithDefault(
+        Math.min(10, Math.min(1, SHUFFLE_PARTITIONS.defaultValue.getOrElse(200) / 5))

Review Comment:
   Going by your doc comment, this should be 
`Math.min(10, Math.max(1, SHUFFLE_PARTITIONS...)`, right? As written, the inner 
`Math.min(1, ...)` means the limit can never be more than 1.
   
   Also, I don't think this captures the real number of shuffle partitions, 
since you're not reading the active SQLConf here. You're going to get 200 every 
time. I'm not sure you can get the real number of shuffle partitions at this 
point, so maybe just create it with a default of 10, unless @ericm-db knows how 
to get the SQLConf here.
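
For illustration, here is a minimal sketch of the arithmetic behind the first point. The object and method names are hypothetical and not part of the PR; the plain `Int` parameter stands in for whatever `SHUFFLE_PARTITIONS.defaultValue.getOrElse(200)` returns.

```scala
// Hypothetical helper, only to compare the two expressions side by side.
object ReportLimitSketch {
  // Expression as written in the diff: the inner Math.min(1, ...) can never exceed 1.
  def asWritten(shufflePartitions: Int): Int =
    Math.min(10, Math.min(1, shufflePartitions / 5))

  // Clamp suggested in the review: at least 1, at most 10.
  def clamped(shufflePartitions: Int): Int =
    Math.min(10, Math.max(1, shufflePartitions / 5))
}
```

With an input of 200, `asWritten` yields 1 and `clamped` yields 10; with an input of 25, `clamped` yields 5.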



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -146,6 +146,7 @@ class RocksDB(
   @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile protected var loadedVersion: Long = -1L   // -1 = nothing valid is loaded
+  @volatile protected var lastUploadedVersion = -1L   // -1 = nothing has been uploaded

Review Comment:
   I don't think `lastUploadedVersion` is thread-safe here. `@volatile` only 
guarantees visibility, which is enough when there is a single writer thread, 
but both the maintenance thread and the task thread can write this value. Can 
you make it an atomic instead?
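
A rough sketch of what an atomic version could look like, assuming the field should only ever move forward (that monotonic requirement is my assumption, not something stated in the PR). `UploadTracker` and its methods are hypothetical names used for illustration; only `java.util.concurrent.atomic.AtomicLong` is a real API.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical holder; not the actual RocksDB class from the PR.
class UploadTracker {
  // -1 = nothing has been uploaded
  private val lastUploadedVersion = new AtomicLong(-1L)

  // Atomically keeps the larger of the stored and the reported version,
  // so concurrent writers cannot move the value backwards.
  def recordUpload(version: Long): Long =
    lastUploadedVersion.accumulateAndGet(version, (a: Long, b: Long) => Math.max(a, b))

  def current: Long = lastUploadedVersion.get()
}
```

If a plain overwrite is all that is needed, `AtomicLong.set` would do; `accumulateAndGet` matters only when two threads may report versions out of order.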



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +286,116 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
     assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
     assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
   }
+
+  private def snapshotLagMetricName(partitionId: Long): String =
+    (SNAPSHOT_LAG_METRIC_PREFIX + partitionId)
+
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51097: Verify snapshot lag metric is updated correctly with RocksDBStateStoreProvider"
+  ) {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "5"
+    ) {
+      withTempDir { checkpointDir =>
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+
+        testStream(result, outputMode = OutputMode.Update)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(inputData, "a"),
+          ProcessAllAvailable(),
+          AddData(inputData, "b"),
+          ProcessAllAvailable(),
+          CheckNewAnswer("a", "b"),
+          Execute { q =>
+            // Make sure only smallest K active metrics are published
+            eventually(timeout(Span(10, Seconds))) {

Review Comment:
   nit: I think you can use `10.seconds`, just some syntactic sugar that looks 
pretty :) 


