HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1860396150


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   
https://github.com/HeartSaVioR/spark/commit/f8952b213ba7f2cbfbc78ef145552317812e9f9b
   
   This is the implementation of my suggestion, based on 0c5ab3f. I've confirmed that `pyspark.sql.tests.pandas.test_pandas_transform_with_state` passes with this change - I haven't included the new tests you added later, though.
   
   I think this is a lot simpler - we just add two markers into the input iterator which carry over the mode, and the flow does not change at all. No tricks with teeing and chaining iterators, minimal changes to the data structures, etc.
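   
   To illustrate the idea, here is a minimal, purely hypothetical sketch - `TIMER_MARKER`, `COMPLETION_MARKER`, and `generate_input` are illustrative names, not the actual identifiers in my commit. The producer side just appends two sentinels to the same iterator that carries the per-key data:
   
   ```python
   TIMER_MARKER = object()       # pulled only after every per-key entry has been consumed
   COMPLETION_MARKER = object()  # pulled only after timer handling is done as well
   
   def generate_input(grouped_data):
       # one entry per grouping key, like the grouped iterator on the Scala side
       for key, rows in grouped_data:
           yield (key, rows)
       # two sentinels carry the mode over the same iterator - no teeing, no chaining
       yield TIMER_MARKER
       yield COMPLETION_MARKER
   ```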
   
   How does this work? It is the same way we use iterators in Spark's Scala codebase: with an iterator in Scala, we pull one entry, process it to produce output, and then pull the next entry. The generator yields one entry per grouping key, then the marker for timers, then the marker for completion. Each entry triggers the function that eventually calls the user function; the user function is expected to return an iterator, but the logic producing that iterator must be synchronous (no async and no laziness - otherwise I suspect it could even fail without my change).
   
   So by the time the timer marker is evaluated, the function calls for all grouping keys must already have completed. The same holds for the completion marker. This matches the Scala implementation.
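   
   Continuing the same hypothetical sketch from above (again, `handle_input_rows`, `handle_expired_timers`, and `write_batch` are illustrative stand-ins, not the real serializer API), the consuming side stays a single synchronous loop, and pulling a marker is what flips the phase:
   
   ```python
   def dump_all(input_iter, handle_input_rows, handle_expired_timers, write_batch):
       for entry in input_iter:
           if entry is TIMER_MARKER:
               # reaching this point means handle_input_rows has run for every key
               for out in handle_expired_timers():
                   write_batch(out)
           elif entry is COMPLETION_MARKER:
               # reaching this point means timer handling is finished too
               break
           else:
               key, rows = entry
               # the user function returns an iterator, but we drain it here,
               # synchronously, before pulling the next entry
               for out in handle_input_rows(key, rows):
                   write_batch(out)
   ```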
   
   As a side effect, the phase updates are also corrected in this commit.



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.

Review Comment:
   If you agree with this, please pick up the commit above. You've already added several commits, and I can't revert them partially by myself.
   
   My fork is public, so you can add my repo as a remote, fetch the branch, and cherry-pick the commit into this PR branch (you'll have to resolve merge conflicts). That said, I'd recommend taking a different approach entirely: do a hard reset to my commit in this PR branch (`git reset --hard f8952b213ba7f2cbfbc78ef145552317812e9f9b`), and then add further commits to address the other review comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

