BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r532991912
##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator
+            pdf, table = df._collect_as_arrow_table()
+            self.assertEqual((rows, cols), pdf.shape)
+            # If self destruct did work, then memory usage should be only a little above
+            # the minimum memory necessary for the dataframe
+            self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * expected_bytes)
Review comment:
       Just to be safe, what do you think about getting the allocated bytes before and after, then comparing the difference? It should probably be the same, but it would be a little more focused in case something changes in the future.
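
       Untested sketch of what that could look like, reusing `pa`, `df`, `rows`, `cols`, and `expected_bytes` from the diff above:

```python
allocation_before = pa.total_allocated_bytes()
with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
    pdf, table = df._collect_as_arrow_table()
    self.assertEqual((rows, cols), pdf.shape)
    # Compare the delta against the baseline rather than the absolute allocator
    # total, so the check stays valid if something else ever allocates through
    # the Arrow allocator during the test.
    self.assertLessEqual(pa.total_allocated_bytes() - allocation_before,
                         1.2 * expected_bytes)
```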
##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator
+            pdf, table = df._collect_as_arrow_table()
+            self.assertEqual((rows, cols), pdf.shape)
+            # If self destruct did work, then memory usage should be only a little above
+            # the minimum memory necessary for the dataframe
+            self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * expected_bytes)
+            del pdf, table
+            self.assertEqual(pa.total_allocated_bytes(), 0)
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            # Force the internals to reallocate data via PyArrow's allocator so that it
+            # gets measured by total_allocated_bytes
+            pdf, table = df._collect_as_arrow_table(_force_split_batches=True)
+            total_allocated_bytes = pa.total_allocated_bytes()
Review comment:
looks to be unused?
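
       If it was meant to feed an assertion, a possible (untested) use would be something along these lines; the exact bound is only a guess:

```python
# With self-destruct disabled, the collected Arrow table should still hold at
# least one full copy of the data in the Arrow allocator.
self.assertGreaterEqual(pa.total_allocated_bytes(), expected_bytes)
```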
##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -252,6 +242,74 @@ def _collect_as_arrow(self):
         # Re-order the batch list using the correct order
         return [batches[i] for i in batch_order]
+    def _collect_as_arrow_table(self, _force_split_batches=False):
Review comment:
       I don't think we really need to add this method; it just does the conversion from batches to a DataFrame. The unit test doesn't even use the returned Table, right?
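
       Rough sketch of what the test could do directly on top of the existing `_collect_as_arrow()` instead; the `split_blocks`/`self_destruct` arguments here are the pyarrow options this feature presumably builds on, not necessarily what the patch does internally:

```python
import pyarrow as pa

batches = df._collect_as_arrow()
table = pa.Table.from_batches(batches)
del batches
# self_destruct frees each Arrow column as it is converted, so only about one
# copy of the data should be alive at a time.
pdf = table.to_pandas(split_blocks=True, self_destruct=True)
```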
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]