BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518947536



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1843,6 +1843,16 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct option " +
+        "for columnar data transfers in PySpark. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas")

Review comment:
       Can you also add that the `split_blocks` will be enabled too, and make 
it clear that is is for the conversion of arrow to pandas?

##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, 
ensure
+                # each column in each record batch is contained in its own 
allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column 
as its
+                # converted, but each column will actually be a list of slices 
of record
+                # batches, and so no memory is actually freed until all 
columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I'm not crazy about using `pa.concat_arrays()` to force a copy. I don't 
think it's safe to assume it will always make a copy, it might end up producing 
a chunked array in the future with zero copy, as is done in 
`pa.concat_tables()`. Is there another, more direct way we can make copies?

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       This is a pretty minimal dataframe, mostly for checking type 
compatibility. It would be nice to also check against a larger dataframe that 
the memory usage doesn't double during conversion, but I'm not sure how to do 
that reliably.

##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:

Review comment:
       This doesn't need to be done in the serializer, could you move it to 
`_collect_as_arrow` and leave the serializer as it was?

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -232,8 +252,9 @@ def _collect_as_arrow(self):
             port, auth_secret, jsocket_auth_server = 
self._jdf.collectAsArrowToPython()
 
         # Collect list of un-ordered batches where last element is a list of 
correct order indices
+        serializer = ArrowCollectSerializer(split_batches=split_batches)
         try:
-            results = list(_load_from_socket((port, auth_secret), 
ArrowCollectSerializer()))
+            results = list(_load_from_socket((port, auth_secret), serializer))

Review comment:
       you should be able to split the batches here, as they are yielded from 
the socket




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to