lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518958067



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I'll look around. I couldn't find another obvious one, but I agree that this is not ideal.
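
       For readers following along, here is a minimal standalone pyarrow sketch of the splitting idea the diff above implements. It is not the PR's code path: there is no Spark here, and the toy batch, column names, and sizes are made up for illustration; a batch read from an IPC stream would normally share one backing buffer across columns, which is what makes the re-wrapping worthwhile.

```python
import pyarrow as pa

# A toy record batch standing in for one batch read from the Arrow IPC stream.
batch = pa.RecordBatch.from_arrays(
    [pa.array(range(1000)), pa.array([float(i) for i in range(1000)])],
    names=["a", "b"],
)

# Re-wrap each column in its own allocation so that self_destruct can release
# memory column by column during the pandas conversion; otherwise the columns
# may all be slices of one shared buffer that cannot be freed piecemeal.
split_batch = pa.RecordBatch.from_arrays(
    [pa.concat_arrays([array]) for array in batch],
    names=batch.schema.names,
)

# self_destruct (together with split_blocks and use_threads=False) lets the
# Table hand its buffers over to pandas as each column is converted.
table = pa.Table.from_batches([split_batch])
df = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
print(df.head())
```

       In the PR itself the same concat_arrays re-wrapping happens inside load_stream before each batch is yielded, and only when self.split_batches is set, i.e. when the selfDestruct config is enabled.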



