viirya commented on a change in pull request #31219:
URL: https://github.com/apache/spark/pull/31219#discussion_r559327573
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
##########
@@ -1003,12 +761,232 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(rowsToSet(finalStore.iterator()) === Set(key -> 2))
   }
+ test("maintenance") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ // Make sure that when SparkContext stops, the StateStore maintenance
thread 'quickly'
+ // fails to talk to the StateStoreCoordinator and unloads all the
StateStores
+ .set(RPC_NUM_RETRIES, 1)
+ val opId = 0
+ val dir1 = newDir()
+ val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0),
UUID.randomUUID)
+ val dir2 = newDir()
+ val storeProviderId2 = StateStoreProviderId(StateStoreId(dir2, opId, 1),
UUID.randomUUID)
+ val sqlConf =
getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+ SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
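+    // Retain only two batches of state files, so earlier delta files become eligible for cleanup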
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    // Make the maintenance thread do snapshots and cleanups very fast
+    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
+    val storeConf = StateStoreConf(sqlConf)
+    val hadoopConf = new Configuration()
+    val provider = newStoreProvider(storeProviderId1.storeId)
+
+    var latestStoreVersion = 0
+
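+    // Commits 20 new versions to the first store, each updating key "a", so delta files accumulate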
+    def generateStoreVersions(): Unit = {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(storeProviderId1, keySchema, valueSchema, None,
+          latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }
+
+    val timeoutDuration = 1.minute
+
+    quietly {
+      withSpark(new SparkContext(conf)) { sc =>
+        withCoordinatorRef(sc) { coordinatorRef =>
+          require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
+
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()
+
+          eventually(timeout(timeoutDuration)) {
+            // Store should have been reported to the coordinator
+            assert(coordinatorRef.getLocation(storeProviderId1).nonEmpty,
+              "active instance was not reported")
+
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
+
+            // Some snapshots should have been generated
+            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
+              fileExists(provider, version, isSnapshot = true)
+            }
+            assert(snapshotVersions.nonEmpty, "no snapshot file found")
+          }
+
+          // Generate more versions such that there is another snapshot and
+          // the earliest delta file will be cleaned up
+          generateStoreVersions()
+
+          // Earliest delta file should get cleaned up
+          eventually(timeout(timeoutDuration)) {
+            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+          }
+
+          // If the driver decides to deactivate all stores related to a query run,
+          // then this instance should be unloaded
+          coordinatorRef.deactivateInstances(storeProviderId1.queryRunId)
+          eventually(timeout(timeoutDuration)) {
+            assert(!StateStore.isLoaded(storeProviderId1))
+          }
+
+          // Reload the store and verify
+          StateStore.get(storeProviderId1, keySchema, valueSchema, indexOrdinal = None,
+            latestStoreVersion, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeProviderId1))
+
+          // If some other executor loads the store, then this instance should be unloaded
+          coordinatorRef
+            .reportActiveInstance(storeProviderId1, "other-host", "other-exec", Seq.empty)
+          eventually(timeout(timeoutDuration)) {
+            assert(!StateStore.isLoaded(storeProviderId1))
+          }
+
+          // Reload the store and verify
+          StateStore.get(storeProviderId1, keySchema, valueSchema, indexOrdinal = None,
+            latestStoreVersion, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeProviderId1))
+
+          // If some other executor loads the store, and this executor loads another store,
+          // then this executor should unload inactive instances immediately.
+          coordinatorRef
+            .reportActiveInstance(storeProviderId1, "other-host", "other-exec", Seq.empty)
+          StateStore.get(storeProviderId2, keySchema, valueSchema, indexOrdinal = None,
+            0, storeConf, hadoopConf)
+          assert(!StateStore.isLoaded(storeProviderId1))
+          assert(StateStore.isLoaded(storeProviderId2))
+        }
+      }
+
+      // Verify that instances are unloaded when the SparkContext is stopped
+      eventually(timeout(timeoutDuration)) {
+        require(SparkEnv.get === null)
+        assert(!StateStore.isLoaded(storeProviderId1))
+        assert(!StateStore.isLoaded(storeProviderId2))
+        assert(!StateStore.isMaintenanceRunning)
+      }
+    }
+  }
+
+  test("StateStore.get") {
+    quietly {
+      val dir = newDir()
+      val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
+      val storeConf = getDefaultStoreConf
+      val hadoopConf = new Configuration()
+
+      // Verify that trying to get incorrect versions throws errors
+      intercept[IllegalArgumentException] {
+        StateStore.get(
+          storeId, keySchema, valueSchema, None, -1, storeConf, hadoopConf)
+      }
+      assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
+
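+      // Version 1 has not been committed yet, so getting it should fail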
+      intercept[IllegalStateException] {
+        StateStore.get(
+          storeId, keySchema, valueSchema, None, 1, storeConf, hadoopConf)
+      }
+
+      // Increase the version of the store and try to get it again
+      val store0 = StateStore.get(
+        storeId, keySchema, valueSchema, None, 0, storeConf, hadoopConf)
+      assert(store0.version === 0)
+      put(store0, "a", 1)
+      store0.commit()
+
+      val store1 = StateStore.get(
+        storeId, keySchema, valueSchema, None, 1, storeConf, hadoopConf)
+      assert(StateStore.isLoaded(storeId))
+      assert(store1.version === 1)
+      assert(rowsToSet(store1.iterator()) === Set("a" -> 1))
+
+      // Verify that you can also load an older version
+      val store0reloaded = StateStore.get(
+        storeId, keySchema, valueSchema, None, 0, storeConf, hadoopConf)
+      assert(store0reloaded.version === 0)
+      assert(rowsToSet(store0reloaded.iterator()) === Set.empty)
+
+      // Verify that you can unload the store and still reload and use it
+      StateStore.unload(storeId)
+      assert(!StateStore.isLoaded(storeId))
+
+      val store1reloaded = StateStore.get(
+        storeId, keySchema, valueSchema, None, 1, storeConf, hadoopConf)
+      assert(StateStore.isLoaded(storeId))
+      assert(store1reloaded.version === 1)
+      put(store1reloaded, "a", 2)
+      assert(store1reloaded.commit() === 2)
+      assert(rowsToSet(store1reloaded.iterator()) === Set("a" -> 2))
+    }
+  }
+
+  test("snapshotting") {
+    val provider = newStoreProvider(minDeltasForSnapshot = 5, numOfVersToRetainInMemory = 2)
Review comment:
Yes, keeping the same.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]