HeartSaVioR commented on code in PR #48274:
URL: https://github.com/apache/spark/pull/48274#discussion_r1802420240


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -256,4 +271,21 @@ class StateStoreChangeDataPartitionReader(
     result.update(4, partition.partition)
     result
   }
+
+  private def createFlattenedRow(

Review Comment:
   Is this method expected to be used for anything other than MapState? If not, 
I'd give it a less general name that is more specific to MapState.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -498,6 +545,69 @@ class StateDataSourceTransformWithStateSuite extends 
StateStoreMetricsTest
     }
   }
 
+  testWithChangelogCheckpointingEnabled("state data source cdf integration - 
list state and TTL") {
+    withTempDir { tempDir =>
+      withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+        classOf[RocksDBStateStoreProvider].getName,
+        SQLConf.SHUFFLE_PARTITIONS.key ->
+          TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+        val inputData = MemoryStream[(String, String)]
+        val result = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new SessionGroupsStatefulProcessorWithTTL(),
+            TimeMode.ProcessingTime(),
+            OutputMode.Update())
+
+        testStream(result, OutputMode.Update())(
+          StartStream(checkpointLocation = tempDir.getAbsolutePath),
+          AddData(inputData, ("session1", "group2")),
+          AddData(inputData, ("session1", "group1")),
+          AddData(inputData, ("session2", "group1")),
+          AddData(inputData, ("session3", "group7")),
+          AddData(inputData, ("session1", "group4")),
+          Execute { _ =>
+            // wait for the batch to run since we are using processing time

Review Comment:
   Probably beyond the scope of this PR, but shall we consider using a manual 
clock? I don't remember the details, but if the engine takes the batch 
timestamp from the manual clock, a manual clock should work here as well.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -648,6 +810,79 @@ class StateDataSourceTransformWithStateSuite extends 
StateStoreMetricsTest
     }
   }
 
+  testWithChangelogCheckpointingEnabled("state data source cdf integration - " 
+
+   "map state TTL with single variable") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+      withTempDir { tempDir =>
+        val inputStream = MemoryStream[MapInputEvent]
+        val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
+        val result = inputStream.toDS()
+          .groupByKey(x => x.key)
+          .transformWithState(
+            new MapStateTTLProcessor(ttlConfig),
+            TimeMode.ProcessingTime(),
+            OutputMode.Append())
+
+        val clock = new StreamManualClock
+        testStream(result)(
+          StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock,
+            checkpointLocation = tempDir.getCanonicalPath),
+          AddData(inputStream,
+            MapInputEvent("k1", "key1", "put", 1),
+            MapInputEvent("k1", "key2", "put", 2)
+          ),
+          AdvanceManualClock(1 * 1000), // batch timestamp: 1000
+          CheckNewAnswer(),
+          AddData(inputStream,
+            MapInputEvent("k1", "key1", "get", -1),
+            MapInputEvent("k1", "key2", "get", -1)
+          ),
+          AdvanceManualClock(30 * 1000), // batch timestamp: 31000
+          CheckNewAnswer(
+            MapOutputEvent("k1", "key1", 1, isTTLValue = false, -1),
+            MapOutputEvent("k1", "key2", 2, isTTLValue = false, -1)
+          ),
+          // get values from ttl state
+          AddData(inputStream,
+            MapInputEvent("k1", "", "get_values_in_ttl_state", -1)
+          ),
+          AdvanceManualClock(1 * 1000), // batch timestamp: 32000
+          CheckNewAnswer(
+            MapOutputEvent("k1", "key1", -1, isTTLValue = true, 61000),
+            MapOutputEvent("k1", "key2", -1, isTTLValue = true, 61000)
+          ),
+          StopStream
+        )
+
+        val flattenedStateReaderDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "mapState")
+          .option(StateSourceOptions.READ_CHANGE_FEED, true)
+          .option(StateSourceOptions.CHANGE_START_BATCH_ID, 0)
+          .load()
+
+        val outputDf = flattenedStateReaderDf
+          .selectExpr(
+            "change_type",
+          "key.value AS groupingKey",
+            "user_map_key.value AS mapKey",
+            "user_map_value.value.value AS mapValue",
+            "user_map_value.ttlExpirationMs AS ttlTimestamp",
+            "partition_id")
+
+        checkAnswer(outputDf,

Review Comment:
   It looks like we don't cover elements that have expired via TTL. Is that 
just a case missing from the test, or is it technically not feasible to 
produce them in CDF?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -563,6 +673,58 @@ class StateDataSourceTransformWithStateSuite extends 
StateStoreMetricsTest
     }
   }
 
+  testWithChangelogCheckpointingEnabled("state data source cdf integration -" +

Review Comment:
   nit: add a space after `-`, to be consistent with the test name above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to