ericm-db commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1746360875


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1426,6 +1426,130 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
       }
     }
   }
+
+  private def getFiles(path: Path): Array[FileStatus] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fileManager = CheckpointFileManager.create(path, hadoopConf)
+    fileManager.list(path)
+  }
+
+  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+    new Path(stateCheckpointPath, "_stateSchema/default/")
+  }
+
+  test("transformWithState - verify that metadata and schema logs are purged") 
{
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+      withTempDir { chkptDir =>
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          StopStream
+        )
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          StopStream
+        )
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // by the end of the test, there have been 3 batches,
+        // so the metadata and schema logs, and commitLog has been purged
+        // for batch 0, so metadata and schema files should only exist for
+        // batches 1 and 2
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+      }
+    }
+  }
+
+  test("transformWithState - verify that not all metadata and schema logs are 
purged") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
+      withTempDir { chkptDir =>
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "3", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "4", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "5", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "6", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "7", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "8", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "9", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "10", "str1")),
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "11", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "12", "str1")),
+          StopStream
+        )
+
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to