dongjoon-hyun commented on code in PR #52909:
URL: https://github.com/apache/spark/pull/52909#discussion_r2505979202
##########
python/pyspark/sql/dataframe.py:
##########
@@ -6524,8 +6524,7 @@ def mapInArrow(
>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
>>> def filter_func(iterator):
... for batch in iterator:
- ... pdf = batch.to_pandas()
- ... yield pa.RecordBatch.from_pandas(pdf[pdf.id == 1])
+ ... yield batch.filter(pa.compute.field("id") == 1)
>>> df.mapInArrow(filter_func, df.schema).show()
Review Comment:
The doctest fails here. The `TypeError` raised in `_pack_compute_args` suggests that the PyArrow version installed in CI does not accept a compute `Expression` (`pa.compute.field("id") == 1`) as the mask argument to `RecordBatch.filter`, only a boolean mask array.
```
File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line 409, in
pyspark.sql.dataframe.DataFrame.mapInArrow
Failed example:
df.mapInArrow(filter_func, df.schema).show()
Exception raised:
Traceback (most recent call last):
File "/usr/lib/python3.10/doctest.py", line 1350, in __run
exec(compile(example.source, filename, "single",
File "<doctest pyspark.sql.dataframe.DataFrame.mapInArrow[3]>", line
1, in <module>
df.mapInArrow(filter_func, df.schema).show()
File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line
285, in show
print(self._show_string(n, truncate, vertical))
File "/__w/spark/spark/python/pyspark/sql/classic/dataframe.py", line
303, in _show_string
return self._jdf.showString(n, 20, vertical)
File
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line
1362, in __call__
return_value = get_return_value(
File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py",
line 269, in deco
raise converted from None
File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line
3343, in main
process()
File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line
3334, in process
serializer.dump_stream(out_iter, outfile)
File
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
line 187, in dump_stream
return super(ArrowStreamUDFSerializer,
self).dump_stream(wrap_and_init_stream(), stream)
File
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
line 121, in dump_stream
for batch in iterator:
File
"/__w/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
line 167, in wrap_and_init_stream
for batch, _ in iterator:
File "/__w/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line
2857, in func
for result_batch, result_type in result_iter:
File "<doctest pyspark.sql.dataframe.DataFrame.mapInArrow[2]>", line
3, in filter_func
yield batch.filter(pa.compute.field("id") == 1)
File "pyarrow/table.pxi", line 2580, in pyarrow.lib.RecordBatch.filter
return _pc().filter(self, mask, null_selection_behavior)
File "/usr/local/lib/python3.10/dist-packages/pyarrow/compute.py",
line 263, in wrapper
return func.call(args, options, memory_pool)
File "pyarrow/_compute.pyx", line 372, in
pyarrow._compute.Function.call
_pack_compute_args(args, &c_batch.values)
File "pyarrow/_compute.pyx", line 505, in
pyarrow._compute._pack_compute_args
raise TypeError(f"Got unexpected argument type {type(val)} "
pyspark.errors.exceptions.captured.PythonException:
```
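If compatibility with the older PyArrow in CI matters, one possible fallback (a sketch, not tested against this PR) is to build the boolean mask explicitly with `pyarrow.compute` kernels rather than passing an `Expression`, since `RecordBatch.filter` has long accepted a plain boolean mask array:
```python
import pyarrow as pa
import pyarrow.compute as pc

def filter_func(iterator):
    for batch in iterator:
        # Compute an explicit boolean mask with the `equal` kernel;
        # RecordBatch.filter(mask_array) works on older PyArrow
        # releases that do not accept Expression masks.
        mask = pc.equal(batch.column("id"), pa.scalar(1))
        yield batch.filter(mask)
```
This keeps the doctest free of the pandas round trip while avoiding the Expression code path that fails above.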