jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859367581


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   We need to make a tradeoff: the extra complexity ends up either in 
`serializers.py` or in `TransformWithStateInPandasPythonRunner`, depending on 
whether we put the timer handling code above inside `if key is None`.
   
   If we put the timer handling logic inside `if key is None`, we will need to 
call `dump_stream()` again in the `finally` block here: 
https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L1966. 
Calling `dump_stream()` twice means we need to properly handle how the JVM 
receives batches. Currently we reuse the `read()` function inside 
`PythonArrowOutput`, and the reader stops reading when the batch signals the 
end here: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala#L118.
 Since we would then be calling `dump_stream()` twice, we would need to 
override this function in `TransformWithStateInPandasPythonRunner` so that it 
continues reading one more time after receiving the end signal. The extra 
complexity is that we would also need to handle the case where some partitions 
have no timer iterator and never start the additional dump stream writer at 
all, and decide how to handle exceptions if one of the `dump_stream()` calls 
fails.
   
   Additionally, we need to set the statefulHandlerState to `TIMER_PROCESSED` 
after all timer rows are processed, so we would need code changes inside 
`worker.py` to set this properly. I feel it is better to keep all TWS-related 
code changes in one place for readability. Setting the state correctly would 
mean getting the `StatefulProcessorHandlerApiClient` object inside `worker.py`, 
which brings code complexity similar to what we have now in `serializers.py` 
(returning one extra `StatefulProcessorHandlerApiClient` from 
`transformWithStateWithInitStateUDF` and deserializing it from `out_iter`). We 
cannot set the `TIMER_PROCESSED` state in `group_ops.py` because the output row 
iterator is not fully consumed there; it is fully consumed only after 
`dump_stream()` is called inside `worker.py`.
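   
   To illustrate the last point about the iterator being consumed late, here 
is a minimal, self-contained sketch. All function names in it are hypothetical 
stand-ins rather than the actual Spark code; it only shows that a generator 
returned by one function produces no rows until a later caller iterates it.

```python
# Hypothetical stand-ins; none of these functions exist in Spark.
events = []


def output_rows():
    # Lazy generator: the body does not run until someone iterates it.
    for i in range(3):
        events.append(f"row {i} produced")
        yield i


def group_ops_like():
    # Plays the role of the code that builds the output iterator.
    it = output_rows()
    events.append("group_ops returned")
    return it


def dump_stream_like(it):
    # Plays the role of the code that finally drains the iterator.
    for _ in it:
        pass
    events.append("fully consumed")


dump_stream_like(group_ops_like())
```

In this sketch `events` records "group_ops returned" before any row is 
produced, which is why a state update placed at that point would run too early.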
   
   So either way we will need to deal with extra complexity. I personally think 
putting the timer handling code into `serializers.py` is slightly better 
because it is closer to how we handle timers on the Scala side: we chain the 
timer output rows after the data handling rows into a single iterator. 
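   
   A rough sketch of the chaining idea, under stated assumptions: 
`FakeHandle`, `chained_output`, and the `"NOT_YET_PROCESSED"` placeholder are 
hypothetical and only stand in for the real client and state values. The point 
is that a single chained iterator lets the state flip to `TIMER_PROCESSED` 
exactly when the last timer row has been consumed, with one pass over the 
output.

```python
from itertools import chain


class FakeHandle:
    """Hypothetical stand-in for the state handle; not the real API client."""

    def __init__(self):
        self.state = "NOT_YET_PROCESSED"  # placeholder value, an assumption


def chained_output(data_rows, timer_rows, handle):
    def timers_then_mark():
        # Yield every timer row, then record that timers are done.
        yield from timer_rows
        handle.state = "TIMER_PROCESSED"

    # Data rows first, timer rows after, exposed as one iterator.
    return chain(data_rows, timers_then_mark())


handle = FakeHandle()
out = chained_output(iter(["d1", "d2"]), iter(["t1"]), handle)

first_two = [next(out), next(out)]  # only data rows consumed so far
state_before = handle.state         # timers have not run yet
rest = list(out)                    # drains the timer rows
state_after = handle.state
```

A partition with no timers would simply pass an empty `timer_rows` iterator, so 
the consumer still sees one stream and one end signal.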
   
   Let me know if you have suggestions on which way is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.