jingz-db commented on code in PR #47878:
URL: https://github.com/apache/spark/pull/47878#discussion_r1763702978
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##########
@@ -158,6 +229,51 @@ class TransformWithStateInPandasStateServer(
Some(message.getGetValueState.getTtl.getDurationMs)
} else None
initializeValueState(stateName, schema, ttlDurationMs)
+
+ case StatefulProcessorCall.MethodCase.TIMERSTATECALL =>
+ message.getTimerStateCall.getMethodCase match {
+ case TimerStateCallCommand.MethodCase.REGISTER =>
+ val expiryTimestamp =
+ message.getTimerStateCall.getRegister.getExpiryTimestampMs
+ statefulProcessorHandle.registerTimer(expiryTimestamp)
+ sendResponse(0)
+ case TimerStateCallCommand.MethodCase.DELETE =>
+ val expiryTimestamp =
+ message.getTimerStateCall.getDelete.getExpiryTimestampMs
+ statefulProcessorHandle.deleteTimer(expiryTimestamp)
+ sendResponse(0)
+ case TimerStateCallCommand.MethodCase.LIST =>
+ val iter = statefulProcessorHandle.listTimers()
+
+ if (iter == null || !iter.hasNext) {
+            // Avoid sending an empty batch over the socket
+ sendResponse(1)
+ } else {
+ sendResponse(0)
+ outputStream.flush()
+ val arrowStreamWriter = {
+ val outputSchema = new StructType()
+ .add(StructField("timestamp", LongType))
+            val arrowSchema = ArrowUtils.toArrowSchema(outputSchema, timeZoneId,
+              errorOnDuplicatedFieldNames, largeVarTypes)
+            val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+              s"stdout writer for transformWithStateInPandas state socket", 0, Long.MaxValue)
+            val root = VectorSchemaRoot.create(arrowSchema, allocator)
+            new BaseStreamingArrowWriter(root, new ArrowStreamWriter(root, null, outputStream),
+              arrowTransformWithStateInPandasMaxRecordsPerBatch)
+ }
+ while (iter.hasNext) {
+ val timestamp = iter.next()
+ val internalRow = InternalRow(timestamp)
+ arrowStreamWriter.writeRow(internalRow)
Review Comment:
I guess I'll rebase on your ListState PR change, and this
`arrowTransformWithStateInPandasMaxRecordsPerBatch` will then be passed as the
new config you'll add here:
https://github.com/apache/spark/pull/47933/files#diff-0b0aaf91850194b6980b75d47bc166148566cbdc1b17b3da16faff1f0740e0f4R107.
But your concern above still holds: should we use a different default value
for transmitting the `list[Int]` here? If so, should we add a new config, or
just assign a fixed value for it?
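For concreteness, a rough sketch of what the new-config option could look
like, using Spark's internal `ConfigBuilder`; the key name and the default
below are placeholders for illustration, not what either PR actually adds:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical entry; the key and default are illustrative only.
// Each row on this path is a single Long timestamp, so a larger
// default than the general Arrow batch size is probably safe.
val ARROW_TWS_TIMER_LIST_MAX_RECORDS_PER_BATCH =
  ConfigBuilder(
      "spark.sql.execution.arrow.transformWithStateInPandas.timerList.maxRecordsPerBatch")
    .doc("Maximum number of timer timestamps to include in a single Arrow " +
      "batch when listing registered timers over the state server socket.")
    .intConf
    .createWithDefault(10000)
```

The fixed-value alternative would just be a private constant on the state
server instead of a config lookup; the config route costs one more entry but
keeps the batch size tunable if the timer list turns out to be large.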