brkyvz commented on code in PR #48460:
URL: https://github.com/apache/spark/pull/48460#discussion_r1915285425


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -203,14 +210,299 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
       withSQLConf(
         (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName),
         (SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"),
-        (SQLConf.SHUFFLE_PARTITIONS.key, "2")) {
+        (SQLConf.SHUFFLE_PARTITIONS.key, "2"),
+        (SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, "2")

Review Comment:
   can you add a comment on why this is necessary?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -305,6 +597,252 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
     validateBaseCheckpointInfo()
   }
 
+  // Verify lineage for each partition across batches. Below should satisfy 
because
+  // these ids are stored in the following manner:
+  // stateStoreCkptIds: id3, id2, id1
+  // baseStateStoreCkptIds:  id2, id1, None
+  // Below checks [id2, id1] are the same,
+  // which is the lineage for this partition across batches
+  private def checkpointInfoLineageVerification(
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Unit = {
+
+    Seq(0, 1).foreach {
+      partitionId =>
+        val stateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.stateStoreCkptId)
+        val baseStateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.baseStateStoreCkptId)
+
+        assert(stateStoreCkptIds.drop(1).iterator
+          .sameElements(baseStateStoreCkptIds.dropRight(1)))
+    }
+  }
+
+  private def verifyCheckpointInfoFromCommitLog(
+      checkpointDir: File,
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Boolean = 
{
+    var ret: Boolean = true
+
+    // Verify the version->UniqueId mapping from the state store 
is the same as
+    // the one from the commit log
+    val versionToUniqueIdFromStateStore = Seq(1, 2).map {
+      batchVersion =>
+        val res = pickedCheckpointInfoList
+          .filter(_.batchVersion == batchVersion).map(_.stateStoreCkptId.get)
+
+        // batch Id is batchVersion - 1
+        batchVersion - 1 -> res.toArray
+    }.toMap
+
+    val commitLogPath = new Path(
+      new Path(checkpointDir.getAbsolutePath), "commits").toString
+
+    val commitLog = new CommitLog(spark, commitLogPath)
+    val metadata_ = commitLog.get(Some(0), Some(1)).map(_._2)

Review Comment:
   uber nit: remove trailing `_`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -203,14 +210,299 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
       withSQLConf(
         (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName),
         (SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"),
-        (SQLConf.SHUFFLE_PARTITIONS.key, "2")) {
+        (SQLConf.SHUFFLE_PARTITIONS.key, "2"),
+        (SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, "2")
+      ) {
         testBody
       }
       // in case tests have any code that needs to execute after every test
       super.afterEach()
     }
   }
 
+  val changelogEnabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> 
"true"
+  val changelogDisabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> 
"false"
+  val ckptv2 = SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"
+
+  case class checkpointV2CompatibilityTest(
+    configsForFirstRun: Seq[(String, String)],
+    configsForSecondRun: Seq[(String, String)]
+  ) {
+    override def toString: String = "first run - " +
+      s"changeLogEnabled: ${configsForFirstRun(0)._2}, ckpt ver: 
${configsForFirstRun(1)._2}" +
+      s" - second run - changeLogEnabled: ${configsForSecondRun(0)._2}, " +
+      s"ckpt ver: ${configsForSecondRun(1)._2}"
+  }
+
+  val testConfigSetups = Seq(
+    // Enable and disable changelog under ckpt v2
+    checkpointV2CompatibilityTest(Seq(changelogEnabled, ckptv2), 
Seq(changelogDisabled, ckptv2)),
+    checkpointV2CompatibilityTest(Seq(changelogDisabled, ckptv2), 
Seq(changelogEnabled, ckptv2))
+  )
+
+  testConfigSetups.foreach { testConfig =>
+    testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+      s"$testConfig") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData
+              .toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+              .as[(Int, Long)]
+
+          withSQLConf(testConfig.configsForFirstRun: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3),
+              CheckLastBatch((3, 1)),
+              AddData(inputData, 3, 2),
+              CheckLastBatch((3, 2), (2, 1)),
+              AddData(inputData, 10),
+              CheckLastBatch((10, 1)),
+              StopStream
+            )
+
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3, 2, 1),
+              CheckLastBatch((3, 3), (2, 2), (1, 1)),
+              // By default we run in new tuple mode.
+              AddData(inputData, 4, 4, 4, 4),
+              CheckLastBatch((4, 4)),
+              AddData(inputData, 5, 5),
+              CheckLastBatch((5, 2))
+            )
+          }
+
+          withSQLConf(testConfig.configsForSecondRun: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4),
+              CheckLastBatch((4, 5))
+            )
+          }
+        }
+      }
+  }
+
+  testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+    "v2 - changelog on -> off -> on") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      withSQLConf(changelogEnabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3),
+          CheckLastBatch((3, 1)),
+          AddData(inputData, 3, 2),
+          CheckLastBatch((3, 2), (2, 1)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 3), (2, 2), (1, 1)),
+          // By default we run in new tuple mode.
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 4)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 2)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 5)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 6)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 6, 7, 8),
+          CheckLastBatch((6, 1), (7, 1), (8, 1)),
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 10)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 4)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 4), (2, 3), (1, 2)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 11)),
+          StopStream
+        )
+      }
+    }
+  }
+
+  testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+    "v2 - changelog off -> on -> off") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3),
+          CheckLastBatch((3, 1)),
+          AddData(inputData, 3, 2),
+          CheckLastBatch((3, 2), (2, 1)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 3), (2, 2), (1, 1)),
+          // By default we run in new tuple mode.
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 4)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 2)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogEnabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 5)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 6)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 6, 7, 8),
+          CheckLastBatch((6, 1), (7, 1), (8, 1)),
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 10)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 4)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 4), (2, 3), (1, 2)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 11)),
+          StopStream
+        )
+      }
+    }
+  }
+
+  testConfigSetups.foreach { testConfig =>
+    testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - " +
+      s"transformWithState - $testConfig") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[String]
+          val result = inputData.toDS()
+            .groupByKey(x => x)
+            .transformWithState(new RunningCountStatefulProcessor(),
+              TimeMode.None(),
+              OutputMode.Update())
+
+          withSQLConf(testConfig.configsForFirstRun: _*) {
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, "a"),
+              CheckNewAnswer(("a", "1")),
+              Execute { q =>
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numValueStateVars") > 0)
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numRegisteredTimers") == 0)
+              },
+              AddData(inputData, "a", "b"),
+              CheckNewAnswer(("a", "2"), ("b", "1")),
+              StopStream
+            )
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              // should remove state for "a" and not return anything for a
+              AddData(inputData, "a", "b"),
+              CheckNewAnswer(("b", "2")),
+              StopStream
+            )
+          }
+
+          withSQLConf(testConfig.configsForSecondRun: _*) {
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              // should recreate state for "a" and return count as 1 and
+              AddData(inputData, "a", "c"),
+              CheckNewAnswer(("a", "1"), ("c", "1")),
+              StopStream
+            )
+          }
+        }
+      }
+  }
+
+  test("checkpointFormatVersion2 validate ") {
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS()
+      .groupByKey(x => x)
+      .transformWithState(new RunningCountStatefulProcessor(),
+        TimeMode.None(),
+        OutputMode.Update())
+
+    testStream(result, Update())(
+      AddData(inputData, "a"),
+      CheckNewAnswer(("a", "1")),
+      Execute { q =>
+        
assert(q.lastProgress.stateOperators(0).customMetrics.get("numValueStateVars") 
> 0)
+        
assert(q.lastProgress.stateOperators(0).customMetrics.get("numRegisteredTimers")
 == 0)
+      },
+      AddData(inputData, "a", "b"),
+      CheckNewAnswer(("a", "2"), ("b", "1")),
+      StopStream,
+      StartStream(),
+      AddData(inputData, "a", "b"), // should remove state for "a" and not 
return anything for a
+      CheckNewAnswer(("b", "2")),
+      StopStream,
+      StartStream(),
+      AddData(inputData, "a", "c"), // should recreate state for "a" and 
return count as 1 and
+      CheckNewAnswer(("a", "1"), ("c", "1"))
+    )
+  }

