micheal-o commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2501401133
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1440,13 +1440,17 @@ class RocksDB(
* - Create a RocksDB checkpoint in a new local dir
* - Sync the checkpoint dir files to DFS
*/
- def commit(): (Long, StateStoreCheckpointInfo) = {
+ def commit(forceSnapshotOnCommit: Boolean = false): (Long,
StateStoreCheckpointInfo) = {
Review Comment:
nit: since this function is already named `commit`, it is fine to just call
this param `forceSnapshot` here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -530,9 +540,13 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
private def commitUpdates(
newVersion: Long,
map: HDFSBackedStateStoreMap,
- output: DataOutputStream): Unit = {
+ output: DataOutputStream,
+ shouldForceSnapshotOnCommit: Boolean = false): Unit = {
Review Comment:
nit: since this function is already named `commitUpdates`, it is fine to just
call this param `shouldForceSnapshot` here.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -235,6 +235,113 @@ class StateStoreCoordinatorSuite extends SparkFunSuite
with SharedSparkContext {
query
}
+ faultyStateStoreProviders.foreach { case (providerName, providerClassName,
badPartitions) =>
+ test(
+ s"SPARK-54063: Force trigger snapshot upload for $providerName when
lagging"
+ ) {
+ withCoordinatorAndSQLConf(
+ sc,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
+ RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
".changelogCheckpointing.enabled" -> "true",
+ SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key ->
"true",
+
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key ->
"2",
+ SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key ->
"0",
+ SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "true"
+ ) {
+ case (coordRef, spark) =>
+ import spark.implicits._
+ implicit val sparkSession: SparkSession = spark
+ val inputData = MemoryStream[Int]
+ val query = setUpStatefulQuery(inputData, "query")
+ val inputData2 = MemoryStream[Int]
+ val query2 = setUpStatefulQuery(inputData2, "query2")
+ // Add, commit, and wait multiple times to force snapshot versions
and time difference
+ // we will detect state store with partition 0 and 1 to be lagged on
version 5
+ // snapshotVersionOnLagDetected =
+ // STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG *
+ // STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT + 1
+ val snapshotVersionOnLagDetected = 2 * 1 + 1
+ (0 until snapshotVersionOnLagDetected).foreach { _ =>
+ inputData.addData(1, 2, 3)
+ query.processAllAvailable()
+ inputData2.addData(1, 2, 3)
+ query2.processAllAvailable()
+ Thread.sleep(500)
+ }
+ // Verify only the partitions in badPartitions doesn't have a
snapshot
+ verifySnapshotUploadEvents(coordRef, query, badPartitions)
+ verifySnapshotUploadEvents(coordRef, query2, badPartitions)
+
+ // The coordinator should detected that lagging stores now. So next
+ // commit should automatically trigger snapshot
Review Comment:
Should we verify here that the force-snapshot flag is now true for the bad partitions?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2695,6 +2695,18 @@ object SQLConf {
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(if (Utils.isTesting) 1 else 0)
+ val STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG =
+ buildConf("spark.sql.streaming.stateStore.forceSnapshotUploadOnLag")
+ .internal()
+ .doc(
+ "When enabled, state stores with lagging snapshot uploads will
automatically trigger " +
+ "a snapshot on the next commit. Requires
spark.sql.streaming.stateStore.coordinator" +
+ "ReportSnapshotUploadLag to be true."
+ )
+ .version("4.1.0")
Review Comment:
nit: the version should be `4.2.0`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1207,9 +1208,9 @@ object StateStore extends Logging {
if (version < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
- val storeProvider = getStateStoreProvider(storeProviderId, keySchema,
valueSchema,
- keyStateEncoderSpec, useColumnFamilies, storeConf, hadoopConf,
useMultipleValuesPerKey,
- stateSchemaBroadcast)
+ val (storeProvider, _) = getStateStoreProvider(storeProviderId,
+ keySchema, valueSchema, keyStateEncoderSpec, useColumnFamilies,
storeConf, hadoopConf,
+ useMultipleValuesPerKey, stateSchemaBroadcast)
storeProvider.upgradeReadStoreToWriteStore(readStore, version,
stateStoreCkptId)
Review Comment:
As mentioned above, we should pass in forceSnapshot for this too. This is
the case where we are reusing the store.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -756,7 +760,7 @@ private[sql] class RocksDBStateStoreProvider
case None =>
// Create new store instance. The stamp should be defined
// in this case
- new RocksDBStateStore(version, stamp.get, readOnly)
+ new RocksDBStateStore(version, stamp.get, readOnly,
forceSnapshotOnCommit)
Review Comment:
Let's also set `forceSnapshotOnCommit` for the case above, where we are
reusing the store. We reuse the store for aggregation; if we don't set this,
we would never force a snapshot for aggregation queries.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -275,6 +289,9 @@ private class StateStoreCoordinator(
private def shouldCoordinatorReportSnapshotLag: Boolean =
sqlConf.stateStoreCoordinatorReportSnapshotUploadLag
+ private def stateStoreForceSnapshotUploadOnLag: Boolean =
Review Comment:
nit: remove the `stateStore` prefix from this function name; it isn't needed here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1586,20 +1595,26 @@ object StateStore extends Logging {
private def reportActiveStoreInstance(
storeProviderId: StateStoreProviderId,
- otherProviderIds: Seq[StateStoreProviderId]): Seq[StateStoreProviderId]
= {
+ otherProviderIds: Seq[StateStoreProviderId]):
ReportActiveInstanceResponse = {
if (SparkEnv.get != null) {
val host = SparkEnv.get.blockManager.blockManagerId.host
val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
- val providerIdsToUnload = coordinatorRef
+ val storeStatus = coordinatorRef
.map(_.reportActiveInstance(storeProviderId, host, executorId,
otherProviderIds))
- .getOrElse(Seq.empty[StateStoreProviderId])
+ .getOrElse(ReportActiveInstanceResponse(
+ shouldForceSnapshotUpload = false,
+ providerIdsToUnload = Seq.empty[StateStoreProviderId]))
logInfo(log"Reported that the loaded instance " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, storeProviderId)} is
active")
logDebug(log"The loaded instances are going to unload: " +
- log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS, providerIdsToUnload)}")
- providerIdsToUnload
+ log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS,
storeStatus.providerIdsToUnload)}" +
+ log", shouldForceSnapshotUpload=" +
Review Comment:
nit: shouldn't this be part of the logInfo above?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -329,6 +356,9 @@ private class StateStoreCoordinator(
if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version
>= version)) {
stateStoreLatestUploadedSnapshot.put(providerId,
SnapshotUploadEvent(version, timestamp))
}
+ if (stateStoreForceSnapshotUploadOnLag) {
Review Comment:
Should we move this into the block above? We should only remove it if it is
a newer snapshot version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]