HeartSaVioR commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1749402345


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1426,6 +1430,280 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       }
     }
   }
+
+  private def getFiles(path: Path): Array[FileStatus] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fileManager = CheckpointFileManager.create(path, hadoopConf)
+    fileManager.list(path)
+  }
+
+  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+    new Path(stateCheckpointPath, "_stateSchema/default/")
+  }
+
+  test("transformWithState - verify that metadata and schema logs are purged") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+      withTempDir { chkptDir =>
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        // because we don't change the schema for this run, there won't
+        // be a new schema file written.
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str4")),
+          CheckNewAnswer(("a", "2", "str3")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // By the end of the test there have been 4 batches, and the metadata
+        // and schema logs, as well as the commit log, have been purged for
+        // batches 0 and 1. Metadata and schema files were written for batches
+        // 0, 1, 2 and 3, but we only need to keep the metadata files for
+        // batches 2 and 3; and since the schema hasn't changed between
+        // batches 2 and 3, we only keep the schema file for batch 2.
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 1)
+      }
+    }
+  }
+
+  test("state data source integration - value state supports time travel") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "5") {
+      withTempDir { chkptDir =>
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "3", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "4", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "5", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "6", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "7", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "str2")),
+          AddData(inputData, ("a", "str4")),
+          CheckNewAnswer(("a", "str3")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Batches 0-7: countState, mostRecent
+        // Batches 8-9: countState
+
+        // By this time, offset and commit logs for batches 0-3 have been purged.

Review Comment:
   The comment is slightly misleading - the state data source reader does not support reading a version whose commit log is no longer available, so reading from batch 2 is no longer valid at this point (see the sketch below).
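   
   As a minimal sketch of the distinction (illustrative only, not part of the PR: the state data source option names such as `batchId`, `operatorId` and `stateVarName`, and the `countState` variable name, are assumptions, while `spark` and `chkptDir` come from the surrounding test):
   
   ```scala
   // Read state as of a batch whose commit log entry is still retained
   // (batch 9 is the latest batch in this test).
   val stateDf = spark.read
     .format("statestore")
     .option("path", chkptDir.getCanonicalPath)
     .option("operatorId", 0)
     .option("batchId", 9)
     .option("stateVarName", "countState")
     .load()
   
   // The same read with .option("batchId", 2) would no longer be valid here,
   // since the commit log entry for batch 2 has already been purged.
   ```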
   
   Also, it would probably be good to add a short comment on "why" we need to preserve a metadata or schema file whose writing batch falls within the purge criteria. What we intend to test here is that the purge does not remove a metadata or schema file that was written at an older batch (one eligible for purging) but is still "referenced" by a "not-yet-purged" batch. A possible wording is sketched below.

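   For example, the existing comment could be expanded along these lines (wording is only a suggestion):
   
   ```scala
   // By this time, offset and commit logs for batches 0-3 have been purged,
   // so the state data source reader can only time-travel to batches whose
   // commit log is still retained (e.g. reading batch 2 is no longer valid).
   // The metadata and schema files written at an older, purgeable batch must
   // still be kept, because they are referenced by the not-yet-purged batches.
   ```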


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

