lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518958067
##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
"""
# load the batches
for batch in self.serializer.load_stream(stream):
- yield batch
+ if self.split_batches:
+ import pyarrow as pa
+ # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled,
ensure
+ # each column in each record batch is contained in its own
allocation.
+ # Otherwise, selfDestruct does nothing; it frees each column
as its
+ # converted, but each column will actually be a list of slices
of record
+ # batches, and so no memory is actually freed until all
columns are
+ # converted.
+ split_batch = pa.RecordBatch.from_arrays([
+ pa.concat_arrays([array]) for array in batch
Review comment:
I'll look around. I couldn't find another obvious one but I agree that
this is not ideal.
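
For readers following along, here is a minimal standalone sketch of the splitting idea the diff comment describes. The helper name split_record_batch and the toy data are illustrative assumptions, not code from the PR; the PR's actual surrounding logic may differ.

import pyarrow as pa

def split_record_batch(batch):
    # Copy each column into its own freshly allocated Arrow array so that
    # Table.to_pandas(self_destruct=True) can release memory column by column
    # instead of holding on to buffers shared by slices of record batches.
    return pa.RecordBatch.from_arrays(
        [pa.concat_arrays([col]) for col in batch.columns],
        schema=batch.schema,
    )

# Toy batch standing in for one read from the Arrow stream.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["x", "y"],
)

table = pa.Table.from_batches([split_record_batch(batch)])
# self_destruct frees each Arrow column as soon as it has been converted,
# which only helps when every column owns its own allocation, as above.
pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)

self_destruct and split_blocks are pyarrow's documented Table.to_pandas options for column-at-a-time conversion; whether Spark passes exactly these flags is an assumption here.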
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]