Kimahriman commented on code in PR #52440:
URL: https://github.com/apache/spark/pull/52440#discussion_r2379359869
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1116,19 +1116,22 @@ def load_stream(self, stream):
         """
         import pyarrow as pa

+        def process_group(batches: "Iterator[pa.RecordBatch]"):
+            for batch in batches:
+                struct = batch.column(0)
+                yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
+
         dataframes_in_group = None

         while dataframes_in_group is None or dataframes_in_group > 0:
             dataframes_in_group = read_int(stream)

             if dataframes_in_group == 1:
-                structs = [
-                    batch.column(0) for batch in ArrowStreamSerializer.load_stream(self, stream)
-                ]
-                yield [
-                    pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))
-                    for struct in structs
-                ]
+                batch_iter = process_group(ArrowStreamSerializer.load_stream(self, stream))
+                yield batch_iter
+                # Make sure the batches are fully iterated before getting the next group

Review Comment:
   I don't think it should be an error if a user decides not to fully iterate all batches; there could be valid use cases for that.
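   A minimal, self-contained sketch of the alternative being suggested here: drain whatever the consumer skipped before reading the next group, rather than raising. `grouped_stream` and the plain lists are hypothetical stand-ins for `load_stream` and the Arrow record-batch stream, not code from this PR:

   ```python
   from typing import Iterator, List

   def grouped_stream(groups: List[List[int]]) -> Iterator[Iterator[int]]:
       """Yield each group as a lazy iterator, tolerating partial consumption."""
       for group in groups:
           it = iter(group)
           yield it
           # Drain anything the consumer left behind so the "stream" stays
           # aligned for the next group, instead of treating it as an error.
           for _ in it:
               pass

   # A consumer that abandons each group early still sees correct groups:
   for g in grouped_stream([[1, 2, 3], [4, 5, 6]]):
       print(next(g))  # prints 1, then 4
   ```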