HeartSaVioR commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1749399236
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1426,6 +1430,280 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}
+
+ private def getFiles(path: Path): Array[FileStatus] = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val fileManager = CheckpointFileManager.create(path, hadoopConf)
+ fileManager.list(path)
+ }
+
+ private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+ new Path(stateCheckpointPath, "_stateSchema/default/")
+ }
+
+ test("transformWithState - verify that metadata and schema logs are purged")
{
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+ withTempDir { chkptDir =>
+ // in this test case, we are changing the state spec back and forth
+ // to trigger the writing of the schema and metadata files
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str2")),
+ CheckNewAnswer(("a", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result3 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result3, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str3")),
+ CheckNewAnswer(("a", "1", "str2")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ // because we don't change the schema for this run, there won't
+ // be a new schema file written.
+ testStream(result3, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str4")),
+ CheckNewAnswer(("a", "2", "str3")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+ val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+ val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+ // By the end of the test, 4 batches have run, and the metadata log,
+ // schema log, and commit log have been purged for batches 0 and 1.
+ // Metadata and schema files were written for batches 0, 1, 2, 3, so we
+ // only need to keep the metadata files for batches 2 and 3; and since
+ // the schema hasn't changed between batches 2 and 3, we only keep the
+ // schema file for batch 2.
+ assert(getFiles(metadataPath).length == 2)
+ assert(getFiles(stateSchemaPath).length == 1)
+ }
+ }
+ }
+
+ test("state data source integration - value state supports time travel") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "5") {
+ withTempDir { chkptDir =>
+ // in this test case, we are changing the state spec back and forth
+ // to trigger the writing of the schema and metadata files
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "2", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "3", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "4", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "5", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "6", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "7", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str2")),
+ CheckNewAnswer(("a", "str1")),
+ AddData(inputData, ("a", "str3")),
+ CheckNewAnswer(("a", "str2")),
+ AddData(inputData, ("a", "str4")),
+ CheckNewAnswer(("a", "str3")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+
+ // Batches 0-7: countState, mostRecent
+ // Batches 8-9: countState
+
+ // By this time, offset and commit logs for batches 0-3 have been purged.
+ // However, if we want to read the data for batch 2, we need to have preserved the
Review Comment:
```suggestion
    // However, if we want to read the data for batch 4, we need to have preserved the
```
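
For context on why batch 4 is the right number here: the comment above notes that offset and commit logs for batches 0-3 have been purged, so batch 4 is the earliest batch whose state can still be reconstructed. A minimal sketch of the time-travel read this comment describes, assuming the `statestore` data source reader and the `countState` variable name used elsewhere in this suite (the exact option set is an assumption for illustration, not part of this diff):

```scala
// Hypothetical sketch (not part of this PR): time-travel read of the
// transformWithState value state as of batch 4, the earliest batch whose
// offset/commit log entries survive the purge described above.
val countStateDf = spark.read
  .format("statestore")
  .option("path", chkptDir.getCanonicalPath) // checkpoint dir from the test
  .option("batchId", 4)                      // earliest non-purged batch
  .option("operatorId", 0)
  .option("stateVarName", "countState")      // value state registered by the processor
  .load()
```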