Review Comment:
   what's the benefit of this test?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -305,6 +597,252 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
     validateBaseCheckpointInfo()
   }
 
+  // Verify lineage for each partition across batches. Below should satisfy 
because
+  // these ids are stored in the following manner:
+  // stateStoreCkptIds: id3, id2, id1
+  // baseStateStoreCkptIds:  id2, id1, None
+  // Below checks [id2, id1] are the same,
+  // which is the lineage for this partition across batches

Review Comment:
   ultra nit: use scaladoc please
   ```
   /**
    * Verify lineage for each partition across batches. Below should satisfy 
because
    * ...
    */
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -305,6 +597,252 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
     validateBaseCheckpointInfo()
   }
 
+  // Verify lineage for each partition across batches. Below should satisfy 
because
+  // these ids are stored in the following manner:
+  // stateStoreCkptIds: id3, id2, id1
+  // baseStateStoreCkptIds:  id2, id1, None
+  // Below checks [id2, id1] are the same,
+  // which is the lineage for this partition across batches
+  private def checkpointInfoLineageVerification(
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Unit = {
+
+    Seq(0, 1).foreach {
+      partitionId =>
+        val stateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.stateStoreCkptId)
+        val baseStateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.baseStateStoreCkptId)
+
+        assert(stateStoreCkptIds.drop(1).iterator
+          .sameElements(baseStateStoreCkptIds.dropRight(1)))
+    }
+  }
+
+  private def verifyCheckpointInfoFromCommitLog(
+      checkpointDir: File,
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Boolean = 
{
+    var ret: Boolean = true
+
+    // Verify the version->UniqueId mapping from the state store 
is the same as
+    // the one from the commit log
+    val versionToUniqueIdFromStateStore = Seq(1, 2).map {
+      batchVersion =>
+        val res = pickedCheckpointInfoList
+          .filter(_.batchVersion == batchVersion).map(_.stateStoreCkptId.get)
+
+        // batch Id is batchVersion - 1
+        batchVersion - 1 -> res.toArray
+    }.toMap
+
+    val commitLogPath = new Path(
+      new Path(checkpointDir.getAbsolutePath), "commits").toString
+
+    val commitLog = new CommitLog(spark, commitLogPath)
+    val metadata_ = commitLog.get(Some(0), Some(1)).map(_._2)
+
+    val versionToUniqueIdFromCommitLog = metadata_.zipWithIndex.map { case 
(metadata, idx) =>
+      // Use stateUniqueIds(0) because there is only one state operator
+      val res2 = metadata.stateUniqueIds(0).map { uniqueIds =>
+        uniqueIds(0)
+      }
+      idx -> res2
+    }.toMap
+
+    versionToUniqueIdFromCommitLog.foreach {
+      case (version, uniqueIds) =>
+        if 
(!versionToUniqueIdFromStateStore(version).sorted.sameElements(uniqueIds.sorted))
 {
+          ret = false
+          return ret
+        }
+    }
+    ret
+  }
+
+  // scalastyle:off line.size.limit
+  // When there is task retries or multiple jobs launched, there will be 
different
+  // sets of state store checkpointInfo because of lineage.
+  // e.g.
+  // [Picked] StateStoreCheckpointInfo[partitionId=1, batchVersion=2, 
stateStoreCkptId=Some(a9d5afec-0e8d-4473-b948-6c55513aa509), 
baseStateStoreCkptId=Some(061f7c53-b300-477a-a599-5387d55e315a)]
+  // [Picked] StateStoreCheckpointInfo[partitionId=0, batchVersion=2, 
stateStoreCkptId=Some(879cc517-6b85-4dae-abba-794bf2dbab82), 
baseStateStoreCkptId=Some(513726e7-2448-41a6-a874-92053c5cf86b)]
+  // StateStoreCheckpointInfo[partitionId=1, batchVersion=2, 
stateStoreCkptId=Some(7f4ad39f-d019-4ca2-8cf4-300379821cd6), 
baseStateStoreCkptId=Some(061f7c53-b300-477a-a599-5387d55e315a)]
+  // StateStoreCheckpointInfo[partitionId=0, batchVersion=2, 
stateStoreCkptId=Some(9dc215fe-54f9-4dc1-a59b-a8734f359e46), 
baseStateStoreCkptId=Some(513726e7-2448-41a6-a874-92053c5cf86b)]
+  // This function finds the correct one based on commit log
+  // scalastyle:on line.size.limit
+   private def pickCheckpointInfoFromCommitLog(
+        checkpointDir: File,
+        checkpointInfoList: Iterable[StateStoreCheckpointInfo]
+        ): Option[Iterable[StateStoreCheckpointInfo]] = {
+      val allPossibleCheckpointInfoList = checkpointInfoList
+        .groupBy(x => (x.partitionId, x.batchVersion)).values.transpose
+
+     var ret: Option[Iterable[StateStoreCheckpointInfo]] = None
+     for (elem <- allPossibleCheckpointInfoList) {
+       if (verifyCheckpointInfoFromCommitLog(checkpointDir, elem)) {
+         ret = Some(elem)
+         return ret
+       }
+     }
+     ret
+   }
+
+  // This test verifies when there are two jobs launched inside the
+  // foreachBatch function (write to multiple sinks).
+  // When this happens, the unique ids actually stored in the commit log 
should be the same
+  // as those recorded by CkptIdCollectingStateStoreWrapper
+  testWithCheckpointInfoTracked(s"checkpointFormatVersion2 validate ID - two 
jobs launched") {
+    withTempDir { checkpointDir =>
+      withTable("wei_test_t1") {
+        withTable("wei_test_t2") {
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData
+              .toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+
+          val writer = (ds: DataFrame, batchId: Long) => {
+            ds.write.mode("append").save("wei_test_t1")
+            ds.write.mode("append").save("wei_test_t2")

Review Comment:
   please write to a temp directory and delete them. These are currently not 
creating tables. Or use `saveAsTable`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -305,6 +597,252 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
     validateBaseCheckpointInfo()
   }
 
+  // Verify lineage for each partition across batches. Below should satisfy 
because
+  // these ids are stored in the following manner:
+  // stateStoreCkptIds: id3, id2, id1
+  // baseStateStoreCkptIds:  id2, id1, None
+  // Below checks [id2, id1] are the same,
+  // which is the lineage for this partition across batches
+  private def checkpointInfoLineageVerification(
+      pickedCheckpointInfoList: Iterable[StateStoreCheckpointInfo]): Unit = {
+
+    Seq(0, 1).foreach {
+      partitionId =>
+        val stateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.stateStoreCkptId)
+        val baseStateStoreCkptIds = pickedCheckpointInfoList
+          .filter(_.partitionId == partitionId).map(_.baseStateStoreCkptId)
+
+        assert(stateStoreCkptIds.drop(1).iterator
+          .sameElements(baseStateStoreCkptIds.dropRight(1)))
+    }
+  }
+
+  private def verifyCheckpointInfoFromCommitLog(

Review Comment:
   Can you add a scaladoc on when this returns true or false? It's not obvious



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala:
##########
@@ -203,14 +210,299 @@ class RocksDBStateStoreCheckpointFormatV2Suite extends 
StreamTest
       withSQLConf(
         (SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName),
         (SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"),
-        (SQLConf.SHUFFLE_PARTITIONS.key, "2")) {
+        (SQLConf.SHUFFLE_PARTITIONS.key, "2"),
+        (SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, "2")
+      ) {
         testBody
       }
       // in case tests have any code that needs to execute after every test
       super.afterEach()
     }
   }
 
+  val changelogEnabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> 
"true"
+  val changelogDisabled =
+    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> 
"false"
+  val ckptv2 = SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2"
+
+  case class checkpointV2CompatibilityTest(
+    configsForFirstRun: Seq[(String, String)],
+    configsForSecondRun: Seq[(String, String)]
+  ) {
+    override def toString: String = "first run - " +
+      s"changeLogEnabled: ${configsForFirstRun(0)._2}, ckpt ver: 
${configsForFirstRun(1)._2}" +
+      s" - second run - changeLogEnabled: ${configsForSecondRun(0)._2}, " +
+      s"ckpt ver: ${configsForSecondRun(1)._2}"
+  }
+
+  val testConfigSetups = Seq(
+    // Enable and disable changelog under ckpt v2
+    checkpointV2CompatibilityTest(Seq(changelogEnabled, ckptv2), 
Seq(changelogDisabled, ckptv2)),
+    checkpointV2CompatibilityTest(Seq(changelogDisabled, ckptv2), 
Seq(changelogEnabled, ckptv2))
+  )
+
+  testConfigSetups.foreach { testConfig =>
+    testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+      s"$testConfig") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[Int]
+          val aggregated =
+            inputData
+              .toDF()
+              .groupBy($"value")
+              .agg(count("*"))
+              .as[(Int, Long)]
+
+          withSQLConf(testConfig.configsForFirstRun: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3),
+              CheckLastBatch((3, 1)),
+              AddData(inputData, 3, 2),
+              CheckLastBatch((3, 2), (2, 1)),
+              AddData(inputData, 10),
+              CheckLastBatch((10, 1)),
+              StopStream
+            )
+
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 3, 2, 1),
+              CheckLastBatch((3, 3), (2, 2), (1, 1)),
+              // By default we run in new tuple mode.
+              AddData(inputData, 4, 4, 4, 4),
+              CheckLastBatch((4, 4)),
+              AddData(inputData, 5, 5),
+              CheckLastBatch((5, 2))
+            )
+          }
+
+          withSQLConf(testConfig.configsForSecondRun: _*) {
+            testStream(aggregated, Update)(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, 4),
+              CheckLastBatch((4, 5))
+            )
+          }
+        }
+      }
+  }
+
+  testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+    "v2 - changelog on -> off -> on") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      withSQLConf(changelogEnabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3),
+          CheckLastBatch((3, 1)),
+          AddData(inputData, 3, 2),
+          CheckLastBatch((3, 2), (2, 1)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 3), (2, 2), (1, 1)),
+          // By default we run in new tuple mode.
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 4)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 2)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 5)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 6)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 6, 7, 8),
+          CheckLastBatch((6, 1), (7, 1), (8, 1)),
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 10)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 4)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 4), (2, 3), (1, 2)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 11)),
+          StopStream
+        )
+      }
+    }
+  }
+
+  testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - simple 
agg - " +
+    "v2 - changelog off -> on -> off") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val aggregated =
+        inputData
+          .toDF()
+          .groupBy($"value")
+          .agg(count("*"))
+          .as[(Int, Long)]
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3),
+          CheckLastBatch((3, 1)),
+          AddData(inputData, 3, 2),
+          CheckLastBatch((3, 2), (2, 1)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 3), (2, 2), (1, 1)),
+          // By default we run in new tuple mode.
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 4)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 2)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogEnabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 5)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 6)),
+          StopStream
+        )
+
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 6, 7, 8),
+          CheckLastBatch((6, 1), (7, 1), (8, 1)),
+          AddData(inputData, 4, 4, 4, 4),
+          CheckLastBatch((4, 10)),
+          AddData(inputData, 5, 5),
+          CheckLastBatch((5, 4)),
+          StopStream
+        )
+      }
+
+      withSQLConf(changelogDisabled, ckptv2) {
+        testStream(aggregated, Update)(
+          StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+          AddData(inputData, 3, 2, 1),
+          CheckLastBatch((3, 4), (2, 3), (1, 2)),
+          AddData(inputData, 4),
+          CheckLastBatch((4, 11)),
+          StopStream
+        )
+      }
+    }
+  }
+
+  testConfigSetups.foreach { testConfig =>
+    testWithRocksDBStateStore("checkpointFormatVersion2 Compatibility - " +
+      s"transformWithState - $testConfig") {
+        withTempDir { checkpointDir =>
+          val inputData = MemoryStream[String]
+          val result = inputData.toDS()
+            .groupByKey(x => x)
+            .transformWithState(new RunningCountStatefulProcessor(),
+              TimeMode.None(),
+              OutputMode.Update())
+
+          withSQLConf(testConfig.configsForFirstRun: _*) {
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              AddData(inputData, "a"),
+              CheckNewAnswer(("a", "1")),
+              Execute { q =>
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numValueStateVars") > 0)
+                assert(q.lastProgress.stateOperators(0)
+                  .customMetrics.get("numRegisteredTimers") == 0)
+              },
+              AddData(inputData, "a", "b"),
+              CheckNewAnswer(("a", "2"), ("b", "1")),
+              StopStream
+            )
+            testStream(result, Update())(
+              StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+              // should remove state for "a" and not return anything for a

Review Comment:
   curious, why does that happen?
   oh... i see, `RunningCountStatefulProcessor` drops it when count gets to 3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to