GitHub user BryanCutler opened a pull request: https://github.com/apache/spark/pull/21546
[WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format for creating from and collecting Pandas DataFrames

## What changes were proposed in this pull request?

This changes the calls of `toPandas()` and `createDataFrame()` to use the Arrow stream format when Arrow is enabled. Previously, Arrow data was written to byte arrays where each chunk was an output of the Arrow file format. This was mainly due to constraints at the time, and it caused overhead by writing the schema/footer on each chunk of data and then having to read multiple Arrow file inputs and concatenate them together. Using the Arrow stream format improves on this: it increases performance, lowers memory overhead for the average case, and simplifies the code. Here are the details of this change:

**toPandas()**

_Before:_ Spark internal rows are converted to the Arrow file format; each group of records is a complete Arrow file containing the schema and other metadata. Next a collect is done, and the result is an array of Arrow files. Each Arrow file is then sent to the Python driver, which loads each file and concatenates them into a single Arrow DataFrame.

_After:_ Spark internal rows are converted directly to ArrowRecordBatches, the simplest Arrow component for IPC data transfers. The driver JVM then immediately starts serving data to Python as an Arrow stream, sending the schema first. It then starts Spark jobs with a custom handler such that when a partition is received, and it is the next one in order, its ArrowRecordBatches can be sent to Python as soon as possible. This improves performance, simplifies memory usage on executors, and improves the average memory usage on the JVM driver. Since the order of partitions must be preserved, the worst case is that the first partition is the last to arrive, and all data must be kept in memory until then. This case is no worse than before when doing a full collect.
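The ordered-partition handling above can be illustrated with a small sketch. This is not Spark's actual handler (which lives in the JVM and serves Arrow bytes over a socket); it is a minimal pure-Python model of the buffering logic: partitions arrive in any order, out-of-order arrivals are buffered, and batches are forwarded as soon as the next partition in order is available.

```python
# Sketch of the ordered-partition streaming idea (not Spark's actual code):
# partitions may arrive in any order, but their record batches must be
# forwarded in partition order. Out-of-order arrivals are buffered; the
# worst case (partition 0 arrives last) buffers everything, which is no
# worse than a full collect.

def stream_partitions_in_order(arrivals):
    """arrivals: iterable of (partition_index, batches) in arrival order.
    Yields batches in partition order, as early as possible."""
    buffered = {}
    next_to_send = 0
    for index, batches in arrivals:
        buffered[index] = batches
        # Flush every consecutive partition we now hold, starting at next_to_send.
        while next_to_send in buffered:
            yield from buffered.pop(next_to_send)
            next_to_send += 1

# Example: partitions 1 and 2 arrive before partition 0, so nothing is
# emitted until partition 0 lands, then everything flushes in order.
arrivals = [(1, ["b1"]), (2, ["b2"]), (0, ["b0"])]
print(list(stream_partitions_in_order(arrivals)))  # ['b0', 'b1', 'b2']
```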
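On the `createDataFrame()` side described next, the JVM scans the incoming Arrow stream and keeps only the record-batch payloads as byte arrays. The sketch below shows only that filtering pattern; it uses a simplified, made-up message framing (a type tag plus a length prefix), not the real Arrow IPC format, which in the actual change is walked via Arrow's MessageChannelReader.

```python
import io
import struct

# Hypothetical toy framing, NOT the real Arrow IPC format: each message is a
# 1-byte type tag (b"S" schema, b"B" record batch, b"D" dictionary), a 4-byte
# little-endian payload length, then the payload bytes. It only illustrates
# the pattern of scanning messages and keeping record-batch bytes.

def write_message(out, msg_type, payload):
    out.write(msg_type)
    out.write(struct.pack("<I", len(payload)))
    out.write(payload)

def batches_from_stream(stream):
    """Yield the payload bytes of every record-batch message in the stream."""
    while True:
        tag = stream.read(1)
        if not tag:
            break  # end of stream
        (length,) = struct.unpack("<I", stream.read(4))
        payload = stream.read(length)
        if tag == b"B":
            yield payload  # keep record batches; skip schema/dictionary messages

buf = io.BytesIO()
write_message(buf, b"S", b"schema-bytes")
write_message(buf, b"B", b"batch-0")
write_message(buf, b"B", b"batch-1")
buf.seek(0)
print(list(batches_from_stream(buf)))  # [b'batch-0', b'batch-1']
```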
**createDataFrame()**

_Before:_ A Pandas DataFrame is split into parts and each part is made into an Arrow file. Each file is then prefixed by its buffer size and written to a temp file. The temp file is read back and each Arrow file is parallelized as a byte array.

_After:_ A Pandas DataFrame is split into parts, then an Arrow stream is written to a temp file where each part is an ArrowRecordBatch. The temp file is read as a stream and the Arrow messages are examined. If a message is an ArrowRecordBatch, the data is saved as a byte array. After reading the file, each ArrowRecordBatch is parallelized as a byte array. This requires slightly more processing than before because we must look at each Arrow message to extract the record batches, but performance remains the same. It is cleaner in the sense that IPC from Python to the JVM is done over a single Arrow stream.

## How was this patch tested?

Added new unit tests for the additions to ArrowConverters in Scala; existing tests cover Python.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark arrow-toPandas-stream-SPARK-23030

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21546.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21546

----

commit 9af482170ee95831bbda139e6e931ba2631df386
Author: Bryan Cutler <cutlerb@...>
Date: 2018-01-10T22:02:15Z

    change ArrowConverters to stream format

commit d617f0da8eff1509da465bb707340e391314bec4
Author: Bryan Cutler <cutlerb@...>
Date: 2018-01-10T22:14:07Z

    Change ArrowSerializer to use stream format

commit f10d5d9cd3cece7f56749e1de7fe01699e4759a0
Author: Bryan Cutler <cutlerb@...>
Date: 2018-01-12T00:40:36Z

    toPandas is working with RecordBatch payloads, using custom handler to stream ordered partitions

commit 03653c687473b82bbfb6653504479498a2a3c63b
Author: Bryan Cutler <cutlerb@...>
Date: 2018-02-10T00:23:17Z

    cleanup and removed ArrowPayload, createDataFrame now working

commit 1b932463bca0815e79f3a8d61d1c816e62949698
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-09T00:14:06Z

    toPandas and createDataFrame working but tests fail with date cols

commit ce22d8ad18e052d150528752b727c6cfe11485f7
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-27T00:32:03Z

    removed usage of seekableByteChannel

commit dede0bd96921c439747a9176f24c9ecbb9c8ce0a
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-28T00:28:54Z

    for toPandas, set old collection result to null and add comments

commit 9e29b092cb7d45fa486db0215c3bd4a99c5f8d98
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-28T18:28:18Z

    cleanup, not yet passing ArrowConvertersSuite

commit ceb8d38a6c83c3b6dae040c9e8d860811ecad0cc
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-29T21:14:03Z

    fix to read Arrow stream with multiple batches, cleanup, add docs, scala tests pass, style pass

commit f42e4ea7b4fb944eeefd39a0fd6a1428b527214a
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-29T22:17:25Z

    use base OutputStream for serveToStream instead of DataOutputStream

commit 951843d760aa6d29ff18112e82d28f4f6dc09907
Author: Bryan Cutler <cutlerb@...>
Date: 2018-03-29T22:21:13Z

    accidentally removed date type checks, passing pyspark tests

commit af03c6b384fe4ea73d67ad1d3f46be4a1e027e9e
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T00:29:12Z

    Changed to only use Arrow batch bytes as payload, had to hack Arrow MessageChannelReader

commit b047c1624429ea579aa279e92909b90400b40c58
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T00:30:48Z

    added todo comment

commit a77b89ea0357e3ce146ff35537eb7da8a8c80bad
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T17:35:05Z

    change getBatchesFromStream to return iterator

commit 81c82093edf78d36a1de850b3d8faede88fb0524
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T18:06:22Z

    need to end stream on toPandas after all batches sent to python, and added some comments

commit 5f46a02aa34e9f51ac310d27e3272b883c67cc37
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T18:11:19Z

    forgot to remove old comment

commit 7694b8fe6970789d4e88f9c5df46c97a4b235f02
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T18:47:49Z

    fixed up comments

commit a5a1fbe7121c5b0dd93876a56c29ad17dcd9b168
Author: Bryan Cutler <cutlerb@...>
Date: 2018-06-12T20:22:30Z

    fixed up some wording

----

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org