Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19575#discussion_r163499909

--- Diff: docs/sql-programming-guide.md ---
@@ -1640,6 +1640,147 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a

You may run `./bin/spark-sql --help` for a complete list of all available options.

# Usage Guide for Pandas with Arrow

## Arrow in Spark

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. This is currently most beneficial to Python users who
work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to
configuration or code to take full advantage and ensure compatibility. This guide gives a
high-level description of how to use Arrow in Spark and highlights any differences when working
with Arrow-enabled data.

## Ensure pyarrow Installed

If you have installed pyspark using pip, then pyarrow will automatically be brought in as a
dependency. Otherwise, you must ensure that pyarrow is installed and available in the Python
environment on every cluster node. The currently supported version is 0.8.0. You can install it
using pip, or conda from the conda-forge channel. See the pyarrow
[installation](https://arrow.apache.org/docs/python/install.html) instructions for details.

## How to Enable for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to Pandas using the call
`toPandas()` and when creating a Spark DataFrame from Pandas with `createDataFrame(pandas_df)`.
To use Arrow when executing these calls, it must first be enabled by setting the Spark conf
`spark.sql.execution.arrow.enabled` to `true`; it is disabled by default.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import numpy as np
import pandas as pd

# Enable Arrow; 'spark' is an existing SparkSession
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate sample data
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from Pandas data using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a local Pandas DataFrame
selpdf = df.select("*").toPandas()

{% endhighlight %}
</div>
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled. Not all Spark data types are currently supported, and an error will be raised if a
column has an unsupported type; see [Supported Types](#supported-types).
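As a quick check of the claim above, the conversion can be run with the optimization disabled and
then enabled, and the two results compared. This is only a sketch: it assumes `spark` is an
existing SparkSession and that all column types are supported.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import numpy as np
import pandas as pd

df = spark.createDataFrame(pd.DataFrame(np.random.rand(100, 3)))

# Convert once without Arrow and once with Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pdf_plain = df.toPandas()

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf_arrow = df.toPandas()

# Both conversions should yield the same Pandas DataFrame
pd.testing.assert_frame_equal(pdf_plain, pdf_arrow)

{% endhighlight %}
</div>
</div>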
## How to Write Vectorized UDFs

A vectorized UDF is similar to a standard UDF in Spark, except that its inputs and output are
Pandas Series, which allows the function to be composed with vectorized operations. The function
can then be run very efficiently in Spark, where data is sent to Python in batches and the
function is executed using Pandas Series as input. The expected output of the function is also a
Pandas Series of the same length as the inputs. A vectorized UDF is declared using `pandas_udf`;
no additional configuration is required.

The following example shows how to create a vectorized UDF that computes the product of 2 columns.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame; 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute the function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

{% endhighlight %}
</div>
</div>

## GroupBy-Apply UDFs
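A groupby-apply UDF operates on all rows of a group at once: each group is passed to the function
as a Pandas DataFrame, and the function returns a Pandas DataFrame whose schema must match the
declared return type. The following is a minimal sketch of this pattern, subtracting the group
mean from each value; it assumes the `PandasUDFType.GROUPED_MAP` function type and
`groupby(...).apply(...)` are available in this build.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# The returned Pandas DataFrame must match the declared schema "id long, v double"
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a Pandas DataFrame holding all rows of one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

{% endhighlight %}
</div>
</div>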
## Usage Notes

### Supported Types

Currently, all Spark SQL data types are supported except `MapType`, `ArrayType` of
`TimestampType`, and nested `StructType`.

### Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
high memory usage in the JVM. To avoid possible out-of-memory exceptions, the size of the Arrow
record batches can be adjusted by setting the conf `spark.sql.execution.arrow.maxRecordsPerBatch`
to an integer that determines the maximum number of rows for each batch. With this limit, each
data partition is made into one or more record batches for processing.

--- End diff --

Should we mention the default value of `spark.sql.execution.arrow.maxRecordsPerBatch`?