micheal-o commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2519694842


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -120,6 +120,39 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("SPARK-54063: reportActiveInstance return correct 
shouldForceSnapshotUpload") {
+      withCoordinatorAndSQLConf(
+        sc,
+        SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
+        SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+        SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+        SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",

Review Comment:
   nit: this key is set twice
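   For illustration, the conf keys visible in this hunk listed once each (only the repeated STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG entry is dropped; any confs beyond this hunk are unchanged):
   ```scala
   // Conf pairs for the reportActiveInstance test, each key appearing once.
   val confs: Seq[(String, String)] = Seq(
     SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
     SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
     SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
     SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
     SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1"
   )
   ```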



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -317,6 +357,10 @@ private class StateStoreCoordinator(
       stateStoreLatestUploadedSnapshot --= storeIdsToRemove
       // Remove the corresponding run id entries for report time and starting time
       lastFullSnapshotLagReportTimeMs -= runId
+      // Remove the corresponding run id entries for snapshot upload lagging stores
+      if (forceSnapshotUploadOnLag) {

Review Comment:
   We can do without checking if it's enabled, right? We can just remove the runId; when the feature is not enabled, the map should be empty anyway.
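   For illustration, a minimal standalone sketch of that point (the actual map's name and value type in the PR are not visible in this hunk, so `laggingStoresForRunId` and its shape below are placeholders):
   ```scala
   import java.util.UUID
   import scala.collection.mutable

   // Removing a key from a map that was never populated is a harmless no-op,
   // so the forceSnapshotUploadOnLag guard around the removal can be dropped.
   val laggingStoresForRunId = mutable.Map.empty[UUID, Set[String]] // placeholder shape
   val runId = UUID.randomUUID()
   laggingStoresForRunId -= runId // fine even when the feature is disabled and the map is empty
   ```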



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -235,6 +268,179 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
     query
   }
 
+  faultyStateStoreProviders.foreach { case (providerName, providerClassName, badPartitions) =>
+    test(
+      s"SPARK-54063: Force trigger snapshot upload for $providerName when 
lagging"
+    ) {
+      withCoordinatorAndSQLConf(
+        sc,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+        SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+        RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+        SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
+        SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
+        SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
+        SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true"
+      ) {
+        case (coordRef, spark) =>
+          import spark.implicits._
+          implicit val sparkSession: SparkSession = spark
+          val inputData = MemoryStream[Int]
+          val query = setUpStatefulQuery(inputData, "query")
+          val inputData2 = MemoryStream[Int]
+          val query2 = setUpStatefulQuery(inputData2, "query2")
+          // Add, commit, and wait multiple times to force snapshot versions and time difference
+          // we will detect state store with partition 0 and 1 to be lagged on version 5
+          // snapshotVersionOnLagDetected =
+          //   STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG *
+          //   STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT + 1
+          val snapshotVersionOnLagDetected = 2 * 1 + 1
+          (0 until snapshotVersionOnLagDetected).foreach { _ =>
+            inputData.addData(1, 2, 3)
+            query.processAllAvailable()
+            inputData2.addData(1, 2, 3)
+            query2.processAllAvailable()
+            Thread.sleep(500)
+          }
+          // Verify only the partitions in badPartitions don't have a snapshot
+          verifySnapshotUploadEvents(coordRef, query, badPartitions)
+          verifySnapshotUploadEvents(coordRef, query2, badPartitions)
+
+          def verifyShouldForceSnapshotOnBadPartitions(
+              checkpointDir: String,
+              runId: UUID,
+              shouldForce: Boolean,
+              expectedSnapshotVersion: Option[Int]): Unit = {
+            badPartitions.foreach { partitionId =>
+              val storeId = StateStoreId(
+                checkpointDir,
+                0,
+                partitionId,
+                StateStoreId.DEFAULT_STORE_NAME
+              )
+              val providerId = StateStoreProviderId(storeId, runId)
+              if (expectedSnapshotVersion.isDefined) {
+                val latestSnapshotVersion = coordRef.getLatestSnapshotVersionForTesting(providerId)
+                assert(latestSnapshotVersion.get == expectedSnapshotVersion.get)
+              }
+              val shouldForceSnapshot =
+                coordRef.checkIfShouldForceSnapshotUploadForTesting(providerId)
+              assert(shouldForceSnapshot == shouldForce)
+            }
+          }
+          val streamingQuery = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
+          val stateCheckpointDir = streamingQuery.lastExecution.checkpointLocation
+          // Verify that shouldForceSnapshotUpload is true for bad partitions
+          // after the coordinator detects the lag
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir,
+            query.runId,
+            shouldForce = true,
+            expectedSnapshotVersion = None)
+          // The coordinator should have detected the lagging stores now, so the next
+          // commit should automatically trigger a snapshot
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          Thread.sleep(500)
+
+          // Verify that snapshot was created and shouldForceSnapshotUpload is now false
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir,
+            query.runId,
+            shouldForce = false,
+            expectedSnapshotVersion = Some(snapshotVersionOnLagDetected + 1)
+          )
+
+          val streamingQuery2 = query2.asInstanceOf[StreamingQueryWrapper].streamingQuery
+          val stateCheckpointDir2 = streamingQuery2.lastExecution.checkpointLocation
+
+          // verify that lagging stores in query2 are not impacted by query1 catching up
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir2, query2.runId,
+            shouldForce = true,
+            expectedSnapshotVersion = None)
+
+          // Verify that the lagging stores are no longer marked as
+          // lagging because they are removed when stop() is called
+          query2.stop()
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir2, query2.runId,
+            shouldForce = false,
+            expectedSnapshotVersion = None)
+          query.stop()
+        }
+      }
+  }
+
+  test(
+      s"SPARK-54063: force snapshot propagate when upgrade read store to write 
store"
+    ) {
+      withCoordinatorAndSQLConf(
+        sc,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",

Review Comment:
   nit: if these confs are the same as the ones above, you can just make them a common val/func
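   For illustration, one way to share them (names are illustrative, and this assumes withCoordinatorAndSQLConf accepts the conf pairs as (String, String)* varargs, as the existing call sites suggest):
   ```scala
   // Shared confs for the SPARK-54063 force-snapshot-upload tests; per-test
   // differences (e.g. the provider class) are passed in by the caller.
   def forceSnapshotUploadConfs(providerClassName: String): Seq[(String, String)] = Seq(
     SQLConf.SHUFFLE_PARTITIONS.key -> "5",
     SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
     SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
     SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
     SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
     RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
     SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
     SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
     SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
     SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true"
   )

   // Usage in each test body (assuming the varargs signature above):
   // withCoordinatorAndSQLConf(sc, forceSnapshotUploadConfs(providerClassName): _*) {
   //   case (coordRef, spark) => // ...
   // }
   ```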



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

