viirya commented on a change in pull request #29548:
URL: https://github.com/apache/spark/pull/29548#discussion_r478766422
##########
File path: python/docs/source/user_guide/arrow_pandas.rst
##########
@@ -0,0 +1,411 @@

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..  http://www.apache.org/licenses/LICENSE-2.0

..  Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.

=======================
Apache Arrow in PySpark
=======================

.. currentmodule:: pyspark.sql

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
data between JVM and Python processes. It is currently most beneficial to Python users who work
with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to
configuration or code to take full advantage and ensure compatibility. This guide gives a
high-level description of how to use Arrow in Spark and highlights any differences when working
with Arrow-enabled data.

Ensure PyArrow Installed
------------------------

To use Apache Arrow in PySpark, `the recommended version of PyArrow <arrow_pandas.rst#recommended-pandas-and-pyarrow-versions>`_
should be installed.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
SQL module with the command ``pip install pyspark[sql]``. Otherwise, you must ensure that PyArrow
is installed and available on all cluster nodes.
You can install it using pip or conda from the conda-forge channel. See the PyArrow
`installation <https://arrow.apache.org/docs/python/install.html>`_ page for details.

Enabling for Conversion to/from Pandas
--------------------------------------

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
with :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls, users need to first set
the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` to ``true``. This is disabled by default.

In addition, optimizations enabled by ``spark.sql.execution.arrow.pyspark.enabled`` can fall back
automatically to a non-Arrow implementation if an error occurs before the actual computation within Spark.
This behavior is controlled by ``spark.sql.execution.arrow.pyspark.fallback.enabled``.

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 35-48
    :dedent: 4

Using the above optimizations with Arrow will produce the same results as when Arrow is not
enabled.

Note that even with Arrow, :meth:`DataFrame.toPandas` results in the collection of all records in the
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
data types are currently supported, and an error can be raised if a column has an unsupported type.
If an error occurs during :meth:`SparkSession.createDataFrame`, Spark falls back to creating the
DataFrame without Arrow.
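
The following is a minimal inline sketch of the round trip described above, assuming an active
:class:`SparkSession` named ``spark``; the bundled example referenced by the ``literalinclude``
above remains the authoritative version:

.. code-block:: python

    import numpy as np
    import pandas as pd

    # Enable Arrow-based columnar data transfers (disabled by default).
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # Create a Spark DataFrame from a Pandas DataFrame; Arrow is used for the transfer.
    pdf = pd.DataFrame(np.random.rand(100, 3))
    df = spark.createDataFrame(pdf)

    # Convert the Spark DataFrame back to a Pandas DataFrame, again via Arrow.
    result_pdf = df.select("*").toPandas()
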
Pandas UDFs (a.k.a. Vectorized UDFs)
------------------------------------

.. currentmodule:: pyspark.sql.functions

Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data
and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined
using :meth:`pandas_udf` as a decorator or to wrap the function, and no additional configuration
is required. A Pandas UDF generally behaves as a regular PySpark function API.

Before Spark 3.0, Pandas UDFs used to be defined with ``pyspark.sql.functions.PandasUDFType``. From Spark 3.0
with Python 3.6+, you can also use `Python type hints <https://www.python.org/dev/peps/pep-0484>`_.
Using Python type hints is preferred, and ``pyspark.sql.functions.PandasUDFType`` will be deprecated in
a future release.

.. currentmodule:: pyspark.sql.types

Note that the type hint should use ``pandas.Series`` in all cases but one: ``pandas.DataFrame``
should be used for the input or output type hint instead when the input or output column is of
:class:`StructType`. The following example shows a Pandas UDF that takes a long column, a string
column and a struct column, and outputs a struct column. It requires the function to specify the
type hints of ``pandas.Series`` and ``pandas.DataFrame`` as below:

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 54-78
    :dedent: 4

The following sections describe the supported combinations of type hints. For simplicity, the
``pandas.DataFrame`` variant is omitted.

Series to Series
~~~~~~~~~~~~~~~~

.. currentmodule:: pyspark.sql.functions

The type hint can be expressed as ``pandas.Series``, ... -> ``pandas.Series``.

Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
function takes one or more ``pandas.Series`` and outputs one ``pandas.Series``. The output of the function should
always be of the same length as the input. Internally, PySpark executes a Pandas UDF by splitting
columns into batches, calling the function for each batch as a subset of the data, and then concatenating
the results together.

The following example shows how to create a Pandas UDF that computes the product of 2 columns.

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 82-112
    :dedent: 4

For detailed usage, please see :func:`pandas_udf`.

Iterator of Series to Iterator of Series
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. currentmodule:: pyspark.sql.functions

The type hint can be expressed as ``Iterator[pandas.Series]`` -> ``Iterator[pandas.Series]``.

Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the given
function takes an iterator of ``pandas.Series`` and outputs an iterator of ``pandas.Series``. The
length of the entire output from the function should be the same as the length of the entire input;
therefore, it can prefetch data from the input iterator as long as the lengths are the same.
In this case, the created Pandas UDF requires exactly one input column when the Pandas UDF is called. To use
multiple input columns, a different type hint is required; see Iterator of Multiple Series to Iterator
of Series.
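
A minimal sketch of this pattern, assuming an active ``spark`` session (the function name
``plus_one`` and the toy column are illustrative only):

.. code-block:: python

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("long")
    def plus_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Each element of the iterator is one batch of the single input column.
        for s in batches:
            yield s + 1

    df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
    df.select(plus_one("x")).show()
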
It is also useful when the UDF execution requires initializing some state, although internally it works
identically to the Series to Series case. The pseudocode below illustrates the example.

.. code-block:: python

    @pandas_udf("long")
    def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Do some expensive initialization with a state
        state = very_expensive_initialization()
        for x in iterator:
            # Use that state for the whole iterator.
            yield calculate_with_state(x, state)

    df.select(calculate("value")).show()

The following example shows how to create this Pandas UDF:

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 116-138
    :dedent: 4

For detailed usage, please see :func:`pandas_udf`.

Iterator of Multiple Series to Iterator of Series
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. currentmodule:: pyspark.sql.functions

The type hint can be expressed as ``Iterator[Tuple[pandas.Series, ...]]`` -> ``Iterator[pandas.Series]``.

Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple ``pandas.Series`` and outputs an iterator of ``pandas.Series``.
In this case, the created Pandas UDF requires as many input columns as there are series in the tuple
when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as the
Iterator of Series to Iterator of Series case.

The following example shows how to create this Pandas UDF:

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 142-165
    :dedent: 4

For detailed usage, please see :func:`pandas_udf`.

Series to Scalar
~~~~~~~~~~~~~~~~

.. currentmodule:: pyspark.sql.functions

The type hint can be expressed as ``pandas.Series``, ... -> ``Any``.

Using :func:`pandas_udf` with a function that has such type hints creates a Pandas UDF similar
to PySpark's aggregate functions. The given function takes ``pandas.Series`` and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a Python
primitive type, e.g., ``int`` or ``float``, or a NumPy data type, e.g., ``numpy.int64`` or ``numpy.float64``.
``Any`` should ideally be a specific scalar type accordingly.

.. currentmodule:: pyspark.sql

This UDF can also be used with :meth:`GroupedData.agg` and :class:`Window`.
It defines an aggregation from one or more ``pandas.Series`` to a scalar value, where each ``pandas.Series``
represents a column within the group or window.

Note that this type of UDF does not support partial aggregation, and all data for a group or window
will be loaded into memory. Also, only unbounded windows are supported with grouped aggregate Pandas
UDFs currently. The following example shows how to use this type of UDF to compute the mean with group-by
and window operations:

.. literalinclude:: ../../../../examples/src/main/python/sql/arrow.py
    :language: python
    :lines: 169-210
    :dedent: 4
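
A compact inline sketch of the group-by case, assuming an active ``spark`` session; the name
``mean_udf`` and the toy DataFrame are illustrative only, and the bundled example above also
covers the window case:

.. code-block:: python

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        # Reduce a whole column of a group to a single scalar.
        return v.mean()

    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))
    df.groupby("id").agg(mean_udf("v")).show()
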
.. currentmodule:: pyspark.sql.functions

For detailed usage, please see :func:`pandas_udf`.

Pandas Function APIs
--------------------

.. currentmodule:: pyspark.sql

Pandas Function APIs can directly apply a Python native function against the whole :class:`DataFrame` by
using Pandas instances. Internally it works similarly with Pandas UDFs by using Arrow to transfer
data and Pandas to work with the data, which allows vectorized operations. However, A Pandas Function
API behaves as a regular API under PySpark :class:`DataFrame` instead of :class:`Column`, and Python type hints in Pandas

Review comment:
       However, A Pandas Function API -> However, a Pandas Function API

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
