jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859367581
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
def dump_stream(self, iterator, stream):
"""
- Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
- RecordBatches, and write batches to stream.
+ Read through chained return results from a single partition of handleInputRows.
Review Comment:
We need to make a tradeoff about where the extra complexity lives: in
`serializers.py`, as in the current approach, or in
`TransformWithStateInPandasPythonRunner`, if we move the timer handling code
above into the `if key is None` branch.
If we put the timer handling logic inside `if key is None`, we will need to
call `dump_stream()` again in the `finally` code block here:
https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L1966.
Calling `dump_stream()` twice means we will need to properly handle how the JVM
receives batches. Currently we reuse the `read()` function inside
`PythonArrowOutput`, and the reader stops reading once it receives the end
signal here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala#L118.
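To make the end-signal issue concrete, here is a toy model in plain Python. It assumes a made-up framing in which each batch is a 4-byte big-endian length followed by its payload, and a length of -1 marks the end of one dump call. This is not the real Arrow IPC format or the actual `PythonArrowOutput` code; `toy_dump_stream` and `read_until_end` are hypothetical names. The model only shows that a reader which stops at the first end marker never sees a second section written by a second dump call.

```python
import io
import struct

END_OF_STREAM = -1  # hypothetical marker written at the end of each dump call


def toy_dump_stream(batches, stream):
    """Toy writer: length-prefix each payload, then write one end marker."""
    for payload in batches:
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)
    stream.write(struct.pack(">i", END_OF_STREAM))


def read_until_end(stream):
    """Toy reader: collect payloads until the first end marker, then stop."""
    batches = []
    while True:
        (length,) = struct.unpack(">i", stream.read(4))
        if length == END_OF_STREAM:
            return batches
        batches.append(stream.read(length))


# The worker writes two sections: one for data rows, one for timer rows.
buf = io.BytesIO()
toy_dump_stream([b"data-1", b"data-2"], buf)
toy_dump_stream([b"timer-1"], buf)

buf.seek(0)
print(read_until_end(buf))  # [b'data-1', b'data-2']
print(len(buf.read()))      # 15: the timer section (4 + 7 + 4 bytes) is left unread
```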
Since we would then be calling `dump_stream()` twice, we will need to override
this function in `TransformWithStateInPandasPythonRunner` and continue reading
one more time after receiving the end signal. The extra complexity is that we
also need to properly handle partitions that have no timer iterator and never
start the additional `dump_stream()` writer at all. We also need to decide how
to handle exceptions if one of the `dump_stream()` calls fails.
Additionally, we need to set the statefulHandlerState to `TIMER_PROCESSED`
after all timer rows are processed, so we will need some code changes inside
`worker.py` to set this properly. For better readability, I feel it is better
to put all TWS-related code changes in one place, so we would also need to get
the `StatefulProcessorHandlerApiClient` object inside `worker.py` to set the
state correctly. That leads to code complexity similar to what we have now in
`serializers.py` (returning one extra `StatefulProcessorHandlerApiClient` from
`transformWithStateWithInitStateUDF` and deserializing it from `out_iter`).
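The partition edge case can be sketched with the same kind of made-up framing: length-prefixed payloads, with -1 as the per-call end marker. All of it is hypothetical, including `read_data_then_timers`, which stands in for an overridden reader that reads one extra section after the first end marker. If a partition never starts the second writer, the extra read runs off the end of the in-memory input. Over a live socket it would more likely block while waiting for bytes that never arrive. So either every partition writes an (empty) timer section, or the reader needs some other way to learn that no timer section is coming.

```python
import io
import struct

END_OF_STREAM = -1  # hypothetical per-call end marker


def toy_dump_stream(batches, stream):
    """Toy writer: length-prefix each payload, then write one end marker."""
    for payload in batches:
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)
    stream.write(struct.pack(">i", END_OF_STREAM))


def read_section(stream):
    """Read payloads up to the next end marker; raise EOFError if input ends first."""
    batches = []
    while True:
        header = stream.read(4)
        if len(header) < 4:
            raise EOFError("input ended before an end marker")
        (length,) = struct.unpack(">i", header)
        if length == END_OF_STREAM:
            return batches
        batches.append(stream.read(length))


def read_data_then_timers(stream):
    """Hypothetical override: read the data section, then one extra timer section."""
    return read_section(stream) + read_section(stream)


with_timers = io.BytesIO()
toy_dump_stream([b"data-1"], with_timers)
toy_dump_stream([b"timer-1"], with_timers)
with_timers.seek(0)
print(read_data_then_timers(with_timers))  # [b'data-1', b'timer-1']

no_timers = io.BytesIO()
toy_dump_stream([b"data-1"], no_timers)  # the second writer is never started
no_timers.seek(0)
try:
    read_data_then_timers(no_timers)
except EOFError as exc:
    print("partition without timers:", exc)
```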
We cannot set the `TIMER_PROCESSED` state in `group_ops.py` because the output
rows iterator is not fully consumed there; it is only fully consumed after
`dump_stream()` is called inside `worker.py`.
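The underlying issue is Python's lazy iteration: returning a generator does not run it. The sketch below uses a hypothetical `Handle` class and `HandleState` enum as stand-ins, not the real PySpark API. A state set right after the iterator is built is already set before any timer row is produced. A wrapper generator sets the state only after the consumer has pulled every row; here a `list()` call stands in for `dump_stream()`.

```python
from enum import Enum


class HandleState(Enum):  # hypothetical stand-in for the real state enum
    DATA_PROCESSED = 1
    TIMER_PROCESSED = 2


class Handle:  # hypothetical stand-in for the API client object
    def __init__(self):
        self.state = HandleState.DATA_PROCESSED
        self.log = []  # records the state seen while each row is emitted


def timer_rows(handle):
    for i in range(2):
        handle.log.append(("emit", i, handle.state))
        yield f"timer-row-{i}"


def eager_set(handle):
    """Sets the state immediately: too early, no row has been produced yet."""
    rows = timer_rows(handle)
    handle.state = HandleState.TIMER_PROCESSED
    return rows


def set_after_exhaustion(handle):
    """Sets the state only after the consumer has pulled every timer row."""
    yield from timer_rows(handle)
    handle.state = HandleState.TIMER_PROCESSED


h1 = Handle()
list(eager_set(h1))  # list() stands in for dump_stream() consuming the iterator
print(h1.log[0][2])  # HandleState.TIMER_PROCESSED: already set while rows were emitted

h2 = Handle()
list(set_after_exhaustion(h2))
print(h2.log[-1][2], h2.state)  # HandleState.DATA_PROCESSED HandleState.TIMER_PROCESSED
```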
So either way we will need to deal with extra complexity. I personally think
putting the timer handling code into `serializers.py` is slightly better
because it is closer to how we handle timers on the Scala side: we chain the
timer output rows after the data handling rows into a single iterator.
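A minimal sketch of the chained approach, assuming hypothetical `process_data_rows` and `process_expired_timers` generators in place of the real handleInputRows and timer-handling code, and a plain dict in place of the handle state. Data output and timer output live in one iterator, so a single `dump_stream()` call can write everything and the reader sees only one end signal. The state update runs only after every row has been consumed.

```python
import itertools


def process_data_rows(rows_by_key):
    """Hypothetical stand-in for handleInputRows output, one key at a time."""
    for key, rows in rows_by_key.items():
        for row in rows:
            yield ("data", key, row)


def process_expired_timers(expired_keys):
    """Hypothetical stand-in for handling expired timers after all data rows."""
    for key in expired_keys:
        yield ("timer", key, None)


def chained_output(rows_by_key, expired_keys, state):
    """One iterator per partition: data output first, then timer output."""
    yield from itertools.chain(
        process_data_rows(rows_by_key),
        process_expired_timers(expired_keys),
    )
    # Reached only after the consumer has pulled every data and timer row.
    state["handle"] = "TIMER_PROCESSED"


state = {"handle": "DATA_PROCESSED"}
out = list(chained_output({"a": [1, 2], "b": [3]}, ["a"], state))
print(out)  # [('data', 'a', 1), ('data', 'a', 2), ('data', 'b', 3), ('timer', 'a', None)]
print(state["handle"])  # TIMER_PROCESSED
```

A partition with no expired timers contributes an empty second half of the chain. No extra writer, reader override, or end-signal handling is needed in that case.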
Let me know if you have suggestions on which way is better.
--
This is an automated message from the Apache Git Service.