HeartSaVioR commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1749399236


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -1426,6 +1430,280 @@ class TransformWithStateSuite extends 
StateStoreMetricsTest
       }
     }
   }
+
+  private def getFiles(path: Path): Array[FileStatus] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fileManager = CheckpointFileManager.create(path, hadoopConf)
+    fileManager.list(path)
+  }
+
+  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+    new Path(stateCheckpointPath, "_stateSchema/default/")
+  }
+
+  test("transformWithState - verify that metadata and schema logs are purged") 
{
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
+      withTempDir { chkptDir =>
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        // because we don't change the schema for this run, there won't
+        // be a new schema file written.
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str4")),
+          CheckNewAnswer(("a", "2", "str3")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, 
"state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // by the end of the test, there have been 4 batches,
+        // so the metadata and schema logs, and commitLog have been purged
+        // for batches 0 and 1 so metadata and schema files exist for batches 
0, 1, 2, 3
+        // and we only need to keep metadata files for batches 2, 3, and 
since the schema
+        // hasn't changed between batches 2, 3, we only keep the schema file 
for batch 2
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 1)
+      }
+    }
+  }
+
+  test("state data source integration - value state supports time travel") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "5") {
+      withTempDir { chkptDir =>
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "3", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "4", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "5", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "6", "str1")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "7", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "str2")),
+          AddData(inputData, ("a", "str4")),
+          CheckNewAnswer(("a", "str3")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should 
be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Batches 0-7: countState, mostRecent
+        // Batches 8-9: countState
+
+        // By this time, offset and commit logs for batches 0-3 have been 
purged.
+        // However, if we want to read the data for batch 2, we need to have 
preserved the

Review Comment:
   ```suggestion
           // However, if we want to read the data for batch 4, we need to have 
preserved the
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to