jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1815785813
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends
SparkFunSuite with Befo
verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
verify(listState).appendList(any)
}
+
+ test("timer value get processing time") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetProcessingTimer(
+ GetProcessingTime.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(batchTimestampMs).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("timer value get watermark") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetWatermark(
+ GetWatermark.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(eventTimeWatermarkForEviction).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("get expiry timers") {
+ val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+ ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+ 10L
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(statefulProcessorHandle).getExpiredTimers(any[Long])
Review Comment:
Same above. This is also tested in e2e suites by assertions on the output of
handle expired timer rows.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends
SparkFunSuite with Befo
verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
verify(listState).appendList(any)
}
+
+ test("timer value get processing time") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetProcessingTimer(
+ GetProcessingTime.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(batchTimestampMs).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("timer value get watermark") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetWatermark(
+ GetWatermark.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(eventTimeWatermarkForEviction).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("get expiry timers") {
+ val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+ ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+ 10L
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(statefulProcessorHandle).getExpiredTimers(any[Long])
+ }
+
+ test("stateful processor register timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+
.setRegister(RegisterTimer.newBuilder().setExpiryTimestampMs(10L).build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).registerTimer(any[Long])
+ verify(outputStream).writeInt(0)
+ }
+
+ test("stateful processor delete timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+ .setDelete(DeleteTimer.newBuilder().setExpiryTimestampMs(10L).build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).deleteTimer(any[Long])
+ verify(outputStream).writeInt(0)
+ }
+
+ test("stateful processor list timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+ .setList(ListTimers.newBuilder().build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).listTimers()
Review Comment:
Adding spying instance param and added one more test case. Also this is
tested in the e2e suite `test_pandas_transform_with_state`
[here](https://github.com/apache/spark/pull/47878/files#diff-03ab27c9011040bf67b55f05489a273973a8059f31e8f201a71f7df22ff85cd0R590)
for multiple iterators called on the same grouping key.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServerSuite.scala:
##########
@@ -279,4 +283,68 @@ class TransformWithStateInPandasStateServerSuite extends
SparkFunSuite with Befo
verify(transformWithStateInPandasDeserializer).readArrowBatches(any)
verify(listState).appendList(any)
}
+
+ test("timer value get processing time") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetProcessingTimer(
+ GetProcessingTime.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(batchTimestampMs).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("timer value get watermark") {
+ val message = TimerRequest.newBuilder().setTimerValueRequest(
+ TimerValueRequest.newBuilder().setGetWatermark(
+ GetWatermark.newBuilder().build()
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(eventTimeWatermarkForEviction).isDefined
+ verify(outputStream).writeInt(argThat((x: Int) => x > 0))
+ }
+
+ test("get expiry timers") {
+ val message = TimerRequest.newBuilder().setExpiryTimerRequest(
+ ExpiryTimerRequest.newBuilder().setExpiryTimestampMs(
+ 10L
+ ).build()
+ ).build()
+ stateServer.handleTimerRequest(message)
+ verify(statefulProcessorHandle).getExpiredTimers(any[Long])
+ }
+
+ test("stateful processor register timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+
.setRegister(RegisterTimer.newBuilder().setExpiryTimestampMs(10L).build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).registerTimer(any[Long])
+ verify(outputStream).writeInt(0)
+ }
+
+ test("stateful processor delete timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+ .setDelete(DeleteTimer.newBuilder().setExpiryTimestampMs(10L).build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).deleteTimer(any[Long])
+ verify(outputStream).writeInt(0)
+ }
+
+ test("stateful processor list timer") {
+ val message = StatefulProcessorCall.newBuilder().setTimerStateCall(
+ TimerStateCallCommand.newBuilder()
+ .setList(ListTimers.newBuilder().build())
+ .build()
+ ).build()
+ stateServer.handleStatefulProcessorCall(message)
+ verify(statefulProcessorHandle).listTimers()
Review Comment:
Added spying instance param and added one more test case. Also this is
tested in the e2e suite `test_pandas_transform_with_state`
[here](https://github.com/apache/spark/pull/47878/files#diff-03ab27c9011040bf67b55f05489a273973a8059f31e8f201a71f7df22ff85cd0R590)
for multiple iterators called on the same grouping key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]