Kyle Osborne created SPARK-46776:
------------------------------------

             Summary: Pandas UDF Serialization Error: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
                 Key: SPARK-46776
                 URL: https://issues.apache.org/jira/browse/SPARK-46776
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.0
         Environment: Python 3.11.7 (pip install "pyspark[sql]")
pyspark: 3.5.0
pyarrow: 14.0.2
pandas: 2.1.4
            Reporter: Kyle Osborne


When a UDF returns a large pandas DataFrame, a column can be converted to a "pyarrow.lib.ChunkedArray" rather than a plain Array. ChunkedArray is not accepted by pyarrow's "StructArray.from_arrays" function, which produces the error:
{code:python}
  File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
{code}
Code to reproduce:
{code:python}
import numpy as np
import pandas as pd
import pyspark.sql.types as T
from pyspark.sql import SparkSession


def large_pandas_udf(iterator):
    # Yields a single ~2 GiB DataFrame of strings (2048 strings of 1 MiB each),
    # which overflows the single-array limit during Arrow serialization.
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        yield pd.DataFrame(arr, columns=['string_column'])


def large_pandas_workaround_udf(iterator):
    # Same data, but yielded in small slices so each Arrow batch stays
    # well under the single-array limit.
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        num_chunks = 20
        chunk_size = len(arr) // num_chunks
        for i in range(num_chunks + 1):  # +1 to cover the remainder slice
            yield pd.DataFrame(arr[i * chunk_size:(i + 1) * chunk_size],
                               columns=['string_column'])


def run_udf(spark, udf=large_pandas_udf):
    schema = T.StructType([T.StructField("string_column", T.StringType())])
    # Disable the simplified traceback to get the full error below.
    spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled',
                   'false')
    df = spark.createDataFrame([['v']], schema)
    return df.mapInPandas(udf, schema).count()
{code}
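To trigger the error (a minimal driver sketch; the local master setting is an assumption, any session should do):
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
run_udf(spark, large_pandas_workaround_udf)  # succeeds
run_udf(spark, large_pandas_udf)             # raises the PythonException below
{code}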
 

Stack trace:
{code:python}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<string>", line 30, in run_udf
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count
    return int(self._jdf.count())
               ^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
    for batch in iterator:
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
    batch = self._create_batch(series)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch
    arrs.append(self._create_struct_array(s, t))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array
    return pa.StructArray.from_arrays(struct_arrs, struct_names)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
{code}
Related GitHub issue discussing this pyarrow array constructor behavior: [https://github.com/apache/arrow/issues/34755]
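One possible direction for a fix (my assumption, not a committed patch) would be for _create_struct_array in pyspark/sql/pandas/serializers.py to flatten any ChunkedArray before calling StructArray.from_arrays, roughly:
{code:python}
import pyarrow as pa

# Hypothetical helper sketching the idea; the name is mine, not Spark's.
def _as_single_array(arr):
    # pa.Array.from_pandas may return a ChunkedArray for very large columns;
    # combine_chunks() concatenates it back into one contiguous Array.
    if isinstance(arr, pa.ChunkedArray):
        return arr.combine_chunks()
    return arr
{code}
Note that combine_chunks() on a >2 GiB string column would likely itself overflow the 32-bit offsets of a single string array, so the serializer may instead need to split the record batch, as the workaround UDF above does.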


