HeartSaVioR commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1749402345
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1426,6 +1430,280 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}
+
+ private def getFiles(path: Path): Array[FileStatus] = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val fileManager = CheckpointFileManager.create(path, hadoopConf)
+ fileManager.list(path)
+ }
+
+ private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+ new Path(stateCheckpointPath, "_stateSchema/default/")
+ }
+
+ test("transformWithState - verify that metadata and schema logs are purged")
{
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+ withTempDir { chkptDir =>
+ // in this test case, we are changing the state spec back and forth
+ // to trigger the writing of the schema and metadata files
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str2")),
+ CheckNewAnswer(("a", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result3 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result3, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str3")),
+ CheckNewAnswer(("a", "1", "str2")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ // because we don't change the schema for this run, there won't
+ // be a new schema file written.
+ testStream(result3, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str4")),
+ CheckNewAnswer(("a", "2", "str3")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+ val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+ val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+ // by the end of the test, there have been 4 batches,
+ // so the metadata log, schema log, and commit log have been purged
+ // for batches 0 and 1. Metadata and schema files were written across batches 0, 1, 2, 3,
+ // but we only need to keep the metadata files for batches 2 and 3, and since the schema
+ // hasn't changed between batches 2 and 3, we only keep the schema file for batch 2.
+ assert(getFiles(metadataPath).length == 2)
+ assert(getFiles(stateSchemaPath).length == 1)
+ }
+ }
+ }
+
+ test("state data source integration - value state supports time travel") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+ SQLConf.MIN_BATCHES_TO_RETAIN.key -> "5") {
+ withTempDir { chkptDir =>
+ // in this test case, we are changing the state spec back and forth
+ // to trigger the writing of the schema and metadata files
+ val inputData = MemoryStream[(String, String)]
+ val result1 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result1, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "1", "")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "2", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "3", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "4", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "5", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "6", "str1")),
+ AddData(inputData, ("a", "str1")),
+ CheckNewAnswer(("a", "7", "str1")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+ val result2 = inputData.toDS()
+ .groupByKey(x => x._1)
+ .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+ TimeMode.None(),
+ OutputMode.Update())
+ testStream(result2, OutputMode.Update())(
+ StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+ AddData(inputData, ("a", "str2")),
+ CheckNewAnswer(("a", "str1")),
+ AddData(inputData, ("a", "str3")),
+ CheckNewAnswer(("a", "str2")),
+ AddData(inputData, ("a", "str4")),
+ CheckNewAnswer(("a", "str3")),
+ Execute { q =>
+ eventually(timeout(Span(5, Seconds))) {
+ q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+ }
+ },
+ StopStream
+ )
+
+ // Batches 0-7: countState, mostRecent
+ // Batches 8-9: countState
+
+ // By this time, offset and commit logs for batches 0-3 have been purged.
Review Comment:
The comment is slightly misleading - the state data source reader does not
support reading a version whose commit log is no longer available, so reading
from batch 2 is no longer valid at this point.
Also, it would be good to add a short comment on "why" we need to preserve
metadata and schema files whose written batch falls within the purge criteria.
What we intend to test here is that the purge does not remove a metadata or
schema file that was written at an older batch (within the purge criteria) but
is still "referenced" from a "not-yet-purged" batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]