GitHub user BryanCutler opened a pull request:

    https://github.com/apache/spark/pull/21546

    [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format for creating from 
and collecting Pandas DataFrames

    ## What changes were proposed in this pull request?
    
    This changes the calls of `toPandas()` and `createDataFrame()` to use the 
Arrow stream format, when Arrow is enabled.  Previously, Arrow data was written 
to byte arrays where each chunk is an output of the Arrow file format.  This 
was mainly due to constraints at the time, and caused some overhead by writing 
the schema/footer on each chunk of data and then having to read multiple Arrow 
file inputs and concat them together.
    
    Using the Arrow stream format has improved these by increasing performance, 
lower memory overhead for the average case, and simplified the code.  Here are 
the details of this change:
    
    **toPandas()**
    
    _Before:_
    Spark internal rows are converted to Arrow file format, each group of 
records is a complete Arrow file which contains the schema and other metadata.  
Next a collect is done and an Array of Arrow files is the result.  After that 
each Arrow file is sent to Python driver which then loads each file and concats 
them to a single Arrow DataFrame.
    
    _After:_
    Spark internal rows are converted to ArrowRecordBatches directly, which is 
the simplest Arrow component for IPC data transfers.  The driver JVM then 
immediately starts serving data to Python as an Arrow stream, sending the 
schema first. It then starts Spark jobs with a custom handler such that when a 
partition is received (and in the correct order) the ArrowRecordBatches can be 
sent to python as soon as possible.  This improves performance, simplifies 
memory usage on executors, and improves the average memory usage on the JVM 
driver.  Since the order of partitions must be preserved, the worst case is 
that the first partition will be the last to arrive and all data must be kept 
in memory until then.  This case is no worse that before when doing a full 
collect.
    
    **createDataFrame()**
    
    _Before:_
    A Pandas DataFrame is split into parts and each part is made into an Arrow 
file.  Then each file is prefixed by the buffer size and written to a temp 
file.  The temp file is read and each Arrow file is parallelized as a byte 
array.
    
    _After:_
    A Pandas DataFrame is split into parts, then an Arrow stream is written to 
a temp file where each part is an ArrowRecordBatch.  The temp file is read as a 
stream and the Arrow messages are examined.  If the message is an 
ArrowRecordBatch, the data is saved as a byte array.  After reading the file, 
each ArrowRecordBatch is parallelized as a byte array.  This has slightly more 
processing than before because we must look each Arrow message to extract the 
record batches, but performance remains the same.  It is cleaner in the sense 
that IPC from Python to JVM is done over a single Arrow stream.
    
    ## How was this patch tested?
    
    Added new unit tests for the additions to ArrowConverters in Scala, 
existing tests for Python.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark 
arrow-toPandas-stream-SPARK-23030

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21546
    
----
commit 9af482170ee95831bbda139e6e931ba2631df386
Author: Bryan Cutler <cutlerb@...>
Date:   2018-01-10T22:02:15Z

    change ArrowConverters to stream format

commit d617f0da8eff1509da465bb707340e391314bec4
Author: Bryan Cutler <cutlerb@...>
Date:   2018-01-10T22:14:07Z

    Change ArrowSerializer to use stream format

commit f10d5d9cd3cece7f56749e1de7fe01699e4759a0
Author: Bryan Cutler <cutlerb@...>
Date:   2018-01-12T00:40:36Z

    toPandas is working with RecordBatch payloads, using custom handler to 
stream ordered partitions

commit 03653c687473b82bbfb6653504479498a2a3c63b
Author: Bryan Cutler <cutlerb@...>
Date:   2018-02-10T00:23:17Z

    cleanup and removed ArrowPayload, createDataFrame now working

commit 1b932463bca0815e79f3a8d61d1c816e62949698
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-09T00:14:06Z

    toPandas and createDataFrame working but tests fail with date cols

commit ce22d8ad18e052d150528752b727c6cfe11485f7
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-27T00:32:03Z

    removed usage of seekableByteChannel

commit dede0bd96921c439747a9176f24c9ecbb9c8ce0a
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-28T00:28:54Z

    for toPandas, set old collection result to null and add comments

commit 9e29b092cb7d45fa486db0215c3bd4a99c5f8d98
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-28T18:28:18Z

    cleanup, not yet passing ArrowConvertersSuite

commit ceb8d38a6c83c3b6dae040c9e8d860811ecad0cc
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-29T21:14:03Z

    fix to read Arrow stream with multiple batches, cleanup, add docs, scala 
tests pass, style pass

commit f42e4ea7b4fb944eeefd39a0fd6a1428b527214a
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-29T22:17:25Z

    use base OutputStream for serveToStream instead of DataOutputStream

commit 951843d760aa6d29ff18112e82d28f4f6dc09907
Author: Bryan Cutler <cutlerb@...>
Date:   2018-03-29T22:21:13Z

    accidentally removed date type checks, passing pyspark tests

commit af03c6b384fe4ea73d67ad1d3f46be4a1e027e9e
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T00:29:12Z

    Changed to only use Arrow batch bytes as payload, had to hack Arrow 
MessageChannelReader

commit b047c1624429ea579aa279e92909b90400b40c58
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T00:30:48Z

    added todo comment

commit a77b89ea0357e3ce146ff35537eb7da8a8c80bad
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T17:35:05Z

    change getBatchesFromStream to return iterator

commit 81c82093edf78d36a1de850b3d8faede88fb0524
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T18:06:22Z

    need to end stream on toPandas after all batches sent to python, and added 
some comments

commit 5f46a02aa34e9f51ac310d27e3272b883c67cc37
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T18:11:19Z

    forgot to remove old comment

commit 7694b8fe6970789d4e88f9c5df46c97a4b235f02
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T18:47:49Z

    fixed up comments

commit a5a1fbe7121c5b0dd93876a56c29ad17dcd9b168
Author: Bryan Cutler <cutlerb@...>
Date:   2018-06-12T20:22:30Z

    fixed up some wording

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to