Kyle Osborne created SPARK-46776: ------------------------------------ Summary: Pandas UDF Serialization Error: Expected Array, got <class 'pyarrow.lib.ChunkedArray'> Key: SPARK-46776 URL: https://issues.apache.org/jira/browse/SPARK-46776 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 3.5.0 Environment: python 3.11.7
-> pip install pyspark[sql] pyspark: 3.5.0 pyarrow: 14.0.2 pandas: 2.1.4 Reporter: Kyle Osborne When a large pandas DataFrame is returned from a UDF, it can get converted to a "pyarrow.lib.ChunkedArray", which the pyarrow "StructArray.from_arrays" function does not accept, giving the error message: {code:java} File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'> {code} Code to reproduce: {code:java} from pyspark.sql import DataFrame, SparkSession from typing import List import pyspark.sql.types as T from pyspark.sql.functions import pandas_udf import numpy as np import pyarrow as pa import pandas as pd def large_pandas_udf(iterator): for pdf in iterator: big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)] strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)] arr = np.array(strings_list) yield pd.DataFrame(arr, columns=['string_column']) def large_pandas_workaround_udf(iterator): for pdf in iterator: big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)] strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)] arr = np.array(strings_list) num_chunks = 20 chunk_size = len(arr) // 20 for i in range(num_chunks+10): yield pd.DataFrame(arr[i*chunk_size:(i+1)*chunk_size], columns=['string_column']) def run_udf(spark, udf=large_pandas_udf): schema = T.StructType([T.StructField("string_column", T.StringType())]) spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false') # get full error traceback df = spark.createDataFrame([['v']], schema) return df.mapInPandas(udf, schema).count() {code} Stack trace: {quote}{{Traceback (most recent call last):}} {{ File "<stdin>", line 1, in <module>}} {{ File "<string>", line 30, in run_udf}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count}} {{ return int(self._jdf.count())}} {{ 
^^^^^^^^^^^^^^^^^}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco}} {{ raise converted from None}} {{pyspark.errors.exceptions.captured.PythonException:}} {{ An exception was thrown from the Python worker. Please see the stack trace below.}} {{Traceback (most recent call last):}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main}} {{ process()}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process}} {{ serializer.dump_stream(out_iter, outfile)}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream}} {{ return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)}} {{ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream}} {{ for batch in iterator:}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches}} {{ batch = self._create_batch(series)}} {{ ^^^^^^^^^^^^^^^^^^^^^^^^^^}} {{ File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch}} {{ arrs.append(self._create_struct_array(s, t))}} {{ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^}} {{ File 
"/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array}} {{ return pa.StructArray.from_arrays(struct_arrs, struct_names)}} {{ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^}} {{ File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays}} {{TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>}} {quote} Related github issue discussing this pyarrow array constructor behavior: [https://github.com/apache/arrow/issues/34755] -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org