BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r523106731



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I don't think we need to defer since what you have works, but if you 
could look into those 2 improvements on the Arrow side that would be great. 
Then we could come back and update this later.
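
For reference, here is a minimal, self-contained sketch of the pattern the diff relies on (the helper name and sample data are hypothetical, not from the PR): each column is copied into its own allocation with `pa.concat_arrays` so that `Table.to_pandas(self_destruct=True, split_blocks=True)` can release a column's buffers as soon as that column has been converted.

```python
import pyarrow as pa

# Hypothetical helper illustrating the trick in the diff above: copy each
# column of a RecordBatch into its own allocation so self-destruct can free
# memory column by column instead of keeping whole-batch buffers alive.
def split_into_own_allocations(batch):
    return pa.RecordBatch.from_arrays(
        # concat_arrays over a single-element list yields an array that owns
        # fresh buffers rather than slices of the original batch allocation.
        [pa.concat_arrays([column]) for column in batch.columns],
        schema=batch.schema,
    )

# Toy usage (data made up): with split_blocks + self_destruct, to_pandas can
# drop each Arrow column's buffers once that column has been converted.
batch = pa.RecordBatch.from_arrays(
    [pa.array(range(5)), pa.array(["a", "b", "c", "d", "e"])],
    names=["x", "y"],
)
table = pa.Table.from_batches([split_into_own_allocations(batch)])
df = table.to_pandas(split_blocks=True, self_destruct=True)
print(df)
```

Without the per-column copy, every converted column would still reference a slice of the original batch's shared buffers, so nothing could actually be freed until the whole conversion finished, which is exactly the situation the diff's comment describes.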



