anishshri-db commented on code in PR #48853:
URL: https://github.com/apache/spark/pull/48853#discussion_r1845930622
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala:
##########
@@ -25,6 +25,34 @@ import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
+// OnlyListAppendTTLProcessor is a StatefulProcessor that only appends values to the
+// end of its (keyed) TTL list state. For each record it processes, it returns the new
+// length of its list state.
+//
+// The pattern of calling appendValue is to simulate the old behavior of appendValue, which
+// used to add a record into the secondary index for every appendList call.
+class OnlyListAppendTTLProcessor(ttlConfig: TTLConfig)
+  extends StatefulProcessor[String, String, (String, Long)]{
+  @transient private var _listState: ListStateImplWithTTL[String] = _
+
+  override def init(
+      outputMode: OutputMode,
+      timeMode: TimeMode): Unit = {
+    _listState = getHandle
Review Comment:
Do we also have a test with multiple state variables that use different
TTLState types (i.e. One-To-One and One-To-Many) within the same stateful
processor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]