micheal-o commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2519694842
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -120,6 +120,39 @@ class StateStoreCoordinatorSuite extends SparkFunSuite
with SharedSparkContext {
}
}
+ test("SPARK-54063: reportActiveInstance return correct
shouldForceSnapshotUpload") {
+ withCoordinatorAndSQLConf(
+ sc,
+ SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key ->
"true",
+ SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key ->
"true",
Review Comment:
nit: this key is set twice
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -317,6 +357,10 @@ private class StateStoreCoordinator(
stateStoreLatestUploadedSnapshot --= storeIdsToRemove
// Remove the corresponding run id entries for report time and starting time
lastFullSnapshotLagReportTimeMs -= runId
+    // Remove the corresponding run id entries for snapshot upload lagging stores
+    if (forceSnapshotUploadOnLag) {
Review Comment:
We can do without checking whether it's enabled, right? We can just remove the runId unconditionally; when the feature is not enabled, the map should be empty anyway.
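A rough sketch of the suggested simplification (the map guarded by the check is not visible in this hunk, so `laggingStoresToForceSnapshot` below is a hypothetical stand-in for its name):

    // Clean up all per-run state unconditionally. When
    // forceSnapshotUploadOnLag is disabled, the map never gains entries
    // for this runId, so the extra removal is a harmless no-op.
    lastFullSnapshotLagReportTimeMs -= runId
    laggingStoresToForceSnapshot -= runId  // hypothetical map name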
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -235,6 +268,179 @@ class StateStoreCoordinatorSuite extends SparkFunSuite
with SharedSparkContext {
query
}
+  faultyStateStoreProviders.foreach { case (providerName, providerClassName, badPartitions) =>
+    test(
+      s"SPARK-54063: Force trigger snapshot upload for $providerName when lagging"
+    ) {
+      withCoordinatorAndSQLConf(
+        sc,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+        SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+        SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+        RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+        SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
+        SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
+        SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
+        SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true"
+      ) {
+        case (coordRef, spark) =>
+          import spark.implicits._
+          implicit val sparkSession: SparkSession = spark
+          val inputData = MemoryStream[Int]
+          val query = setUpStatefulQuery(inputData, "query")
+          val inputData2 = MemoryStream[Int]
+          val query2 = setUpStatefulQuery(inputData2, "query2")
+          // Add, commit, and wait multiple times to force snapshot versions and a time difference.
+          // We expect the state stores for partitions 0 and 1 to be detected as lagging at version 5.
+          // snapshotVersionOnLagDetected =
+          //   STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG *
+          //   STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT + 1
+          val snapshotVersionOnLagDetected = 2 * 1 + 1
+          (0 until snapshotVersionOnLagDetected).foreach { _ =>
+            inputData.addData(1, 2, 3)
+            query.processAllAvailable()
+            inputData2.addData(1, 2, 3)
+            query2.processAllAvailable()
+            Thread.sleep(500)
+          }
+          // Verify that only the partitions in badPartitions don't have a snapshot
+          verifySnapshotUploadEvents(coordRef, query, badPartitions)
+          verifySnapshotUploadEvents(coordRef, query2, badPartitions)
+
+          def verifyShouldForceSnapshotOnBadPartitions(
+              checkpointDir: String,
+              runId: UUID,
+              shouldForce: Boolean,
+              expectedSnapshotVersion: Option[Int]): Unit = {
+            badPartitions.foreach { partitionId =>
+              val storeId = StateStoreId(
+                checkpointDir,
+                0,
+                partitionId,
+                StateStoreId.DEFAULT_STORE_NAME
+              )
+              val providerId = StateStoreProviderId(storeId, runId)
+              if (expectedSnapshotVersion.isDefined) {
+                val latestSnapshotVersion =
+                  coordRef.getLatestSnapshotVersionForTesting(providerId)
+                assert(latestSnapshotVersion.get == expectedSnapshotVersion.get)
+              }
+              val shouldForceSnapshot =
+                coordRef.checkIfShouldForceSnapshotUploadForTesting(providerId)
+              assert(shouldForceSnapshot == shouldForce)
+            }
+          }
+          val streamingQuery = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
+          val stateCheckpointDir = streamingQuery.lastExecution.checkpointLocation
+          // Verify that shouldForceSnapshotUpload is true for bad partitions
+          // after the coordinator detects the lag
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir,
+            query.runId,
+            shouldForce = true,
+            expectedSnapshotVersion = None)
+          // The coordinator should have detected the lagging stores by now, so the
+          // next commit should automatically trigger a snapshot
+          inputData.addData(1, 2, 3)
+          query.processAllAvailable()
+          Thread.sleep(500)
+
+          // Verify that a snapshot was created and shouldForceSnapshotUpload is now false
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir,
+            query.runId,
+            shouldForce = false,
+            expectedSnapshotVersion = Some(snapshotVersionOnLagDetected + 1)
+          )
+
+          val streamingQuery2 = query2.asInstanceOf[StreamingQueryWrapper].streamingQuery
+          val stateCheckpointDir2 = streamingQuery2.lastExecution.checkpointLocation
+
+          // Verify that lagging stores in query2 are not impacted by query1 catching up
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir2, query2.runId,
+            shouldForce = true,
+            expectedSnapshotVersion = None)
+
+          // Verify that the lagging stores are no longer marked as
+          // lagging because they are removed when stop() is called
+          query2.stop()
+          verifyShouldForceSnapshotOnBadPartitions(
+            stateCheckpointDir2, query2.runId,
+            shouldForce = false,
+            expectedSnapshotVersion = None)
+          query.stop()
+      }
+    }
+  }
+
+  test(
+    s"SPARK-54063: force snapshot propagates when upgrading a read store to a write store"
+  ) {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
Review Comment:
nit: if these confs are the same as the ones above, you can just extract them into a common val/func
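As a rough sketch of that refactor (the helper name is illustrative, and it assumes withCoordinatorAndSQLConf accepts (String, String) varargs as in the calls above):

    // Hoist the confs shared by the SPARK-54063 tests into one helper.
    // Only the keys and values come from the quoted diffs; the name is made up.
    private def forceSnapshotUploadConfs(providerClassName: String): Seq[(String, String)] = Seq(
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
      SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
      SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
      SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
      SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0",
      SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true")

    // Usage:
    // withCoordinatorAndSQLConf(sc, forceSnapshotUploadConfs(providerClassName): _*) {
    //   case (coordRef, spark) => ...
    // }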
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]