jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1815785813


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends 
SparkFunSuite with Befo
     verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
     verify(listState).appendList(any)
   }
+
+  test("timer value get processing time") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetProcessingTimer(
+        GetProcessingTime.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(batchTimestampMs).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("timer value get watermark") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetWatermark(
+        GetWatermark.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(eventTimeWatermarkForEviction).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("get expiry timers") {
+    val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+      ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+        10L
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(statefulProcessorHandle).getExpiredTimers(any[Long])

Review Comment:
   Same above. This is also tested in e2e suites by assertions on the output of 
handle expired timer rows.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends 
SparkFunSuite with Befo
     verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
     verify(listState).appendList(any)
   }
+
+  test("timer value get processing time") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetProcessingTimer(
+        GetProcessingTime.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(batchTimestampMs).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("timer value get watermark") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetWatermark(
+        GetWatermark.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(eventTimeWatermarkForEviction).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("get expiry timers") {
+    val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+      ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+        10L
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(statefulProcessorHandle).getExpiredTimers(any[Long])
+  }
+
+  test("stateful processor register timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        
.setRegister(RegisterTimer.newBuilder().setExpiryTimestampMs(10L).build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).registerTimer(any[Long])
+    verify(outputStream).writeInt(0)
+  }
+
+  test("stateful processor delete timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        .setDelete(DeleteTimer.newBuilder().setExpiryTimestampMs(10L).build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).deleteTimer(any[Long])
+    verify(outputStream).writeInt(0)
+  }
+
+  test("stateful processor list timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        .setList(ListTimers.newBuilder().build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).listTimers()

Review Comment:
   Adding spying instance param and added one more test case. Also this is 
tested in the e2e suite `test_pandas_transform_with_state` 
[here](https://github.com/apache/spark/pull/47878/files#diff-03ab27c9011040bf67b55f05489a273973a8059f31e8f201a71f7df22ff85cd0R590)
 for multiple iterators called on the same grouping key.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends 
SparkFunSuite with Befo
     verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
     verify(listState).appendList(any)
   }
+
+  test("timer value get processing time") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetProcessingTimer(
+        GetProcessingTime.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(batchTimestampMs).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("timer value get watermark") {
+    val message = TimerRequest.newBuilder().setTimerValueRequest(
+      TimerValueRequest.newBuilder().setGetWatermark(
+        GetWatermark.newBuilder().build()
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(eventTimeWatermarkForEviction).isDefined
+    verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+  }
+
+  test("get expiry timers") {
+    val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+      ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+        10L
+      ).build()
+    ).build()
+    stateServer.handleTimerRequest(message)
+    verify(statefulProcessorHandle).getExpiredTimers(any[Long])
+  }
+
+  test("stateful processor register timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        
.setRegister(RegisterTimer.newBuilder().setExpiryTimestampMs(10L).build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).registerTimer(any[Long])
+    verify(outputStream).writeInt(0)
+  }
+
+  test("stateful processor delete timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        .setDelete(DeleteTimer.newBuilder().setExpiryTimestampMs(10L).build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).deleteTimer(any[Long])
+    verify(outputStream).writeInt(0)
+  }
+
+  test("stateful processor list timer") {
+    val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+      TimerStateCallCommand.newBuilder()
+        .setList(ListTimers.newBuilder().build())
+        .build()
+    ).build()
+    stateServer.handleStatefulProcessorCall(message)
+    verify(statefulProcessorHandle).listTimers()

Review Comment:
   Added spying instance param and added one more test case. Also this is 
tested in the e2e suite `test_pandas_transform_with_state` 
[here](https://github.com/apache/spark/pull/47878/files#diff-03ab27c9011040bf67b55f05489a273973a8059f31e8f201a71f7df22ff85cd0R590)
 for multiple iterators called on the same grouping key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to