micheal-o commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2482477697
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2625,6 +2625,18 @@ object SQLConf {
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(5)
+ val STATE_STORE_AUTO_SNAPSHOT_FOR_LAGGING_STORES_ENABLED =
Review Comment:
nit: `STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2625,6 +2625,18 @@ object SQLConf {
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(5)
+ val STATE_STORE_AUTO_SNAPSHOT_FOR_LAGGING_STORES_ENABLED =
+   buildConf("spark.sql.streaming.stateStore.autoSnapshotForLaggingStoresEnabled")
+   .internal()
+   .doc(
+     "When enabled, the state store coordinator will store a list of state stores that are " +
+     "lagging behind in snapshot uploads. The provider will use this information to decide " +
+     "whether to force snapshot creation on commit when creating a new store."
Review Comment:
You can simplify this description; it will appear in the user docs. Also mention that it requires `STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG` to be enabled.
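Maybe something along these lines (just a sketch, exact wording up to you):
```scala
.doc(
  "When true, the state store coordinator tracks state stores that are lagging behind in " +
  "snapshot uploads and asks them to upload a snapshot on their next commit. Requires " +
  s"'${STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key}' to be enabled.")
```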
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1193,11 +1206,12 @@ object StateStore extends Logging {
// to track which executor has which provider
if (!storeConf.unloadOnCommit) {
val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
- val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
+ val storeStatus = reportActiveStoreInstance(storeProviderId, otherProviderIds)
+ provider.setShouldForceSnapshotOnCommit(storeStatus.shouldForceSnapshotUpload)
Review Comment:
As mentioned above, let's not set this as a provider field. The provider can be used by multiple threads/task instances, and setting/unsetting this on the provider can lead to unwanted behavior. Return the flag and pass it into the specific state store instance created.
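Roughly something like this (illustrative only; the exact `getStore` signature and parameter name are up to you):
```scala
// Sketch: thread the flag into the store instance instead of mutating the provider.
val storeStatus = reportActiveStoreInstance(storeProviderId, otherProviderIds)
val store = provider.getStore(
  version, forceSnapshotOnCommit = storeStatus.shouldForceSnapshotUpload)
```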
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -299,7 +303,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
val newMap = getLoadedMapForStore(version)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, version)} " +
  log"of ${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
- new HDFSBackedStateStore(version, newMap)
+ logInfo(s"shouldForceSnapshotOnCommit=${getShouldForceSnapshotOnCommit}")
Review Comment:
nit: just add this to the log above, to avoid a separate log message
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -574,6 +574,19 @@ case class RangeKeyScanStateEncoderSpec(
*/
trait StateStoreProvider {
+ // Track whether this state store instance is lagging behind in snapshot uploads.
Review Comment:
Let's not make this a member of the provider. The provider is long-lived across many batches and store instances, so a provider-level flag risks races or bugs where many stores mistakenly start uploading snapshots. This flag should be passed through to the specific state store instance created.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -653,13 +653,15 @@ private[sql] class RocksDBStateStoreProvider
* @param uniqueId Optional unique identifier for checkpoint
* @param readOnly Whether to open the store in read-only mode
* @param existingStore Optional existing store to reuse instead of creating a new one
+ * @param forceSnapshotOnCommit Whether to force a snapshot upload on commit
Review Comment:
Let's also add a metric for both HDFS and RocksDB to show that we forced a snapshot. This will tell us how many stores had a snapshot forced during the batch, and it will show up in the streaming query progress metrics.
[See](https://github.com/apache/spark/blob/6d5f111cc86a0194130d77510a4d739e32c875df/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala#L437)
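For example, following the existing custom-metric pattern (name/description just a suggestion):
```scala
// In RocksDBStateStoreProvider (with an equivalent counter on the HDFS side):
val CUSTOM_METRIC_FORCED_SNAPSHOT_UPLOADS = StateStoreCustomSumMetric(
  "rocksdbForcedSnapshotUploads",
  "number of commits where a snapshot upload was forced because the store was lagging")
```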
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##########
@@ -117,6 +117,10 @@ class StateStoreConf(
val reportSnapshotUploadLag: Boolean =
sqlConf.stateStoreCoordinatorReportSnapshotUploadLag
+ /** Whether to force snapshot when a state store is lagging behind in snapshot uploads. */
+ val stateStoreAutoSnapshotForLaggingStoresEnabled: Boolean =
Review Comment:
nit: no need for the `stateStore` prefix in the val name; the class is already `StateStoreConf`. Also update the val name to match the config name suggested above.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -643,7 +643,8 @@ class RocksDB(
def load(
version: Long,
stateStoreCkptId: Option[String] = None,
- readOnly: Boolean = false): RocksDB = {
+ readOnly: Boolean = false,
+ forceSnapshotOnCommit: Boolean = false): RocksDB = {
Review Comment:
Let's not pass this flag here; let's pass it to `rocksdb.commit(...)` instead.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -116,6 +116,17 @@ private case class GetLaggingStoresForTesting(
private object StopCoordinator
extends StateStoreCoordinatorMessage
+/**
+ * Status information about state stores on this executor.
Review Comment:
Should this be placed after the `ReportActiveInstance` message above? Also fix the description: "Response from coordinator for ReportActiveInstance."
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
val laggingStores =
findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
if (laggingStores.nonEmpty) {
+ if (shouldAutoSnapshotForLaggingStores) {
Review Comment:
This is a bug: we only clear `snapshotUploadLaggingStores` when `laggingStores.nonEmpty`. So if, say, 5 stores were lagging, we force snapshots, and they are no longer lagging (i.e. `laggingStores` is now empty), we never update `snapshotUploadLaggingStores` and will keep forcing snapshots indefinitely.
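i.e. roughly (sketch; `snapshotUploadLaggingStoresByRun` is the per-runId map suggested in the other comment):
```scala
val laggingStores =
  findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
if (shouldAutoSnapshotForLaggingStores) {
  // Refresh the tracked set for this run even when laggingStores is empty,
  // so stores that have caught up stop being force-snapshotted.
  snapshotUploadLaggingStoresByRun.put(queryRunId, mutable.HashSet(laggingStores: _*))
}
if (laggingStores.nonEmpty) {
  // existing warning / logging logic
}
```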
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -116,6 +116,17 @@ private case class GetLaggingStoresForTesting(
private object StopCoordinator
extends StateStoreCoordinatorMessage
+/**
+ * Status information about state stores on this executor.
+ * @param shouldForceSnapshotUpload Whether the current provider should force a snapshot
+ *                                  upload on next commit
+ * @param providerIdsToUnload The list of provider IDs that should be unloaded from this executor.
+ */
+case class ReportActiveInstanceResponse(
Review Comment:
nit: indentation looks wrong
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
val laggingStores =
findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
if (laggingStores.nonEmpty) {
+ if (shouldAutoSnapshotForLaggingStores) {
+ // findLaggingStores will always get a complete list of lagging stores from instances.
+ // We need to clear the set here to ensure any deactivated stores are removed.
+ snapshotUploadLaggingStores.clear()
Review Comment:
`findLaggingStores` only returns lagging stores for the given queryRunId, but here we are clearing entries for all queries. As mentioned above, we need to track this per queryRunId and then clear and update only for that queryRunId.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -329,6 +349,9 @@ private class StateStoreCoordinator(
if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
  stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
}
+ if (shouldAutoSnapshotForLaggingStores) {
+ snapshotUploadLaggingStores.remove(providerId)
Review Comment:
nit: indentation
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -255,6 +266,7 @@ private class StateStoreCoordinator(
val sqlConf: SQLConf)
extends ThreadSafeRpcEndpoint with Logging {
private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
+ private val snapshotUploadLaggingStores = new mutable.HashSet[StateStoreProviderId]
Review Comment:
You also need to clear this when the runId is deactivated, i.e. the query run is terminated; see `DeactivateInstances`. Otherwise this set will keep growing.
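Sketch (assuming the per-runId map mentioned above; names illustrative):
```scala
case DeactivateInstances(runId) =>
  // ... existing cleanup of this run's entries from `instances` ...
  // Also drop the lagging-store tracking for the terminated run.
  snapshotUploadLaggingStoresByRun.remove(runId)
```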
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -208,7 +208,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
if (badPartitions.contains(partitionId)) {
assert(latestSnapshotVersion.getOrElse(0) == 0)
} else {
- assert(latestSnapshotVersion.get >= 0)
+ assert(latestSnapshotVersion.get > 0)
Review Comment:
Can you confirm this won't cause flakiness and that we are definitely uploading a snapshot for the good partitions?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2625,6 +2625,18 @@ object SQLConf {
.checkValue(k => k >= 0, "Must be greater than or equal to 0")
.createWithDefault(5)
+ val STATE_STORE_AUTO_SNAPSHOT_FOR_LAGGING_STORES_ENABLED =
+   buildConf("spark.sql.streaming.stateStore.autoSnapshotForLaggingStoresEnabled")
+   .internal()
+   .doc(
+     "When enabled, the state store coordinator will store a list of state stores that are " +
+     "lagging behind in snapshot uploads. The provider will use this information to decide " +
+     "whether to force snapshot creation on commit when creating a new store."
+   )
+   .version("4.1.0")
+   .booleanConf
+   .createWithDefault(false)
Review Comment:
When this is enabled, it requires `STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG` to also be enabled, so we need to verify that it is. That config is enabled by default, but users can turn it off.
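One option (sketch, val name illustrative and assuming the renamed config constant suggested above) is to only honor the new flag when lag reporting is on, e.g. in `StateStoreConf`:
```scala
// Force-snapshot-on-lag only makes sense when the coordinator is reporting lag.
val forceSnapshotUploadOnLag: Boolean =
  sqlConf.getConf(SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG) &&
    sqlConf.stateStoreCoordinatorReportSnapshotUploadLag
```
Alternatively, fail fast with a clear error when the combination is invalid.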
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
val laggingStores =
findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
if (laggingStores.nonEmpty) {
+ if (shouldAutoSnapshotForLaggingStores) {
+ // findLaggingStores will always get a complete list of lagging stores from instances.
+ // We need to clear the set here to ensure any deactivated stores are removed.
+ snapshotUploadLaggingStores.clear()
Review Comment:
Let's also have a test case with multiple queries in the session; that would have caught this.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
val laggingStores =
findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
if (laggingStores.nonEmpty) {
+ if (shouldAutoSnapshotForLaggingStores) {
Review Comment:
You need to update your test too, because it should be verifying this and should have caught this.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -235,6 +235,70 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
query
}
+ faultyStateStoreProviders.foreach { case (providerName, providerClassName, badPartitions) =>
Review Comment:
We need more test cases. This isn't catching all the edge cases.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -1495,20 +1509,25 @@ object StateStore extends Logging {
private def reportActiveStoreInstance(
    storeProviderId: StateStoreProviderId,
-   otherProviderIds: Seq[StateStoreProviderId]): Seq[StateStoreProviderId] = {
+   otherProviderIds: Seq[StateStoreProviderId]): ReportActiveInstanceResponse = {
  if (SparkEnv.get != null) {
    val host = SparkEnv.get.blockManager.blockManagerId.host
    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-   val providerIdsToUnload = coordinatorRef
+   val storeStatus = coordinatorRef
      .map(_.reportActiveInstance(storeProviderId, host, executorId, otherProviderIds))
-     .getOrElse(Seq.empty[StateStoreProviderId])
+     .getOrElse(ReportActiveInstanceResponse(
+       shouldForceSnapshotUpload = false,
+       providerIdsToUnload = Seq.empty[StateStoreProviderId]))
    logInfo(log"Reported that the loaded instance " +
      log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, storeProviderId)} is active")
    logDebug(log"The loaded instances are going to unload: " +
-     log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS, providerIdsToUnload)}")
-   providerIdsToUnload
+     log"${MDC(LogKeys.STATE_STORE_PROVIDER_IDS, storeStatus.providerIdsToUnload)}")
+   logDebug(s"shouldForceSnapshotOnCommit=${storeStatus.shouldForceSnapshotUpload}")
Review Comment:
No need for a separate log line; put it in the info log above that says the provider is active.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -116,6 +116,17 @@ private case class GetLaggingStoresForTesting(
private object StopCoordinator
extends StateStoreCoordinatorMessage
+/**
+ * Status information about state stores on this executor.
+ * @param shouldForceSnapshotUpload Whether the current provider should force a snapshot
+ *                                  upload on next commit
+ * @param providerIdsToUnload The list of provider IDs that should be unloaded from this executor.
Review Comment:
nit "... from the executor"
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -255,6 +266,7 @@ private class StateStoreCoordinator(
val sqlConf: SQLConf)
extends ThreadSafeRpcEndpoint with Logging {
private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
+ private val snapshotUploadLaggingStores = new mutable.HashSet[StateStoreProviderId]
Review Comment:
We need to track this per queryRunId. The coordinator is per session, and there can be multiple streaming queries running in the session.
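Sketch of what I mean (name illustrative):
```scala
// Key the tracking by query run so that clearing/updating one query never affects others.
private val snapshotUploadLaggingStoresByRun =
  new mutable.HashMap[UUID, mutable.HashSet[StateStoreProviderId]]
```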
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -2595,6 +2595,21 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ test("SPARK-54063: RocksDBStateStoreProvider should force snapshot on commit
when lagging") {
+ tryWithProviderResource(newStoreProvider()) { provider =>
+ provider.setShouldForceSnapshotOnCommit(true)
+ val store = provider.getStore(0)
+ put(store, "a", 0, 1)
+ store.commit()
+ // Indirectly verify by checking metrics set during uploading snapshot
+ val metricPair = store.metrics.customMetrics.find(
+ _._1.name == RocksDBStateStoreProvider.CUSTOM_METRIC_BYTES_COPIED.name)
Review Comment:
use the new force snapshot metric here instead
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -799,6 +799,20 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
+ test("SPARK-54063: force snapshot when shouldForceSnapshotOnCommit is true")
{
Review Comment:
Why duplicate this test case? Why not just put it in `StateStoreSuiteBase`?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -475,9 +480,14 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
private def commitUpdates(
newVersion: Long,
map: HDFSBackedStateStoreMap,
- output: DataOutputStream): Unit = {
+ output: DataOutputStream,
+ saveAsSnapshot: Boolean = false): Unit = {
synchronized {
- finalizeDeltaFile(output)
+ if (saveAsSnapshot) {
+ writeSnapshotFile(newVersion, map, "commit")
+ } else {
+ finalizeDeltaFile(output)
Review Comment:
Yes, we should write the delta file too, to remain consistent, since we expect there to always be a delta file even when there is a snapshot file.
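i.e. roughly (sketch; keep the rest of `commitUpdates` as is):
```scala
synchronized {
  // Always finalize the delta file so every version has one...
  finalizeDeltaFile(output)
  // ...and additionally write the snapshot when we were asked to force one.
  if (saveAsSnapshot) {
    writeSnapshotFile(newVersion, map, "commit")
  }
}
```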