chaoqin-li1123 commented on code in PR #41099:
URL: https://github.com/apache/spark/pull/41099#discussion_r1206244922
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,57 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}
}
+
+class RocksDBStateStoreStreamingAggregationSuite
+ extends StreamingAggregationSuite with RocksDBStateStoreTest {
+ import testImplicits._
+ def snapshotVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+ test("Streaming aggregation RocksDB State Store backward compatibility.") {
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ checkpointDir.delete()
+
+ val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0")
+ val inputData = MemoryStream[Int]
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Int, Long)]
+
+ // Run the stream with changelog checkpointing disabled.
+ testStream(aggregated, Update)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+ additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")),
+ AddData(inputData, 3),
+ CheckLastBatch((3, 1)),
+ AddData(inputData, 3, 2),
+ CheckLastBatch((3, 2), (2, 1)),
+ StopStream
+ )
+ assert(changelogVersionsPresent(rocksDBStateDir).isEmpty)
+ assert(snapshotVersionsPresent(rocksDBStateDir) == List(1L, 2L))
+
+ // Run the stream with changelog checkpointing enabled.
+ testStream(aggregated, Update)(
+ StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+ additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "true")),
+ AddData(inputData, 3, 2, 1),
+ CheckLastBatch((3, 3), (2, 2), (1, 1)),
+ // By default we run in new tuple mode.
+ AddData(inputData, 4, 4, 4, 4),
+ CheckLastBatch((4, 4))
+ )
+ assert(changelogVersionsPresent(rocksDBStateDir) == List(3L, 4L))
Review Comment:
Added.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,57 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}
}
+
+class RocksDBStateStoreStreamingAggregationSuite
+ extends StreamingAggregationSuite with RocksDBStateStoreTest {
+ import testImplicits._
+ def snapshotVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".zip"))
+ .map(_.getName.stripSuffix(".zip"))
+ .map(_.toLong)
+ .sorted
+ }
+ def changelogVersionsPresent(dir: File): Seq[Long] = {
+ dir.listFiles.filter(_.getName.endsWith(".changelog"))
+ .map(_.getName.stripSuffix(".changelog"))
+ .map(_.toLong)
+ .sorted
+ }
+ test("Streaming aggregation RocksDB State Store backward compatibility.") {
+ val checkpointDir = Utils.createTempDir().getCanonicalFile
+ checkpointDir.delete()
+
+ val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0")
Review Comment:
Renamed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]