Kimahriman commented on code in PR #52440:
URL: https://github.com/apache/spark/pull/52440#discussion_r2379359869


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1116,19 +1116,22 @@ def load_stream(self, stream):
         """
         import pyarrow as pa
 
+        def process_group(batches: "Iterator[pa.RecordBatch]"):
+            for batch in batches:
+                struct = batch.column(0)
+                yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
+
         dataframes_in_group = None
 
         while dataframes_in_group is None or dataframes_in_group > 0:
             dataframes_in_group = read_int(stream)
 
             if dataframes_in_group == 1:
-                structs = [
-                    batch.column(0) for batch in ArrowStreamSerializer.load_stream(self, stream)
-                ]
-                yield [
-                    pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
-                    for struct in structs
-                ]
+                batch_iter = process_group(ArrowStreamSerializer.load_stream(self, stream))
+                yield batch_iter
+                # Make sure the batches are fully iterated before getting the next group

Review Comment:
   I don't think it should be an error if a user decides not to fully iterate all batches; there could be valid use cases for that.
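
   To illustrate what I mean, here is a minimal sketch (hypothetical stand-in for `load_stream`, with plain lists standing in for Arrow record batches): the generator drains whatever the caller left unconsumed before moving to the next group, instead of raising an error on partial iteration.

   ```python
   from typing import Iterator, List


   def load_groups(groups: List[List[int]]) -> Iterator[Iterator[int]]:
       """Yield a lazy iterator per group, then silently drain any
       batches the caller skipped so the stream position stays aligned
       with the start of the next group."""
       for group in groups:
           it = iter(group)
           yield it
           # Drain leftovers rather than erroring, so callers may stop early.
           for _ in it:
               pass


   consumed = []
   for batch_iter in load_groups([[1, 2, 3], [4, 5, 6]]):
       # The caller only takes the first batch of each group; the
       # remaining batches are drained, not treated as an error.
       consumed.append(next(batch_iter))
   ```

   With this approach `consumed` ends up as `[1, 4]`, and the partially consumed groups cause no failure.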



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

