anishshri-db commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1847388987
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala:
##########
@@ -105,6 +141,54 @@ class TransformWithListStateTTLSuite extends TransformWithStateTTLTest {
override def getStateTTLMetricName: String = "numListStateWithTTLVars"
+
+  test("verify the list state secondary index has at most one record per key") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(10))
+      val inputStream = MemoryStream[String]
+      val result = inputStream.toDS()
+        .groupByKey(x => x)
+        .transformWithState(
+          new OnlyListAppendTTLProcessor(ttlConfig),
+          TimeMode.ProcessingTime(),
+          OutputMode.Append())
+      val clock = new StreamManualClock
+
+      testStream(result)(
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
+
+        // We want all of the inputs to have different timestamps, so that each record
+        // gets its own unique TTL, and thus, its own unique secondary index record.
+        //
+        // While this looks contrived, in the real world, when non-manual clocks are used,
+        // it's very likely that different records will be processed at different UNIX epoch
+        // times.
+        AddData(inputStream, "k1"),
+        AdvanceManualClock(1 * 1000),
Review Comment:
Could we ensure that the microbatches are run as required?
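
One possible way (a sketch only; the expected rows below are placeholders, not the processor's actual output) is to follow each `AdvanceManualClock` with actions that block until the triggered microbatch completes, e.g. a `CheckNewAnswer` plus an `AssertOnQuery` on the last progress:

```scala
// Sketch: CheckNewAnswer waits for the microbatch triggered by the clock advance
// to commit its results to the sink, so the test fails fast if that batch never runs.
AddData(inputStream, "k1"),
AdvanceManualClock(1 * 1000),
CheckNewAnswer(("k1", 1L)),  // placeholder rows; adjust to OnlyListAppendTTLProcessor's output schema
// Optionally also assert the query actually made progress on this trigger:
AssertOnQuery { q => q.lastProgress != null && q.lastProgress.numInputRows == 1 }
```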
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]