jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1762084726


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##########
@@ -113,11 +123,66 @@ class TransformWithStateInPandasStateServer(
         handleStatefulProcessorCall(message.getStatefulProcessorCall)
       case StateRequest.MethodCase.STATEVARIABLEREQUEST =>
         handleStateVariableRequest(message.getStateVariableRequest)
+      case StateRequest.MethodCase.TIMERREQUEST =>
+        handleTimerRequest(message.getTimerRequest)
       case _ =>
         throw new IllegalArgumentException("Invalid method call")
     }
   }
 
+  private[sql] def handleTimerRequest(message: TimerRequest): Unit = {
+    message.getMethodCase match {
+      case TimerRequest.MethodCase.TIMERVALUEREQUEST =>
+        val timerRequest = message.getTimerValueRequest()
+        timerRequest.getMethodCase match {
+          case TimerValueRequest.MethodCase.GETPROCESSINGTIMER =>
+            val valueStr =
+              if (batchTimestampMs.isDefined) batchTimestampMs.get.toString 
else "-1"
+            sendResponse(0, null, ByteString.copyFromUtf8(valueStr))
+          case TimerValueRequest.MethodCase.GETWATERMARK =>
+            val valueStr = if (eventTimeWatermarkForEviction.isDefined) {
+              eventTimeWatermarkForEviction.get.toString()
+            } else "-1"
+            sendResponse(0, null, ByteString.copyFromUtf8(valueStr))
+          case _ =>
+            throw new IllegalArgumentException("Invalid timer value method 
call")
+        }
+
+      case TimerRequest.MethodCase.EXPIRYTIMERREQUEST =>
+        val expiryRequest = message.getExpiryTimerRequest()
+        val expiryTimestamp = expiryRequest.getExpiryTimestampMs
+        val iter = 
statefulProcessorHandle.getExpiredTimersWithKeyRow(expiryTimestamp)
+        if (iter == null || !iter.hasNext) {
+          // avoid sending over empty batch
+          sendResponse(1)
+        } else {
+          sendResponse(0)
+          outputStream.flush()
+          val arrowStreamWriter = {
+            val outputSchema = new StructType()
+              .add("key", groupingKeySchema)
+              .add(StructField("timestamp", LongType))
+            val arrowSchema = ArrowUtils.toArrowSchema(outputSchema, 
timeZoneId,
+              errorOnDuplicatedFieldNames, largeVarTypes)
+            val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+              s"stdout writer for transformWithStateInPandas state socket", 0, 
Long.MaxValue)
+            val root = VectorSchemaRoot.create(arrowSchema, allocator)
+            new BaseStreamingArrowWriter(root, new ArrowStreamWriter(root, 
null, outputStream),

Review Comment:
   Done. Created a util object and moved all Python-related writer functions 
into it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to