[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22275 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r232420076

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
    +        def run_test(num_records, num_parts, max_records):
    +            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
    +            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
    +                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +                self.assertPandasEqual(pdf, pdf_arrow)
    +
    +        cases = [
    +            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
    +            (512, 64, 2),    # Try medium num partitions to test out of order collection
    +            (64, 8, 2),      # Try small number of partitions to test out of order collection
    +            (64, 64, 1),     # Test single batch per partition
    +            (64, 1, 64),     # Test single partition, single batch
    +            (64, 1, 8),      # Test single partition, multiple batches
    +            (30, 7, 2),      # Test different sized partitions
    +        ]
    --- End diff --

    I like the new tests. I think a 0.1 s delay on one of the partitions is enough.
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r232420015

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,34 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        def delay_first_part(partition_index, iterator):
    +            if partition_index == 0:
    +                time.sleep(0.1)
    --- End diff --

    I like this :)
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r232145973

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
    +        def run_test(num_records, num_parts, max_records):
    +            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
    +            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
    +                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +                self.assertPandasEqual(pdf, pdf_arrow)
    +
    +        cases = [
    +            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
    +            (512, 64, 2),    # Try medium num partitions to test out of order collection
    +            (64, 8, 2),      # Try small number of partitions to test out of order collection
    +            (64, 64, 1),     # Test single batch per partition
    +            (64, 1, 64),     # Test single partition, single batch
    +            (64, 1, 8),      # Test single partition, multiple batches
    +            (30, 7, 2),      # Test different sized partitions
    +        ]
    --- End diff --

    @holdenk, I updated the tests, please take another look when you get a chance. Thanks!
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r231311398

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
    +        def run_test(num_records, num_parts, max_records):
    +            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
    +            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
    +                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +                self.assertPandasEqual(pdf, pdf_arrow)
    +
    +        cases = [
    +            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
    +            (512, 64, 2),    # Try medium num partitions to test out of order collection
    +            (64, 8, 2),      # Try small number of partitions to test out of order collection
    +            (64, 64, 1),     # Test single batch per partition
    +            (64, 1, 64),     # Test single partition, single batch
    +            (64, 1, 8),      # Test single partition, multiple batches
    +            (30, 7, 2),      # Test different sized partitions
    +        ]
    --- End diff --

    Yeah, it's not a guarantee, but with a large num of partitions, it's a pretty slim chance they will all be in order. I can also add a case with some delay. My only concern is how big to make a delay to be sure it's enough without adding wasted time to the tests. How about we keep the case with a large number of partitions and add a case with 100ms delay on the first partition?
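The effect of delaying one partition can be illustrated without Spark. The following is a minimal sketch under assumptions, not the test from the pull request: a thread pool stands in for the executors, `compute_partition` and `collect_in_arrival_order` are hypothetical names, and the strings are placeholders for Arrow record batches. Sleeping only in partition 0 makes it very likely, though still not guaranteed, that the other partitions are collected first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def compute_partition(partition_index, delay_seconds):
    # Delay only the first partition so later partitions tend to finish before it.
    if partition_index == 0:
        time.sleep(delay_seconds)
    batches = ["p%d-b%d" % (partition_index, b) for b in range(2)]
    return partition_index, batches


def collect_in_arrival_order(num_parts, delay_seconds=0.1):
    """Collect (partition index, batches) pairs in the order they complete."""
    arrived = []
    with ThreadPoolExecutor(max_workers=num_parts) as pool:
        futures = [pool.submit(compute_partition, i, delay_seconds)
                   for i in range(num_parts)]
        for future in as_completed(futures):
            arrived.append(future.result())
    return arrived


arrived = collect_in_arrival_order(num_parts=4)
arrival_order = [index for index, _ in arrived]
# Sorting by partition index restores the original order regardless of arrival.
restored = [batch for _, part in sorted(arrived) for batch in part]
print(arrival_order)
print(restored)
```

The printed arrival order depends on thread scheduling, which is the same trade-off discussed above: a longer delay makes out-of-order arrival more certain but slows the test.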
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r230423471

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
    +        def run_test(num_records, num_parts, max_records):
    +            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
    +            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
    +                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +                self.assertPandasEqual(pdf, pdf_arrow)
    +
    +        cases = [
    +            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
    +            (512, 64, 2),    # Try medium num partitions to test out of order collection
    +            (64, 8, 2),      # Try small number of partitions to test out of order collection
    +            (64, 64, 1),     # Test single batch per partition
    +            (64, 1, 64),     # Test single partition, single batch
    +            (64, 1, 8),      # Test single partition, multiple batches
    +            (30, 7, 2),      # Test different sized partitions
    +        ]
    --- End diff --

    I don't see how we're guaranteeing out-of-order from the JVM. Could we delay on one of the early partitions to guarantee out of order?
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r229522939

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +
    +        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
    +        def run_test(num_records, num_parts, max_records):
    +            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
    +            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
    +                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +                self.assertPandasEqual(pdf, pdf_arrow)
    +
    +        cases = [
    +            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
    +            (512, 64, 2),    # Try medium num partitions to test out of order collection
    +            (64, 8, 2),      # Try small number of partitions to test out of order collection
    +            (64, 64, 1),     # Test single batch per partition
    +            (64, 1, 64),     # Test single partition, single batch
    +            (64, 1, 8),      # Test single partition, multiple batches
    +            (30, 7, 2),      # Test different sized partitions
    +        ]
    --- End diff --

    @holdenk and @felixcheung, I didn't do a loop but chose some different levels of partition numbers to be a bit more sure that partitions won't end up in order. I also added some other cases of different partition/batch ratios. Let me know if you think we need more to be sure here.
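The partition/batch ratios behind these cases can be worked out by hand. The sketch below is only arithmetic under an assumption: it supposes that a range of `num_records` rows is split at boundaries `i * num_records // num_parts`, which may not match Spark's behavior exactly, and the helper names `partition_sizes` and `batches_per_partition` are hypothetical. With that assumption, the `(30, 7, 2)` case yields partitions of unequal size and therefore unequal batch counts.

```python
def partition_sizes(num_records, num_parts):
    """Rows per partition, assuming boundaries at i * num_records // num_parts."""
    bounds = [i * num_records // num_parts for i in range(num_parts + 1)]
    return [bounds[i + 1] - bounds[i] for i in range(num_parts)]


def batches_per_partition(num_records, num_parts, max_records):
    """Batch count per partition when each batch holds at most max_records rows."""
    sizes = partition_sizes(num_records, num_parts)
    # -(-a // b) is ceiling division for non-negative integers.
    return [-(-size // max_records) for size in sizes]


cases = [
    (1024, 512, 2),
    (512, 64, 2),
    (64, 8, 2),
    (64, 64, 1),
    (64, 1, 64),
    (64, 1, 8),
    (30, 7, 2),
]
for num_records, num_parts, max_records in cases:
    counts = batches_per_partition(num_records, num_parts, max_records)
    print((num_records, num_parts, max_records), "total batches:", sum(counts))
```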
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r223197940

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +        df = self.spark.range(64, numPartitions=8).toDF("a")
    +        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
    +            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +            self.assertPandasEqual(pdf, pdf_arrow)
    --- End diff --

    that sounds good
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r223116201

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    +          }
    +          partitionCount += 1
    +
    +          // After last batch, end the stream and write batch order
    +          if (partitionCount == numPartitions) {
    +            batchWriter.end()
    +            out.writeInt(batchOrder.length)
    +            // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
    --- End diff --

    yeah, sounds good
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r223116082

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    --- End diff --

    yup!
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219556033

    --- Diff: python/pyspark/serializers.py ---
    @@ -208,8 +214,26 @@ def load_stream(self, stream):
             for batch in reader:
                 yield batch
    +        if self.load_batch_order:
    +            num = read_int(stream)
    +            self.batch_order = []
    +            for i in xrange(num):
    +                index = read_int(stream)
    +                self.batch_order.append(index)
    +
    +    def get_batch_order_and_reset(self):
    --- End diff --

    Looking at `_load_from_socket` I think I understand why this was done as a separate function here, but what if the serializer itself returned a tuple, or re-ordered the batches itself? I'm just trying to get a better understanding, not saying those are better designs.
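The diff above reads a count and then that many indices once the record batches have been consumed. A self-contained sketch of that framing follows. It is an illustration under assumptions, not pyspark's code: `write_int`, `read_int`, and `read_batch_order` are defined locally with `struct`, and the 4-byte big-endian signed layout is chosen to match what Java's `DataOutputStream.writeInt` produces.

```python
import io
import struct


def write_int(value, stream):
    # 4-byte big-endian signed integer, the layout DataOutputStream.writeInt uses.
    stream.write(struct.pack("!i", value))


def read_int(stream):
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended while reading an int")
    return struct.unpack("!i", data)[0]


def read_batch_order(stream):
    """Read a length-prefixed list of batch indices from the stream."""
    num = read_int(stream)
    return [read_int(stream) for _ in range(num)]


# Build a trailer the way a writer might: the count first, then each index.
buf = io.BytesIO()
order = [3, 4, 0, 1, 2]
write_int(len(order), buf)
for index in order:
    write_int(index, buf)

buf.seek(0)
print(read_batch_order(buf))
```

Returning the list from a function, as here, corresponds to the "return a tuple" alternative raised in the comment; storing it on the serializer and fetching it later is the design the diff shows.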
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219558311

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    +          }
    +          partitionCount += 1
    +
    +          // After last batch, end the stream and write batch order
    +          if (partitionCount == numPartitions) {
    +            batchWriter.end()
    +            out.writeInt(batchOrder.length)
    +            // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
    --- End diff --

    How about something like `// Sort by the output global batch indexes partition index, partition batch index tuple`? When I first read this code path I got confused myself, so I think we should spend a bit of time on the comment here.
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219556534

    --- Diff: python/pyspark/serializers.py ---
    @@ -208,8 +214,26 @@ def load_stream(self, stream):
             for batch in reader:
                 yield batch
    +        if self.load_batch_order:
    +            num = read_int(stream)
    +            self.batch_order = []
    --- End diff --

    If we're going to have get_batch_order_and_reset as a separate function, could we verify batch_order is None before we reset and throw here if it's not? Just thinking of future folks who might have to debug something here.
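The guard suggested here could look like the following sketch. `BatchOrderHolder` and `set_batch_order` are hypothetical stand-ins, not the `ArrowStreamSerializer` from the patch; the example only shows the idea of raising when a previously loaded batch order was never consumed, so that a stale order cannot be silently overwritten.

```python
class BatchOrderHolder(object):
    """Hypothetical holder showing the 'verify before overwrite' guard."""

    def __init__(self):
        self.batch_order = None

    def set_batch_order(self, indices):
        # Refuse to silently discard an order that was loaded but never read.
        if self.batch_order is not None:
            raise RuntimeError("previous batch order was not consumed")
        self.batch_order = list(indices)

    def get_batch_order_and_reset(self):
        # Hand the order to the caller exactly once, then clear it.
        if self.batch_order is None:
            raise RuntimeError("batch order has not been loaded")
        order, self.batch_order = self.batch_order, None
        return order


holder = BatchOrderHolder()
holder.set_batch_order([2, 0, 1])
print(holder.get_batch_order_and_reset())
```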
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219561178

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    --- End diff --

    Could we call `i` something more descriptive like partition_batch_num or similar?
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219557215

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +        df = self.spark.range(64, numPartitions=8).toDF("a")
    +        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
    +            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +            self.assertPandasEqual(pdf, pdf_arrow)
    --- End diff --

    This looks pretty similar to the kind of test case we could verify with something like hypothesis. Integrating hypothesis is probably too much work, but we could at least quickly explore the space of partition counts in a loop here. Do you think that would help, @felixcheung?
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219404072

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4434,6 +4434,12 @@ def test_timestamp_dst(self):
             self.assertPandasEqual(pdf, df_from_python.toPandas())
             self.assertPandasEqual(pdf, df_from_pandas.toPandas())
    +    def test_toPandas_batch_order(self):
    +        df = self.spark.range(64, numPartitions=8).toDF("a")
    +        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
    +            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
    +            self.assertPandasEqual(pdf, pdf_arrow)
    --- End diff --

    hm, is this test case "enough" to trigger any possible problem just by chance? Would increasing the number of batches or the number of records per batch perhaps increase the chance of hitting a streaming-order or concurrency issue?
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r214131313

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    +          }
    +          partitionCount += 1
    +
    +          // After last batch, end the stream and write batch order
    +          if (partitionCount == numPartitions) {
    +            batchWriter.end()
    +            out.writeInt(batchOrder.length)
    +            // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
    --- End diff --

    How about a slight change? `// Re-order batches according to these indices to build a table.`
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r214129436

    --- Diff: python/pyspark/serializers.py ---
    @@ -187,9 +187,15 @@ def loads(self, obj):
     class ArrowStreamSerializer(Serializer):
         """
    -    Serializes Arrow record batches as a stream.
    +    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
    --- End diff --

    Yeah, it's also used in the `createDataFrame` path, but that does only use `dump_stream`. Still, it seemed best to make this an optional feature of the serializer.
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r213862165

    --- Diff: python/pyspark/serializers.py ---
    @@ -187,9 +187,15 @@ def loads(self, obj):
     class ArrowStreamSerializer(Serializer):
         """
    -    Serializes Arrow record batches as a stream.
    +    Serializes Arrow record batches as a stream. Optionally load the ordering of the batches as a
    --- End diff --

    This is optional. Do we have other usage of this `ArrowStreamSerializer` without the ordering?
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r213860328

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1 // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, i)) }
    +          }
    +          partitionCount += 1
    +
    +          // After last batch, end the stream and write batch order
    +          if (partitionCount == numPartitions) {
    +            batchWriter.end()
    +            out.writeInt(batchOrder.length)
    +            // Batch order indices are from 0 to N-1 batches, sorted by order they arrived
    --- End diff --

    nit: `Batch order indices are from 0 to N-1 batches, sorted by order they arrived. Re-sort indices to the correct order to build a table.`
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/22275

    [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send out-of-order record batches to improve performance

    ## What changes were proposed in this pull request?

    When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out-of-order must be buffered before they can be sent to Python. This causes excess memory to be used in the driver JVM and increases the time it takes to complete, because data must sit in the JVM waiting for preceding partitions to come in.

    This change sends out-of-order partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered in the JVM and there is no waiting on particular partitions, so performance will be increased.

    Followup to #21546

    ## How was this patch tested?

    Added new test with a large number of batches per partition to ensure they are put in the correct order

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-toPandas-oo-batches-SPARK-25274

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22275.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22275

commit 2041317ba1efa0595431a900472b2e347c50d23c
Author: Bryan Cutler
Date: 2018-08-29T21:26:49Z

    changed toPandas to send out of order batches, followed by batch order indices

commit d6fefee68c30aa579b345c32d9f00b32bf9a505b
Author: Bryan Cutler
Date: 2018-08-29T21:38:25Z

    Consolidated BatchOrderSerializer into ArrowStreamSerializer and made optional
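The reordering idea described in this pull request can be sketched in plain Python. This is an illustration under assumptions rather than the code from the patch: `order_indices` and `reorder_batches` are hypothetical names, and strings stand in for Arrow record batches. Each batch is tagged with a (partition index, batch index) key as it arrives; sorting the arrival positions by that key gives the indices needed to put the received batches back in order.

```python
def order_indices(arrival_keys):
    """Return arrival positions sorted by (partition index, batch index).

    arrival_keys[i] is the key of the i-th batch received, so the result
    lists which received batch comes first, second, and so on in the
    final table.
    """
    return sorted(range(len(arrival_keys)), key=lambda pos: arrival_keys[pos])


def reorder_batches(batches, arrival_keys):
    """Put batches that arrived out of order back into partition order."""
    return [batches[pos] for pos in order_indices(arrival_keys)]


# Partition 1 finished first, then partition 2, then partition 0.
arrival_keys = [(1, 0), (1, 1), (2, 0), (0, 0), (0, 1)]
batches = ["p1-b0", "p1-b1", "p2-b0", "p0-b0", "p0-b1"]  # stand-ins for Arrow batches

ordered = reorder_batches(batches, arrival_keys)
print(ordered)
```

Only the small list of integer indices needs to be sorted and sent after the data, so nothing has to be held back on the sending side while waiting for an earlier partition.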