David Vogelbacher created SPARK-27805:
-----------------------------------------

             Summary: toPandas does not propagate SparkExceptions with arrow enabled
                 Key: SPARK-27805
                 URL: https://issues.apache.org/jira/browse/SPARK-27805
             Project: Spark
          Issue Type: Improvement
          Components: PySpark, SQL
    Affects Versions: 3.1.0
            Reporter: David Vogelbacher


When calling {{toPandas}} with arrow enabled, errors encountered during the 
collect are not propagated to the Python process.
Only a very general {{EOFError}} is raised.
Example of behavior with arrow enabled vs. arrow disabled:
{noformat}
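import traceback
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# `spark` is the SparkSession provided by the pyspark shell.
# UDF that always fails, so any action on df triggers a task error.
def raise_exception():
    raise Exception("My error")

error_udf = udf(raise_exception, IntegerType())
df = spark.range(3).toDF("i").withColumn("x", error_udf())

# Arrow disabled (the session default here): the SparkException reaches Python.
try:
    df.toPandas()
except Exception:
    no_arrow_exception = traceback.format_exc()

# Arrow enabled: only an EOFError is raised.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
try:
    df.toPandas()
except Exception:
    arrow_exception = traceback.format_exc()

print(no_arrow_exception)
print(arrow_exception)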
{noformat}
{{arrow_exception}} gives as output:
{noformat}
>>> print arrow_exception
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2143, in toPandas
    batches = self._collectAsArrow()
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2205, in _collectAsArrow
    results = list(_load_from_socket(sock_info, ArrowCollectSerializer()))
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 210, in load_stream
    num = read_int(stream)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 810, in read_int
    raise EOFError
EOFError
{noformat}
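
The last frame above shows why the message is so uninformative: the reader raises a bare {{EOFError}} as soon as no bytes arrive, so the cause of the failure never reaches Python. A minimal sketch of that behavior, assuming the reader works like the {{read_int}} frame in the traceback; {{read_int_sketch}} and {{describe_read}} are hypothetical names used only for illustration, not PySpark code:

```python
import io
import struct


def read_int_sketch(stream):
    """Read one big-endian 4-byte int, or raise a bare EOFError if nothing arrives."""
    data = stream.read(4)
    if not data:
        # No payload and no message: the reader cannot tell why the stream ended.
        raise EOFError
    return struct.unpack("!i", data)[0]


def describe_read(stream):
    """Return what a caller observes when reading the next int from the stream."""
    try:
        return "value: %d" % read_int_sketch(stream)
    except EOFError as e:
        return "EOFError with message %r" % str(e)


# Assumption: a healthy writer sends an int first; a failed one just closes.
healthy = io.BytesIO(struct.pack("!i", 3))
aborted = io.BytesIO(b"")

print(describe_read(healthy))
print(describe_read(aborted))
```

In the aborted case the exception carries an empty message, which matches the bare {{EOFError}} in the traceback above.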

{{no_arrow_exception}} gives as output:
{noformat}
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 2166, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/dataframe.py", line 516, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/dvogelbacher/git/spark/python/pyspark/sql/utils.py", line 89, in deco
    return f(*a, **kw)
  File "/Users/dvogelbacher/git/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o38.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 428, in main
    process()
  File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 423, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 438, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/dvogelbacher/git/spark/python/pyspark/serializers.py", line 427, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/dvogelbacher/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 86, in <lambda>
    return lambda *a: f(*a)
  File "/Users/dvogelbacher/git/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 2, in raise_exception
Exception: My error
...
{noformat}



