HeartSaVioR commented on code in PR #53159:
URL: https://github.com/apache/spark/pull/53159#discussion_r2667298547


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.QueryInfoImpl
+import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, 
StatefulProcessorHandle, TimeMode, TTLConfig, ValueState}
+import org.apache.spark.util.Clock
+
+/** In-memory implementation of ValueState. */
+class InMemoryValueState[T](clock: Clock, ttl: TTLConfig) extends 
ValueState[T] {
+  private val keyToStateValue = mutable.Map[Any, T]()
+
+  private def getValue: Option[T] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+  override def exists(): Boolean = getValue.isDefined
+
+  override def get(): T = getValue.getOrElse(null.asInstanceOf[T])
+
+  override def update(newState: T): Unit =
+    keyToStateValue.put(ImplicitGroupingKeyTracker.getImplicitKeyOption.get, 
newState)
+
+  override def clear(): Unit =
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+}
+
+/** In-memory implementation of ListState. */
+class InMemoryListState[T](clock: Clock, ttl: TTLConfig) extends ListState[T] {
+  private val keyToStateValue = mutable.Map[Any, mutable.ArrayBuffer[T]]()
+
+  private def getList: Option[mutable.ArrayBuffer[T]] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+
+  override def exists(): Boolean = getList.isDefined
+
+  override def get(): Iterator[T] = {
+    getList.getOrElse(mutable.ArrayBuffer.empty[T]).iterator
+  }
+
+  override def put(newState: Array[T]): Unit = {
+    keyToStateValue.put(
+      ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+      mutable.ArrayBuffer.empty[T] ++ newState
+    )
+  }
+
+  override def appendValue(newState: T): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
newState
+  }
+
+  override def appendList(newState: Array[T]): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) ++= 
newState
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory implementation of MapState. */
+class InMemoryMapState[K, V](clock: Clock, ttl: TTLConfig) extends MapState[K, 
V] {
+  private val keyToStateValue = mutable.Map[Any, mutable.HashMap[K, V]]()
+
+  private def getMap: Option[mutable.HashMap[K, V]] =
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+
+  override def exists(): Boolean = getMap.isDefined
+
+  override def getValue(key: K): V = {
+    getMap.flatMap(_.get(key)).getOrElse(null.asInstanceOf[V])
+  }
+
+  override def containsKey(key: K): Boolean = {
+    getMap.exists(_.contains(key))
+  }
+
+  override def updateValue(key: K, value: V): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.HashMap.empty[K, V]
+      )
+    }
+
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
(key -> value)
+  }
+
+  override def iterator(): Iterator[(K, V)] = {
+    getMap.map(_.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def keys(): Iterator[K] = {
+    getMap.map(_.keys.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def values(): Iterator[V] = {
+    getMap.map(_.values.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def removeKey(key: K): Unit = {
+    getMap.foreach(_.remove(key))
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory timers. */
+class InMemoryTimers {
+  // Maps expiration times to keys.
+  // If time t is mapped to key k, there is a timer associated with key k 
expiring at time t.
+  private val timeToKeys = mutable.Map[Long, mutable.HashSet[Any]]()
+
+  def registerTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (!timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys.put(expiryTimestampMs, mutable.HashSet[Any]())
+    }
+    timeToKeys(expiryTimestampMs).add(groupingKey)
+  }
+
+  def deleteTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys(expiryTimestampMs).remove(groupingKey)
+      if (timeToKeys(expiryTimestampMs).isEmpty) {
+        timeToKeys.remove(expiryTimestampMs)
+      }
+    }
+  }
+
+  def listTimers(): Iterator[Long] = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    timeToKeys.iterator.filter(_._2.contains(groupingKey) ).map(_._1)
+  }
+
+  // Lists pairs (expiryTimestampMs, key) for all timers such that 
expiryTimestampMs<=currentTimeMs.
+  // Result is ordered by timestamp.
+  def listExpiredTimers[K](currentTimeMs: Long): List[(Long, K)] = {
+    timeToKeys.iterator
+      .filter(_._1 <= currentTimeMs)
+      .flatMap(entry => entry._2.map(key => (entry._1, key.asInstanceOf[K])))
+      .toList
+      .sortBy(_._1)
+  }
+}
+
+/**
+ * In-memory implementation of StatefulProcessorHandle for testing purposes.
+ *
+ * == Internal Implementation ==
+ *
+ * '''State Storage:''' All state is stored in Scala mutable collections. A 
central
+ * `states: Map[String, Any]` maps state names to their corresponding 
in-memory state instances
+ * (InMemoryValueState, InMemoryListState, or InMemoryMapState). Each state 
instance internally
+ * uses a `Map[Any, T]` where the key is the implicit grouping key obtained 
from
+ * [[ImplicitGroupingKeyTracker]].
+ *
+ * '''Grouping Key Tracking:''' Operations on state are scoped to the current 
grouping key,
+ * which is retrieved via `ImplicitGroupingKeyTracker.getImplicitKeyOption`. 
This mirrors the
+ * production implementation where state is partitioned by key.
+ *
+ * '''Timers:''' Managed by [[InMemoryTimers]], which maintains a `Map[Long, 
Set[Any]]` mapping
+ * expiration timestamps to sets of grouping keys. Expired timers can be 
queried via
+ * `listExpiredTimers()`.
+ *
+ * '''Direct State Access:''' Unlike the production handle, this 
implementation exposes
+ * `peekXxxState` and `updateXxxState` methods for test assertions and setup, 
allowing
+ * direct manipulation of state without going through the processor logic.
+ */
+class InMemoryStatefulProcessorHandle(private val timeMode: TimeMode, private 
val clock: Clock)

Review Comment:
   ```suggestion
   class InMemoryStatefulProcessorHandle(timeMode: TimeMode, clock: Clock)
   ```
   
   nit: since they are private anyway, do they need to be declared as `val`s? Plain constructor parameters (without `val`) are already inaccessible from outside the class.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.QueryInfoImpl
+import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, 
StatefulProcessorHandle, TimeMode, TTLConfig, ValueState}
+import org.apache.spark.util.Clock
+
+/** In-memory implementation of ValueState. */
+class InMemoryValueState[T](clock: Clock, ttl: TTLConfig) extends 
ValueState[T] {
+  private val keyToStateValue = mutable.Map[Any, T]()
+
+  private def getValue: Option[T] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+  override def exists(): Boolean = getValue.isDefined
+
+  override def get(): T = getValue.getOrElse(null.asInstanceOf[T])
+
+  override def update(newState: T): Unit =
+    keyToStateValue.put(ImplicitGroupingKeyTracker.getImplicitKeyOption.get, 
newState)
+
+  override def clear(): Unit =
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+}
+
+/** In-memory implementation of ListState. */
+class InMemoryListState[T](clock: Clock, ttl: TTLConfig) extends ListState[T] {
+  private val keyToStateValue = mutable.Map[Any, mutable.ArrayBuffer[T]]()
+
+  private def getList: Option[mutable.ArrayBuffer[T]] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+
+  override def exists(): Boolean = getList.isDefined
+
+  override def get(): Iterator[T] = {
+    getList.getOrElse(mutable.ArrayBuffer.empty[T]).iterator
+  }
+
+  override def put(newState: Array[T]): Unit = {
+    keyToStateValue.put(
+      ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+      mutable.ArrayBuffer.empty[T] ++ newState
+    )
+  }
+
+  override def appendValue(newState: T): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
newState
+  }
+
+  override def appendList(newState: Array[T]): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) ++= 
newState
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory implementation of MapState. */
+class InMemoryMapState[K, V](clock: Clock, ttl: TTLConfig) extends MapState[K, 
V] {
+  private val keyToStateValue = mutable.Map[Any, mutable.HashMap[K, V]]()
+
+  private def getMap: Option[mutable.HashMap[K, V]] =
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+
+  override def exists(): Boolean = getMap.isDefined
+
+  override def getValue(key: K): V = {
+    getMap.flatMap(_.get(key)).getOrElse(null.asInstanceOf[V])
+  }
+
+  override def containsKey(key: K): Boolean = {
+    getMap.exists(_.contains(key))
+  }
+
+  override def updateValue(key: K, value: V): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.HashMap.empty[K, V]
+      )
+    }
+
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
(key -> value)
+  }
+
+  override def iterator(): Iterator[(K, V)] = {
+    getMap.map(_.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def keys(): Iterator[K] = {
+    getMap.map(_.keys.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def values(): Iterator[V] = {
+    getMap.map(_.values.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def removeKey(key: K): Unit = {
+    getMap.foreach(_.remove(key))
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory timers. */
+class InMemoryTimers {
+  // Maps expiration times to keys.
+  // If time t is mapped to key k, there is a timer associated with key k 
expiring at time t.
+  private val timeToKeys = mutable.Map[Long, mutable.HashSet[Any]]()
+
+  def registerTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (!timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys.put(expiryTimestampMs, mutable.HashSet[Any]())
+    }
+    timeToKeys(expiryTimestampMs).add(groupingKey)
+  }
+
+  def deleteTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys(expiryTimestampMs).remove(groupingKey)
+      if (timeToKeys(expiryTimestampMs).isEmpty) {
+        timeToKeys.remove(expiryTimestampMs)
+      }
+    }
+  }
+
+  def listTimers(): Iterator[Long] = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    timeToKeys.iterator.filter(_._2.contains(groupingKey) ).map(_._1)
+  }
+
+  // Lists pairs (expiryTimestampMs, key) for all timers such that 
expiryTimestampMs<=currentTimeMs.
+  // Result is ordered by timestamp.
+  def listExpiredTimers[K](currentTimeMs: Long): List[(Long, K)] = {
+    timeToKeys.iterator
+      .filter(_._1 <= currentTimeMs)
+      .flatMap(entry => entry._2.map(key => (entry._1, key.asInstanceOf[K])))
+      .toList
+      .sortBy(_._1)
+  }
+}
+
+/**
+ * In-memory implementation of StatefulProcessorHandle for testing purposes.
+ *
+ * == Internal Implementation ==
+ *
+ * '''State Storage:''' All state is stored in Scala mutable collections. A 
central
+ * `states: Map[String, Any]` maps state names to their corresponding 
in-memory state instances
+ * (InMemoryValueState, InMemoryListState, or InMemoryMapState). Each state 
instance internally
+ * uses a `Map[Any, T]` where the key is the implicit grouping key obtained 
from
+ * [[ImplicitGroupingKeyTracker]].
+ *
+ * '''Grouping Key Tracking:''' Operations on state are scoped to the current 
grouping key,
+ * which is retrieved via `ImplicitGroupingKeyTracker.getImplicitKeyOption`. 
This mirrors the
+ * production implementation where state is partitioned by key.
+ *
+ * '''Timers:''' Managed by [[InMemoryTimers]], which maintains a `Map[Long, 
Set[Any]]` mapping
+ * expiration timestamps to sets of grouping keys. Expired timers can be 
queried via
+ * `listExpiredTimers()`.
+ *
+ * '''Direct State Access:''' Unlike the production handle, this 
implementation exposes
+ * `peekXxxState` and `updateXxxState` methods for test assertions and setup, 
allowing
+ * direct manipulation of state without going through the processor logic.
+ */
+class InMemoryStatefulProcessorHandle(private val timeMode: TimeMode, private 
val clock: Clock)
+  extends StatefulProcessorHandle {
+
+  val timers = new InMemoryTimers()

Review Comment:
   ```suggestion
     private val timers = new InMemoryTimers()
   ```
   
   Please feel free to reject this suggestion if `timers` is accessed from outside this class.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/TwsTester.scala:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import scala.reflect.ClassTag
+
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing.InMemoryStatefulProcessorHandle
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.{ExpiredTimerInfoImpl,
 TimerValuesImpl}
+import org.apache.spark.util.ManualClock
+
+/**
+ * Testing utility for transformWithState stateful processors.
+ *
+ * This class enables unit testing of StatefulProcessor business logic by 
simulating the
+ * behavior of transformWithState. It processes input rows and returns output 
rows equivalent
+ * to those that would be produced by the processor in an actual Spark 
streaming query.
+ *
+ * '''Supported:'''
+ *  - Processing input rows and producing output rows via `test()`.
+ *  - Initial state setup via constructor parameter.
+ *  - Direct state manipulation via `updateValueState`, `updateListState`, 
`updateMapState`.
+ *  - Direct state inspection via `peekValueState`, `peekListState`, 
`peekMapState`.
+ *  - Timers in ProcessingTime mode (use `setProcessingTime` to fire timers).
+ *  - Timers in EventTime mode (use `setWatermark` to manually set the 
watermark
+ *    and fire expired timers).
+ *
+ * '''Not Supported:'''
+ *  - '''TTL'''. States persist indefinitely, even if TTLConfig is set.
+ *  - '''Automatic watermark propagation''': In production Spark streaming, 
the watermark is
+ *    computed from event times and propagated at the end of each microbatch. 
TwsTester does
+ *    not simulate this behavior because it processes keys individually rather 
than in batches.
+ *    To test watermark-dependent logic, use `setWatermark()` to manually set 
the watermark
+ *    to the desired value before calling `test()`.
+ *  - '''Late event filtering''': Since TwsTester does not track event times 
or automatically

Review Comment:
   It looks like we track the watermark and guard against moving the processing time or the watermark backward. However, I don't see a guard against registering a timer earlier than the current watermark. Once we add that guard, we will also need late-record filtering.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/TwsTester.scala:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import java.sql.Timestamp
+import java.time.{Clock, Instant, ZoneId}
+
+import scala.reflect.ClassTag
+
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing.InMemoryStatefulProcessorHandle
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.{ExpiredTimerInfoImpl,
 TimerValuesImpl}
+
+/**
+ * Testing utility for transformWithState stateful processors.
+ *
+ * This class enables unit testing of StatefulProcessor business logic by 
simulating the
+ * behavior of transformWithState. It processes input rows and returns output 
rows equivalent
+ * to those that would be produced by the processor in an actual Spark 
streaming query.
+ *
+ * '''Supported:'''
+ *  - Processing input rows and producing output rows via `test()`.
+ *  - Initial state setup via constructor parameter.
+ *  - Direct state manipulation via `setValueState`, `setListState`, 
`setMapState`.
+ *  - Direct state inspection via `peekValueState`, `peekListState`, 
`peekMapState`.
+ *  - Timers in ProcessingTime mode (use `advanceProcessingTime` to fire 
timers).
+ *  - Timers in EventTime mode (use `eventTimeExtractor` and 
`watermarkDelayMs` to configure;
+ *    watermark advances automatically based on event times, or use 
`advanceWatermark` manually).
+ *  - Late event filtering in EventTime mode (events older than the current 
watermark are dropped).
+ *  - TTL for ValueState, ListState, and MapState (use ProcessingTime mode and
+ *    `advanceProcessingTime` to test expiry).
+ *
+ * '''Testing EventTime Mode:'''
+ *  To test with EventTime, provide `eventTimeExtractor` (a function 
extracting the event
+ *  timestamp from each input row) and `watermarkDelayMs` (the watermark delay 
in milliseconds).
+ *  The watermark is computed as `max(event_time_seen) - watermarkDelayMs` and 
is updated
+ *  automatically after each `test()` call. Late events (with event time older 
than the current
+ *  watermark) are filtered out before processing, matching production 
behavior. Timers with
+ *  expiry time <= watermark will fire. You can also manually advance the 
watermark using
+ *  `advanceWatermark()`.
+ *
+ * '''Use Cases:'''
+ *  - '''Primary''': Unit testing business logic in `handleInputRows` 
implementations.
+ *  - '''Not recommended''': End-to-end testing or performance testing - use 
actual Spark
+ *    streaming queries for those scenarios.
+ *
+ * @param processor the StatefulProcessor to test.
+ * @param initialState initial state for each key as a list of (key, state) 
tuples.
+ * @param timeMode time mode (None, ProcessingTime or EventTime).
+ * @param outputMode output mode (Append, Update, or Complete).

Review Comment:
   I'm OK with it.



##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/TwsTester.scala:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import scala.reflect.ClassTag
+
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing.InMemoryStatefulProcessorHandle
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.{ExpiredTimerInfoImpl,
 TimerValuesImpl}
+import org.apache.spark.util.ManualClock
+
+/**
+ * Testing utility for transformWithState stateful processors.
+ *
+ * This class enables unit testing of StatefulProcessor business logic by 
simulating the
+ * behavior of transformWithState. It processes input rows and returns output 
rows equivalent
+ * to those that would be produced by the processor in an actual Spark 
streaming query.
+ *
+ * '''Supported:'''
+ *  - Processing input rows and producing output rows via `test()`.
+ *  - Initial state setup via constructor parameter.
+ *  - Direct state manipulation via `updateValueState`, `updateListState`, 
`updateMapState`.
+ *  - Direct state inspection via `peekValueState`, `peekListState`, 
`peekMapState`.
+ *  - Timers in ProcessingTime mode (use `setProcessingTime` to fire timers).
+ *  - Timers in EventTime mode (use `setWatermark` to manually set the 
watermark
+ *    and fire expired timers).
+ *
+ * '''Not Supported:'''
+ *  - '''TTL'''. States persist indefinitely, even if TTLConfig is set.
+ *  - '''Automatic watermark propagation''': In production Spark streaming, 
the watermark is
+ *    computed from event times and propagated at the end of each microbatch. 
TwsTester does
+ *    not simulate this behavior because it processes keys individually rather 
than in batches.
+ *    To test watermark-dependent logic, use `setWatermark()` to manually set 
the watermark
+ *    to the desired value before calling `test()`.
+ *  - '''Late event filtering''': Since TwsTester does not track event times 
or automatically

Review Comment:
   What happens if the user's stateful processor implementation tries to register a timer that is earlier than the current watermark? AFAIK TWS does not allow this. Does the tester enforce the same restriction?
   
   I'm asking because skipping late-event filtering changes the behavior. For example, say the user's processor registers a timer at the input's event time + 3 seconds. With late-event filtering, late events are dropped before processing, so a timer can never be registered earlier than the watermark. Without late-event filtering, a late event would cause a timer to be registered before the watermark. If the test utility correctly simulated TWS, that would raise an exception.
   
   I'd suggest making the test utility stateful: keep track of the watermark whenever the user sets it manually, and disallow setting the watermark to a time earlier than the current one.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala:
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+/** Test suite for TwsTester utility class. */
+class TwsTesterSuite extends SparkFunSuite {
+
+  test("TwsTester should correctly test processor with ValueState") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+    assert(tester.test("key1", List("a")) == List(("key1", 1L)))
+    assert(tester.test("key2", List("a", "a")) == List(("key2", 2L)))
+    assert(tester.test("key3", List("a")) == List(("key3", 1L)))
+    assert(tester.test("key1", List("a", "a", "a")) == List(("key1", 4L)))
+
+    assert(tester.peekValueState[Long]("count", "key1").get == 4L)
+    assert(tester.peekValueState[Long]("count", "key2").get == 2L)
+    assert(tester.peekValueState[Long]("count", "key3").get == 1L)
+    assert(tester.peekValueState[Long]("count", "key4").isEmpty)
+  }
+
+  test("TwsTester should allow direct access to ValueState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester[String, String, (String, Long)](processor)
+    tester.updateValueState[Long]("count", "foo", 5)
+    tester.test("foo", List("a"))
+    assert(tester.peekValueState[Long]("count", "foo").get == 6L)
+  }
+
+  test("TwsTester should correctly test processor with ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    val ans1 = tester.test("key1", List(("b", 2.0), ("c", 3.0), ("a", 1.0)))
+    assert(ans1 == List(("key1", 3.0), ("key1", 2.0)))
+    val ans2 = tester.test("key2", List(("a", 10.0), ("b", 20.0), ("c", 30.0), 
("d", 40.0)))
+    assert(ans2 == List(("key2", 40.0), ("key2", 30.0)))
+    val ans3 = tester.test("key3", List(("a", 100.0)))
+    assert(ans3 == List(("key3", 100.0)))
+
+    assert(tester.peekListState[Double]("topK", "key1") == List(3.0, 2.0))
+    assert(tester.peekListState[Double]("topK", "key2") == List(40.0, 30.0))
+    assert(tester.peekListState[Double]("topK", "key3") == List(100.0))
+    assert(tester.peekListState[Double]("topK", "key4").isEmpty)
+
+    val ans4 = tester.test("key1", List(("a", 10.0)))
+    assert(ans4 == List(("key1", 10.0), ("key1", 3.0)))
+    assert(tester.peekListState[Double]("topK", "key1") == List(10.0, 3.0))
+  }
+
+  test("TwsTester should allow direct access to ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    tester.updateListState("topK", "a", List(6.0, 5.0))
+    tester.updateListState("topK", "b", List(8.0, 7.0))
+    tester.test("a", List(("", 10.0)))
+    tester.test("b", List(("", 7.5)))
+    tester.test("c", List(("", 1.0)))
+
+    assert(tester.peekListState[Double]("topK", "a") == List(10.0, 6.0))
+    assert(tester.peekListState[Double]("topK", "b") == List(8.0, 7.5))
+    assert(tester.peekListState[Double]("topK", "c") == List(1.0))
+    assert(tester.peekListState[Double]("topK", "d") == List())
+  }
+
+  test("TwsTester should correctly test processor with MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+    val ans1 =
+      tester.test("user1", List(("", "hello"), ("", "world"), ("", "hello"), 
("", "world")))
+    assert(
+      ans1.sorted == List(
+        ("user1", "hello", 1L),
+        ("user1", "hello", 2L),
+        ("user1", "world", 1L),
+        ("user1", "world", 2L)
+      ).sorted
+    )
+
+    val ans2 = tester.test("user2", List(("", "hello"), ("", "spark")))
+    assert(ans2.sorted == List(("user2", "hello", 1L), ("user2", "spark", 
1L)).sorted)
+
+    // Check state using peekMapState
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map("hello" 
-> 2L, "world" -> 2L)
+    )
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user2") == Map("hello" 
-> 1L, "spark" -> 1L)
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == Map())
+    assert(tester.peekMapState[String, Long]("frequencies", "user3").isEmpty)
+
+    // Process more data for user1
+    val ans3 = tester.test("user1", List(("", "hello"), ("", "test")))
+    assert(ans3.sorted == List(("user1", "hello", 3L), ("user1", "test", 
1L)).sorted)
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 3L,
+        "world" -> 2L,
+        "test" -> 1L
+      )
+    )
+  }
+
+  test("TwsTester should allow direct access to MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+
+    // Set initial state directly
+    tester.updateMapState("frequencies", "user1", Map("hello" -> 5L, "world" 
-> 3L))
+    tester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+
+    // Process new words
+    tester.test("user1", List(("", "hello")))
+    tester.test("user1", List(("", "goodbye")))
+    tester.test("user2", List(("", "spark")))
+    tester.test("user3", List(("", "new")))
+
+    // Verify updated state
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 6L,
+        "world" -> 3L,
+        "goodbye" -> 1L
+      )
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 11L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == 
Map("new" -> 1L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user4") == Map())
+  }
+
+  test("TwsTester can be used to test step function") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor)
+
+    // Example of helper function using TwsTester to inspect how processing a 
single row changes
+    // state.
+    def testStepFunction(key: String, inputRow: String, stateIn: Long): Long = 
{
+      tester.updateValueState[Long]("count", key, stateIn)
+      tester.test(key, List(inputRow))
+      tester.peekValueState("count", key).get
+    }
+
+    assert(testStepFunction("key1", "a", 10L) == 11L)
+  }
+
+  test("TwsTester should call handleInitialState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor, initialState = List(("a", 10L), 
("b", 20L)))
+    assert(tester.peekValueState[Long]("count", "a").get == 10L)
+    assert(tester.peekValueState[Long]("count", "b").get == 20L)
+
+    val ans1 = tester.test("a", List("a"))
+    val ans2 = tester.test("c", List("c"))
+    assert(ans1 == List(("a", 11L)))
+    assert(ans2 == List(("c", 1L)))
+  }
+
+  test("TwsTester should fail when initialState is passed but not supported") {
+    val processor = new TopKProcessor(5)
+    val exception = intercept[IllegalArgumentException] {
+      new TwsTester(processor, initialState = List(("a", List(1.0, 2.0))))
+    }
+    assert(exception.getMessage.contains("stateful processor doesn't support 
initial state"))
+  }
+
+  test("TwsTester should support row-by-row processing") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+
+    // Example of helper function to test how TransformWithState processes 
rows one-by-one, which
+    // can be used to simulate real-time mode.
+    def testRowByRow(input: List[(String, String)]): List[(String, Long)] = {
+      input.flatMap { case (key, value) => tester.test(key, List(value)) }
+    }
+
+    val input: List[(String, String)] = List(
+      ("key1", "a"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key1", "c"),
+      ("key3", "q")
+    )
+    val ans: List[(String, Long)] = testRowByRow(input)
+    assert(
+      ans == List(
+        ("key1", 1L),
+        ("key2", 1L),
+        ("key1", 2L),
+        ("key2", 2L),
+        ("key1", 3L),
+        ("key1", 4L),
+        ("key3", 1L)
+      )
+    )
+  }
+
+  test("TwsTester should exercise all state methods") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test(
+      "k",
+      List(
+        "value-exists", // false
+        "value-set", // set to 42
+        "value-exists", // true
+        "value-clear", // clear
+        "value-exists", // false again
+        "list-exists", // false
+        "list-append", // append a, b
+        "list-exists", // true
+        "list-append-array", // append c, d
+        "list-get", // a,b,c,d
+        "map-exists", // false
+        "map-add", // add x=1, y=2, z=3
+        "map-exists", // true
+        "map-keys", // x,y,z
+        "map-values", // 1,2,3
+        "map-iterator", // x=1,y=2,z=3
+        "map-remove", // remove y
+        "map-keys", // x,z
+        "map-clear", // clear map
+        "map-exists" // false
+      )
+    )
+
+    assert(
+      results == List(
+        ("k", "value-exists:false"),
+        ("k", "value-set:done"),
+        ("k", "value-exists:true"),
+        ("k", "value-clear:done"),
+        ("k", "value-exists:false"),
+        ("k", "list-exists:false"),
+        ("k", "list-append:done"),
+        ("k", "list-exists:true"),
+        ("k", "list-append-array:done"),
+        ("k", "list-get:a,b,c,d"),
+        ("k", "map-exists:false"),
+        ("k", "map-add:done"),
+        ("k", "map-exists:true"),
+        ("k", "map-keys:x,y,z"),
+        ("k", "map-values:1,2,3"),
+        ("k", "map-iterator:x=1,y=2,z=3"),
+        ("k", "map-remove:done"),
+        ("k", "map-keys:x,z"),
+        ("k", "map-clear:done"),
+        ("k", "map-exists:false")
+      )
+    )
+  }
+
+  test("TwsTester: value-get before value-set") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test("k", List("value-get", "value-set", "value-get"))
+    assert(results == List(("k", ""), ("k", "value-set:done"), ("k", "42")))
+  }
+
+  test("TwsTester: list operations") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results =
+      tester.test(
+        "k",
+        List(
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-put",
+          "list-get",
+          "list-clear",
+          "list-get"
+        )
+      )
+    assert(
+      results == List(
+        ("k", "list-get:"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b,a,b"),
+        ("k", "list-put:done"),
+        ("k", "list-get:put"),
+        ("k", "list-clear:done"),
+        ("k", "list-get:")
+      )
+    )
+  }
+
+  test("TwsTester should delete value state") {
+    val valueTester = new TwsTester(new RunningCountProcessor[String]())
+    valueTester.updateValueState[Long]("count", "key1", 10L)
+    valueTester.updateValueState[Long]("count", "key2", 20L)
+    assert(valueTester.peekValueState[Long]("count", "key1").get == 10L)
+    valueTester.deleteState("count", "key1")
+    assert(valueTester.peekValueState[Long]("count", "key1").isEmpty)
+    assert(valueTester.peekValueState[Long]("count", "key2").get == 20L)
+  }
+
+  test("TwsTester should delete list state") {
+    val listTester = new TwsTester(new TopKProcessor(3))
+    listTester.updateListState("topK", "key1", List(1.0, 2.0, 3.0))
+    listTester.updateListState("topK", "key2", List(4.0, 5.0))
+    assert(listTester.peekListState[Double]("topK", "key1") == List(1.0, 2.0, 
3.0))
+    listTester.deleteState("topK", "key1")
+    assert(listTester.peekListState[Double]("topK", "key1").isEmpty)
+    assert(listTester.peekListState[Double]("topK", "key2") == List(4.0, 5.0))
+  }
+
+  test("TwsTester should delete map state") {
+    val mapTester = new TwsTester(new WordFrequencyProcessor())
+    mapTester.updateMapState("frequencies", "user1", Map("hello" -> 5L, 
"world" -> 3L))
+    mapTester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+    assert(
+      mapTester
+        .peekMapState[String, Long]("frequencies", "user1") == Map("hello" -> 
5L, "world" -> 3L)
+    )
+    mapTester.deleteState("frequencies", "user1")
+    assert(mapTester.peekMapState[String, Long]("frequencies", 
"user1").isEmpty)
+    assert(mapTester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 10L))
+  }
+
+  test("TwsTester should support ProcessingTime timers") {
+    val tester = new TwsTester(
+      new SessionTimeoutProcessor(),
+      timeMode = TimeMode.ProcessingTime()
+    )
+
+    // Process input for key1 - should register a timer at t=10000
+    val result1 = tester.test("key1", List("hello"))
+    assert(result1 == List(("key1", "received:hello")))
+
+    // Set time to 5 seconds - timer should NOT fire yet
+    val expired1 = tester.setProcessingTime(5000)
+    assert(expired1.isEmpty)
+
+    // Process input for key2 at t=5000 - should register timer at t=15000
+    val result2 = tester.test("key2", List("world"))
+    assert(result2 == List(("key2", "received:world")))
+
+    // Set time to 11 seconds - key1's timer should fire
+    val expired2 = tester.setProcessingTime(11000)
+    assert(expired2 == List(("key1", "session-expired@11000")))

Review Comment:
   It's probably better to return the timer's expiration time (10000 here, instead of 11000) rather than the time at which it fired. That way the test validates the timer's expected expiry in more detail.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.testing
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.Encoder
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.ImplicitGroupingKeyTracker
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.statefulprocessor.QueryInfoImpl
+import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, 
StatefulProcessorHandle, TimeMode, TTLConfig, ValueState}
+import org.apache.spark.util.Clock
+
+/** In-memory implementation of ValueState. */
+class InMemoryValueState[T](clock: Clock, ttl: TTLConfig) extends 
ValueState[T] {
+  private val keyToStateValue = mutable.Map[Any, T]()
+
+  private def getValue: Option[T] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+  override def exists(): Boolean = getValue.isDefined
+
+  override def get(): T = getValue.getOrElse(null.asInstanceOf[T])
+
+  override def update(newState: T): Unit =
+    keyToStateValue.put(ImplicitGroupingKeyTracker.getImplicitKeyOption.get, 
newState)
+
+  override def clear(): Unit =
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+}
+
+/** In-memory implementation of ListState. */
+class InMemoryListState[T](clock: Clock, ttl: TTLConfig) extends ListState[T] {
+  private val keyToStateValue = mutable.Map[Any, mutable.ArrayBuffer[T]]()
+
+  private def getList: Option[mutable.ArrayBuffer[T]] = {
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+
+  override def exists(): Boolean = getList.isDefined
+
+  override def get(): Iterator[T] = {
+    getList.getOrElse(mutable.ArrayBuffer.empty[T]).iterator
+  }
+
+  override def put(newState: Array[T]): Unit = {
+    keyToStateValue.put(
+      ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+      mutable.ArrayBuffer.empty[T] ++ newState
+    )
+  }
+
+  override def appendValue(newState: T): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
newState
+  }
+
+  override def appendList(newState: Array[T]): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.ArrayBuffer.empty[T]
+      )
+    }
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) ++= 
newState
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory implementation of MapState. */
+class InMemoryMapState[K, V](clock: Clock, ttl: TTLConfig) extends MapState[K, 
V] {
+  private val keyToStateValue = mutable.Map[Any, mutable.HashMap[K, V]]()
+
+  private def getMap: Option[mutable.HashMap[K, V]] =
+    keyToStateValue.get(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+
+  override def exists(): Boolean = getMap.isDefined
+
+  override def getValue(key: K): V = {
+    getMap.flatMap(_.get(key)).getOrElse(null.asInstanceOf[V])
+  }
+
+  override def containsKey(key: K): Boolean = {
+    getMap.exists(_.contains(key))
+  }
+
+  override def updateValue(key: K, value: V): Unit = {
+    if (!exists()) {
+      keyToStateValue.put(
+        ImplicitGroupingKeyTracker.getImplicitKeyOption.get,
+        mutable.HashMap.empty[K, V]
+      )
+    }
+
+    keyToStateValue(ImplicitGroupingKeyTracker.getImplicitKeyOption.get) += 
(key -> value)
+  }
+
+  override def iterator(): Iterator[(K, V)] = {
+    getMap.map(_.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def keys(): Iterator[K] = {
+    getMap.map(_.keys.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def values(): Iterator[V] = {
+    getMap.map(_.values.iterator).getOrElse(Iterator.empty)
+  }
+
+  override def removeKey(key: K): Unit = {
+    getMap.foreach(_.remove(key))
+  }
+
+  override def clear(): Unit = {
+    keyToStateValue.remove(ImplicitGroupingKeyTracker.getImplicitKeyOption.get)
+  }
+}
+
+/** In-memory timers. */
+class InMemoryTimers {
+  // Maps expiration times to keys.
+  // If time t is mapped to key k, there is a timer associated with key k 
expiring at time t.
+  private val timeToKeys = mutable.Map[Long, mutable.HashSet[Any]]()
+
+  def registerTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (!timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys.put(expiryTimestampMs, mutable.HashSet[Any]())
+    }
+    timeToKeys(expiryTimestampMs).add(groupingKey)
+  }
+
+  def deleteTimer(expiryTimestampMs: Long): Unit = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    if (timeToKeys.contains(expiryTimestampMs)) {
+      timeToKeys(expiryTimestampMs).remove(groupingKey)
+      if (timeToKeys(expiryTimestampMs).isEmpty) {
+        timeToKeys.remove(expiryTimestampMs)
+      }
+    }
+  }
+
+  def listTimers(): Iterator[Long] = {
+    val groupingKey = ImplicitGroupingKeyTracker.getImplicitKeyOption.get
+    timeToKeys.iterator.filter(_._2.contains(groupingKey) ).map(_._1)

Review Comment:
   ```suggestion
       timeToKeys.iterator.filter(_._2.contains(groupingKey)).map(_._1)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala:
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+/** Test suite for TwsTester utility class. */
+class TwsTesterSuite extends SparkFunSuite {
+
+  test("TwsTester should correctly test processor with ValueState") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+    assert(tester.test("key1", List("a")) == List(("key1", 1L)))
+    assert(tester.test("key2", List("a", "a")) == List(("key2", 2L)))
+    assert(tester.test("key3", List("a")) == List(("key3", 1L)))
+    assert(tester.test("key1", List("a", "a", "a")) == List(("key1", 4L)))
+
+    assert(tester.peekValueState[Long]("count", "key1").get == 4L)
+    assert(tester.peekValueState[Long]("count", "key2").get == 2L)
+    assert(tester.peekValueState[Long]("count", "key3").get == 1L)
+    assert(tester.peekValueState[Long]("count", "key4").isEmpty)
+  }
+
+  test("TwsTester should allow direct access to ValueState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester[String, String, (String, Long)](processor)
+    tester.updateValueState[Long]("count", "foo", 5)
+    tester.test("foo", List("a"))
+    assert(tester.peekValueState[Long]("count", "foo").get == 6L)
+  }
+
+  test("TwsTester should correctly test processor with ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    val ans1 = tester.test("key1", List(("b", 2.0), ("c", 3.0), ("a", 1.0)))
+    assert(ans1 == List(("key1", 3.0), ("key1", 2.0)))
+    val ans2 = tester.test("key2", List(("a", 10.0), ("b", 20.0), ("c", 30.0), 
("d", 40.0)))
+    assert(ans2 == List(("key2", 40.0), ("key2", 30.0)))
+    val ans3 = tester.test("key3", List(("a", 100.0)))
+    assert(ans3 == List(("key3", 100.0)))
+
+    assert(tester.peekListState[Double]("topK", "key1") == List(3.0, 2.0))
+    assert(tester.peekListState[Double]("topK", "key2") == List(40.0, 30.0))
+    assert(tester.peekListState[Double]("topK", "key3") == List(100.0))
+    assert(tester.peekListState[Double]("topK", "key4").isEmpty)
+
+    val ans4 = tester.test("key1", List(("a", 10.0)))
+    assert(ans4 == List(("key1", 10.0), ("key1", 3.0)))
+    assert(tester.peekListState[Double]("topK", "key1") == List(10.0, 3.0))
+  }
+
+  test("TwsTester should allow direct access to ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    tester.updateListState("topK", "a", List(6.0, 5.0))
+    tester.updateListState("topK", "b", List(8.0, 7.0))
+    tester.test("a", List(("", 10.0)))
+    tester.test("b", List(("", 7.5)))
+    tester.test("c", List(("", 1.0)))
+
+    assert(tester.peekListState[Double]("topK", "a") == List(10.0, 6.0))
+    assert(tester.peekListState[Double]("topK", "b") == List(8.0, 7.5))
+    assert(tester.peekListState[Double]("topK", "c") == List(1.0))
+    assert(tester.peekListState[Double]("topK", "d") == List())
+  }
+
+  test("TwsTester should correctly test processor with MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+    val ans1 =
+      tester.test("user1", List(("", "hello"), ("", "world"), ("", "hello"), 
("", "world")))
+    assert(
+      ans1.sorted == List(
+        ("user1", "hello", 1L),
+        ("user1", "hello", 2L),
+        ("user1", "world", 1L),
+        ("user1", "world", 2L)
+      ).sorted
+    )
+
+    val ans2 = tester.test("user2", List(("", "hello"), ("", "spark")))
+    assert(ans2.sorted == List(("user2", "hello", 1L), ("user2", "spark", 
1L)).sorted)
+
+    // Check state using peekMapState
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map("hello" 
-> 2L, "world" -> 2L)
+    )
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user2") == Map("hello" 
-> 1L, "spark" -> 1L)
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == Map())
+    assert(tester.peekMapState[String, Long]("frequencies", "user3").isEmpty)
+
+    // Process more data for user1
+    val ans3 = tester.test("user1", List(("", "hello"), ("", "test")))
+    assert(ans3.sorted == List(("user1", "hello", 3L), ("user1", "test", 
1L)).sorted)
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 3L,
+        "world" -> 2L,
+        "test" -> 1L
+      )
+    )
+  }
+
+  test("TwsTester should allow direct access to MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+
+    // Set initial state directly
+    tester.updateMapState("frequencies", "user1", Map("hello" -> 5L, "world" 
-> 3L))
+    tester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+
+    // Process new words
+    tester.test("user1", List(("", "hello")))
+    tester.test("user1", List(("", "goodbye")))
+    tester.test("user2", List(("", "spark")))
+    tester.test("user3", List(("", "new")))
+
+    // Verify updated state
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 6L,
+        "world" -> 3L,
+        "goodbye" -> 1L
+      )
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 11L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == 
Map("new" -> 1L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user4") == Map())
+  }
+
+  test("TwsTester can be used to test step function") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor)
+
+    // Example of helper function using TwsTester to inspect how processing a 
single row changes
+    // state.
+    def testStepFunction(key: String, inputRow: String, stateIn: Long): Long = 
{
+      tester.updateValueState[Long]("count", key, stateIn)
+      tester.test(key, List(inputRow))
+      tester.peekValueState("count", key).get
+    }
+
+    assert(testStepFunction("key1", "a", 10L) == 11L)
+  }
+
+  test("TwsTester should call handleInitialState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor, initialState = List(("a", 10L), 
("b", 20L)))
+    assert(tester.peekValueState[Long]("count", "a").get == 10L)
+    assert(tester.peekValueState[Long]("count", "b").get == 20L)
+
+    val ans1 = tester.test("a", List("a"))
+    val ans2 = tester.test("c", List("c"))
+    assert(ans1 == List(("a", 11L)))
+    assert(ans2 == List(("c", 1L)))
+  }
+
+  test("TwsTester should fail when initialState is passed but not supported") {
+    val processor = new TopKProcessor(5)
+    val exception = intercept[IllegalArgumentException] {
+      new TwsTester(processor, initialState = List(("a", List(1.0, 2.0))))
+    }
+    assert(exception.getMessage.contains("stateful processor doesn't support 
initial state"))
+  }
+
+  test("TwsTester should support row-by-row processing") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+
+    // Example of helper function to test how TransformWithState processes 
rows one-by-one, which
+    // can be used to simulate real-time mode.
+    def testRowByRow(input: List[(String, String)]): List[(String, Long)] = {
+      input.flatMap { case (key, value) => tester.test(key, List(value)) }
+    }
+
+    val input: List[(String, String)] = List(
+      ("key1", "a"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key1", "c"),
+      ("key3", "q")
+    )
+    val ans: List[(String, Long)] = testRowByRow(input)
+    assert(
+      ans == List(
+        ("key1", 1L),
+        ("key2", 1L),
+        ("key1", 2L),
+        ("key2", 2L),
+        ("key1", 3L),
+        ("key1", 4L),
+        ("key3", 1L)
+      )
+    )
+  }
+
+  test("TwsTester should exercise all state methods") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test(
+      "k",
+      List(
+        "value-exists", // false
+        "value-set", // set to 42
+        "value-exists", // true
+        "value-clear", // clear
+        "value-exists", // false again
+        "list-exists", // false
+        "list-append", // append a, b
+        "list-exists", // true
+        "list-append-array", // append c, d
+        "list-get", // a,b,c,d
+        "map-exists", // false
+        "map-add", // add x=1, y=2, z=3
+        "map-exists", // true
+        "map-keys", // x,y,z
+        "map-values", // 1,2,3
+        "map-iterator", // x=1,y=2,z=3
+        "map-remove", // remove y
+        "map-keys", // x,z
+        "map-clear", // clear map
+        "map-exists" // false
+      )
+    )
+
+    assert(
+      results == List(
+        ("k", "value-exists:false"),
+        ("k", "value-set:done"),
+        ("k", "value-exists:true"),
+        ("k", "value-clear:done"),
+        ("k", "value-exists:false"),
+        ("k", "list-exists:false"),
+        ("k", "list-append:done"),
+        ("k", "list-exists:true"),
+        ("k", "list-append-array:done"),
+        ("k", "list-get:a,b,c,d"),
+        ("k", "map-exists:false"),
+        ("k", "map-add:done"),
+        ("k", "map-exists:true"),
+        ("k", "map-keys:x,y,z"),
+        ("k", "map-values:1,2,3"),
+        ("k", "map-iterator:x=1,y=2,z=3"),
+        ("k", "map-remove:done"),
+        ("k", "map-keys:x,z"),
+        ("k", "map-clear:done"),
+        ("k", "map-exists:false")
+      )
+    )
+  }
+
+  test("TwsTester: value-get before value-set") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test("k", List("value-get", "value-set", "value-get"))
+    assert(results == List(("k", ""), ("k", "value-set:done"), ("k", "42")))
+  }
+
+  test("TwsTester: list operations") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results =
+      tester.test(
+        "k",
+        List(
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-put",
+          "list-get",
+          "list-clear",
+          "list-get"
+        )
+      )
+    assert(
+      results == List(
+        ("k", "list-get:"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b,a,b"),
+        ("k", "list-put:done"),
+        ("k", "list-get:put"),
+        ("k", "list-clear:done"),
+        ("k", "list-get:")
+      )
+    )
+  }
+
+  test("TwsTester should delete value state") {
+    val valueTester = new TwsTester(new RunningCountProcessor[String]())
+    valueTester.updateValueState[Long]("count", "key1", 10L)
+    valueTester.updateValueState[Long]("count", "key2", 20L)
+    assert(valueTester.peekValueState[Long]("count", "key1").get == 10L)
+    valueTester.deleteState("count", "key1")
+    assert(valueTester.peekValueState[Long]("count", "key1").isEmpty)
+    assert(valueTester.peekValueState[Long]("count", "key2").get == 20L)
+  }
+
+  test("TwsTester should delete list state") {
+    val listTester = new TwsTester(new TopKProcessor(3))
+    listTester.updateListState("topK", "key1", List(1.0, 2.0, 3.0))
+    listTester.updateListState("topK", "key2", List(4.0, 5.0))
+    assert(listTester.peekListState[Double]("topK", "key1") == List(1.0, 2.0, 
3.0))
+    listTester.deleteState("topK", "key1")
+    assert(listTester.peekListState[Double]("topK", "key1").isEmpty)
+    assert(listTester.peekListState[Double]("topK", "key2") == List(4.0, 5.0))
+  }
+
+  test("TwsTester should delete map state") {
+    val mapTester = new TwsTester(new WordFrequencyProcessor())
+    mapTester.updateMapState("frequencies", "user1", Map("hello" -> 5L, 
"world" -> 3L))
+    mapTester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+    assert(
+      mapTester
+        .peekMapState[String, Long]("frequencies", "user1") == Map("hello" -> 
5L, "world" -> 3L)
+    )
+    mapTester.deleteState("frequencies", "user1")
+    assert(mapTester.peekMapState[String, Long]("frequencies", 
"user1").isEmpty)
+    assert(mapTester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 10L))
+  }
+
+  test("TwsTester should support ProcessingTime timers") {
+    val tester = new TwsTester(
+      new SessionTimeoutProcessor(),
+      timeMode = TimeMode.ProcessingTime()
+    )
+
+    // Process input for key1 - should register a timer at t=10000
+    val result1 = tester.test("key1", List("hello"))
+    assert(result1 == List(("key1", "received:hello")))
+
+    // Set time to 5 seconds - timer should NOT fire yet
+    val expired1 = tester.setProcessingTime(5000)
+    assert(expired1.isEmpty)
+
+    // Process input for key2 at t=5000 - should register timer at t=15000
+    val result2 = tester.test("key2", List("world"))
+    assert(result2 == List(("key2", "received:world")))
+
+    // Set time to 11 seconds - key1's timer should fire
+    val expired2 = tester.setProcessingTime(11000)
+    assert(expired2 == List(("key1", "session-expired@11000")))
+
+    // Set time to 16 seconds - key2's timer should fire
+    val expired3 = tester.setProcessingTime(16000)
+    assert(expired3 == List(("key2", "session-expired@16000")))

Review Comment:
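   Same here: asserting on the timer's expiration time (15000) rather than the firing time (16000) would validate the timer's expectation more precisely.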



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala:
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+/** Test suite for TwsTester utility class. */
+class TwsTesterSuite extends SparkFunSuite {
+
+  test("TwsTester should correctly test processor with ValueState") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+    assert(tester.test("key1", List("a")) == List(("key1", 1L)))
+    assert(tester.test("key2", List("a", "a")) == List(("key2", 2L)))
+    assert(tester.test("key3", List("a")) == List(("key3", 1L)))
+    assert(tester.test("key1", List("a", "a", "a")) == List(("key1", 4L)))
+
+    assert(tester.peekValueState[Long]("count", "key1").get == 4L)
+    assert(tester.peekValueState[Long]("count", "key2").get == 2L)
+    assert(tester.peekValueState[Long]("count", "key3").get == 1L)
+    assert(tester.peekValueState[Long]("count", "key4").isEmpty)
+  }
+
+  test("TwsTester should allow direct access to ValueState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester[String, String, (String, Long)](processor)
+    tester.updateValueState[Long]("count", "foo", 5)
+    tester.test("foo", List("a"))
+    assert(tester.peekValueState[Long]("count", "foo").get == 6L)
+  }
+
+  test("TwsTester should correctly test processor with ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    val ans1 = tester.test("key1", List(("b", 2.0), ("c", 3.0), ("a", 1.0)))
+    assert(ans1 == List(("key1", 3.0), ("key1", 2.0)))
+    val ans2 = tester.test("key2", List(("a", 10.0), ("b", 20.0), ("c", 30.0), 
("d", 40.0)))
+    assert(ans2 == List(("key2", 40.0), ("key2", 30.0)))
+    val ans3 = tester.test("key3", List(("a", 100.0)))
+    assert(ans3 == List(("key3", 100.0)))
+
+    assert(tester.peekListState[Double]("topK", "key1") == List(3.0, 2.0))
+    assert(tester.peekListState[Double]("topK", "key2") == List(40.0, 30.0))
+    assert(tester.peekListState[Double]("topK", "key3") == List(100.0))
+    assert(tester.peekListState[Double]("topK", "key4").isEmpty)
+
+    val ans4 = tester.test("key1", List(("a", 10.0)))
+    assert(ans4 == List(("key1", 10.0), ("key1", 3.0)))
+    assert(tester.peekListState[Double]("topK", "key1") == List(10.0, 3.0))
+  }
+
+  test("TwsTester should allow direct access to ListState") {
+    val tester = new TwsTester(new TopKProcessor(2))
+    tester.updateListState("topK", "a", List(6.0, 5.0))
+    tester.updateListState("topK", "b", List(8.0, 7.0))
+    tester.test("a", List(("", 10.0)))
+    tester.test("b", List(("", 7.5)))
+    tester.test("c", List(("", 1.0)))
+
+    assert(tester.peekListState[Double]("topK", "a") == List(10.0, 6.0))
+    assert(tester.peekListState[Double]("topK", "b") == List(8.0, 7.5))
+    assert(tester.peekListState[Double]("topK", "c") == List(1.0))
+    assert(tester.peekListState[Double]("topK", "d") == List())
+  }
+
+  test("TwsTester should correctly test processor with MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+    val ans1 =
+      tester.test("user1", List(("", "hello"), ("", "world"), ("", "hello"), 
("", "world")))
+    assert(
+      ans1.sorted == List(
+        ("user1", "hello", 1L),
+        ("user1", "hello", 2L),
+        ("user1", "world", 1L),
+        ("user1", "world", 2L)
+      ).sorted
+    )
+
+    val ans2 = tester.test("user2", List(("", "hello"), ("", "spark")))
+    assert(ans2.sorted == List(("user2", "hello", 1L), ("user2", "spark", 
1L)).sorted)
+
+    // Check state using peekMapState
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map("hello" 
-> 2L, "world" -> 2L)
+    )
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user2") == Map("hello" 
-> 1L, "spark" -> 1L)
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == Map())
+    assert(tester.peekMapState[String, Long]("frequencies", "user3").isEmpty)
+
+    // Process more data for user1
+    val ans3 = tester.test("user1", List(("", "hello"), ("", "test")))
+    assert(ans3.sorted == List(("user1", "hello", 3L), ("user1", "test", 
1L)).sorted)
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 3L,
+        "world" -> 2L,
+        "test" -> 1L
+      )
+    )
+  }
+
+  test("TwsTester should allow direct access to MapState") {
+    val tester = new TwsTester(new WordFrequencyProcessor())
+
+    // Set initial state directly
+    tester.updateMapState("frequencies", "user1", Map("hello" -> 5L, "world" 
-> 3L))
+    tester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+
+    // Process new words
+    tester.test("user1", List(("", "hello")))
+    tester.test("user1", List(("", "goodbye")))
+    tester.test("user2", List(("", "spark")))
+    tester.test("user3", List(("", "new")))
+
+    // Verify updated state
+    assert(
+      tester.peekMapState[String, Long]("frequencies", "user1") == Map(
+        "hello" -> 6L,
+        "world" -> 3L,
+        "goodbye" -> 1L
+      )
+    )
+    assert(tester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 11L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user3") == 
Map("new" -> 1L))
+    assert(tester.peekMapState[String, Long]("frequencies", "user4") == Map())
+  }
+
+  test("TwsTester can be used to test step function") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor)
+
+    // Example of helper function using TwsTester to inspect how processing a 
single row changes
+    // state.
+    def testStepFunction(key: String, inputRow: String, stateIn: Long): Long = 
{
+      tester.updateValueState[Long]("count", key, stateIn)
+      tester.test(key, List(inputRow))
+      tester.peekValueState("count", key).get
+    }
+
+    assert(testStepFunction("key1", "a", 10L) == 11L)
+  }
+
+  test("TwsTester should call handleInitialState") {
+    val processor = new RunningCountProcessor[String]()
+    val tester = new TwsTester(processor, initialState = List(("a", 10L), 
("b", 20L)))
+    assert(tester.peekValueState[Long]("count", "a").get == 10L)
+    assert(tester.peekValueState[Long]("count", "b").get == 20L)
+
+    val ans1 = tester.test("a", List("a"))
+    val ans2 = tester.test("c", List("c"))
+    assert(ans1 == List(("a", 11L)))
+    assert(ans2 == List(("c", 1L)))
+  }
+
+  test("TwsTester should fail when initialState is passed but not supported") {
+    val processor = new TopKProcessor(5)
+    val exception = intercept[IllegalArgumentException] {
+      new TwsTester(processor, initialState = List(("a", List(1.0, 2.0))))
+    }
+    assert(exception.getMessage.contains("stateful processor doesn't support 
initial state"))
+  }
+
+  test("TwsTester should support row-by-row processing") {
+    val tester = new TwsTester(new RunningCountProcessor[String]())
+
+    // Example of helper function to test how TransformWithState processes 
rows one-by-one, which
+    // can be used to simulate real-time mode.
+    def testRowByRow(input: List[(String, String)]): List[(String, Long)] = {
+      input.flatMap { case (key, value) => tester.test(key, List(value)) }
+    }
+
+    val input: List[(String, String)] = List(
+      ("key1", "a"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key2", "b"),
+      ("key1", "c"),
+      ("key1", "c"),
+      ("key3", "q")
+    )
+    val ans: List[(String, Long)] = testRowByRow(input)
+    assert(
+      ans == List(
+        ("key1", 1L),
+        ("key2", 1L),
+        ("key1", 2L),
+        ("key2", 2L),
+        ("key1", 3L),
+        ("key1", 4L),
+        ("key3", 1L)
+      )
+    )
+  }
+
+  test("TwsTester should exercise all state methods") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test(
+      "k",
+      List(
+        "value-exists", // false
+        "value-set", // set to 42
+        "value-exists", // true
+        "value-clear", // clear
+        "value-exists", // false again
+        "list-exists", // false
+        "list-append", // append a, b
+        "list-exists", // true
+        "list-append-array", // append c, d
+        "list-get", // a,b,c,d
+        "map-exists", // false
+        "map-add", // add x=1, y=2, z=3
+        "map-exists", // true
+        "map-keys", // x,y,z
+        "map-values", // 1,2,3
+        "map-iterator", // x=1,y=2,z=3
+        "map-remove", // remove y
+        "map-keys", // x,z
+        "map-clear", // clear map
+        "map-exists" // false
+      )
+    )
+
+    assert(
+      results == List(
+        ("k", "value-exists:false"),
+        ("k", "value-set:done"),
+        ("k", "value-exists:true"),
+        ("k", "value-clear:done"),
+        ("k", "value-exists:false"),
+        ("k", "list-exists:false"),
+        ("k", "list-append:done"),
+        ("k", "list-exists:true"),
+        ("k", "list-append-array:done"),
+        ("k", "list-get:a,b,c,d"),
+        ("k", "map-exists:false"),
+        ("k", "map-add:done"),
+        ("k", "map-exists:true"),
+        ("k", "map-keys:x,y,z"),
+        ("k", "map-values:1,2,3"),
+        ("k", "map-iterator:x=1,y=2,z=3"),
+        ("k", "map-remove:done"),
+        ("k", "map-keys:x,z"),
+        ("k", "map-clear:done"),
+        ("k", "map-exists:false")
+      )
+    )
+  }
+
+  test("TwsTester: value-get before value-set") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results = tester.test("k", List("value-get", "value-set", "value-get"))
+    assert(results == List(("k", ""), ("k", "value-set:done"), ("k", "42")))
+  }
+
+  test("TwsTester: list operations") {
+    val tester = new TwsTester(new AllMethodsTestProcessor())
+    val results =
+      tester.test(
+        "k",
+        List(
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-append",
+          "list-get",
+          "list-put",
+          "list-get",
+          "list-clear",
+          "list-get"
+        )
+      )
+    assert(
+      results == List(
+        ("k", "list-get:"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b"),
+        ("k", "list-append:done"),
+        ("k", "list-get:a,b,a,b"),
+        ("k", "list-put:done"),
+        ("k", "list-get:put"),
+        ("k", "list-clear:done"),
+        ("k", "list-get:")
+      )
+    )
+  }
+
+  test("TwsTester should delete value state") {
+    val valueTester = new TwsTester(new RunningCountProcessor[String]())
+    valueTester.updateValueState[Long]("count", "key1", 10L)
+    valueTester.updateValueState[Long]("count", "key2", 20L)
+    assert(valueTester.peekValueState[Long]("count", "key1").get == 10L)
+    valueTester.deleteState("count", "key1")
+    assert(valueTester.peekValueState[Long]("count", "key1").isEmpty)
+    assert(valueTester.peekValueState[Long]("count", "key2").get == 20L)
+  }
+
+  test("TwsTester should delete list state") {
+    val listTester = new TwsTester(new TopKProcessor(3))
+    listTester.updateListState("topK", "key1", List(1.0, 2.0, 3.0))
+    listTester.updateListState("topK", "key2", List(4.0, 5.0))
+    assert(listTester.peekListState[Double]("topK", "key1") == List(1.0, 2.0, 
3.0))
+    listTester.deleteState("topK", "key1")
+    assert(listTester.peekListState[Double]("topK", "key1").isEmpty)
+    assert(listTester.peekListState[Double]("topK", "key2") == List(4.0, 5.0))
+  }
+
+  test("TwsTester should delete map state") {
+    val mapTester = new TwsTester(new WordFrequencyProcessor())
+    mapTester.updateMapState("frequencies", "user1", Map("hello" -> 5L, 
"world" -> 3L))
+    mapTester.updateMapState("frequencies", "user2", Map("spark" -> 10L))
+    assert(
+      mapTester
+        .peekMapState[String, Long]("frequencies", "user1") == Map("hello" -> 
5L, "world" -> 3L)
+    )
+    mapTester.deleteState("frequencies", "user1")
+    assert(mapTester.peekMapState[String, Long]("frequencies", 
"user1").isEmpty)
+    assert(mapTester.peekMapState[String, Long]("frequencies", "user2") == 
Map("spark" -> 10L))
+  }
+
+  test("TwsTester should support ProcessingTime timers") {
+    val tester = new TwsTester(
+      new SessionTimeoutProcessor(),
+      timeMode = TimeMode.ProcessingTime()
+    )
+
+    // Process input for key1 - should register a timer at t=10000
+    val result1 = tester.test("key1", List("hello"))
+    assert(result1 == List(("key1", "received:hello")))
+
+    // Set time to 5 seconds - timer should NOT fire yet
+    val expired1 = tester.setProcessingTime(5000)
+    assert(expired1.isEmpty)
+
+    // Process input for key2 at t=5000 - should register timer at t=15000
+    val result2 = tester.test("key2", List("world"))
+    assert(result2 == List(("key2", "received:world")))
+
+    // Set time to 11 seconds - key1's timer should fire
+    val expired2 = tester.setProcessingTime(11000)
+    assert(expired2 == List(("key1", "session-expired@11000")))
+
+    // Set time to 16 seconds - key2's timer should fire
+    val expired3 = tester.setProcessingTime(16000)
+    assert(expired3 == List(("key2", "session-expired@16000")))
+
+    // Verify state is cleared after session expiry
+    assert(tester.peekValueState[Long]("lastSeen", "key1").isEmpty)
+    assert(tester.peekValueState[Long]("lastSeen", "key2").isEmpty)
+  }
+
+  test("TwsTester should support EventTime timers fired by manual watermark 
advance") {
+    val tester = new TwsTester(
+      new EventTimeSessionProcessor(),
+      timeMode = TimeMode.EventTime()
+    )
+
+    // Process event at t=10000 for key1 - registers timer at t=15000
+    // Note: watermark starts at 0 and must be advanced manually
+    val result1 = tester.test("key1", List((10000L, "hello")))
+    assert(result1 == List(("key1", "received:hello@10000")))
+
+    // Process another event at t=12000 for key1 - should cancel timer at 
t=15000
+    // and register new timer at t=17000
+    val result2 = tester.test("key1", List((12000L, "hello2")))
+    assert(result2 == List(("key1", "received:hello2@12000")))
+
+    // Set watermark to 16000 (past where old timer at 15000 would have fired)
+    // No timer should fire because the old timer was cancelled
+    val expired1 = tester.setWatermark(16000)
+    assert(expired1.isEmpty, "Old timer should have been cancelled, but it 
fired")
+
+    // Verify state is still present (session not expired yet)
+    assert(tester.peekValueState[Long]("lastEventTime", "key1").isDefined)
+
+    // Now set watermark past the new timer at 17000
+    val expired2 = tester.setWatermark(18000)
+    assert(expired2.size == 1)
+    assert(expired2.head._1 == "key1")
+    assert(expired2.head._2.startsWith("session-expired@watermark="))

Review Comment:
   Same here: why not return the timer's expiration time in the output and assert on the exact value, rather than only checking the `session-expired@watermark=` prefix?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to