I have run into an issue in Spark: I create a Spark worker process that listens for jobs from another machine. After about 24 hours and ~3000 jobs, some jobs in my Spark worker just hang indefinitely.
I am trying to set a timeout for my tasks so that the Spark session can be stopped and restarted if a job takes more than an hour or so. To do this I send a signal and raise an exception, similar to what the timeout-decorator library (https://github.com/pnpnpn/timeout-decorator) does; a simplified sketch of that wrapper is at the bottom of this post. This works well in plain Python, but not in PySpark: when the timeout signal fires in PySpark, py4j seems to catch it and throws a `Py4JError`, so I cannot tell whether the error was caused by the timeout or by something else. I am wondering how I can recover the original exception in PySpark.

Here is some example code that throws a similar error; in the `except` block I am unable to tell whether it was caused by `MyExc` or something else:

```python
import pyspark
from pyspark.sql import functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [['a1', 'b1', 'c1', 1],
     ['a2', 'b2', 'c2', 2],
     ['a3', 'b3', 'c3', 3]],
    ['a', 'b', 'c', 'x'])


class MyExc(Exception):
    pass


@pyspark.sql.functions.udf
def myudf(x):
    raise MyExc("my exception")
    return x


df = df.withColumn("x2", myudf(df['x']))

try:
    df.show()
except Exception as err:
    print("Got err", type(err), err)
    # import ipdb; ipdb.set_trace()
    raise
```
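For reference, the timeout wrapper I use is roughly the following. This is a minimal sketch of the SIGALRM approach that timeout-decorator takes, not the library's actual code; the names `JobTimeout` and `run_job` and the one-hour figure are just placeholders from my setup:

```python
import signal


class JobTimeout(Exception):
    """Raised (by my code, not by any library) when a job exceeds its time budget."""


def timeout(seconds):
    """SIGALRM-based timeout decorator, similar in spirit to timeout-decorator."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise JobTimeout(f"{func.__name__} timed out after {seconds}s")
            old_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)                # schedule the alarm
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)                  # cancel any pending alarm
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator


# Used on the driver side around each job, e.g.:
# @timeout(3600)
# def run_job(df):
#     df.show()
```

In plain Python, catching `JobTimeout` in the caller works as expected; the problem is that once the call goes through py4j, the exception I see in my `except` block no longer tells me whether the timeout was the cause.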