PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in
the last stage of the job regardless of my output type.
The problem I'm trying to solve: I have a column of scalar values, and each row also carries a sorted vector of quantiles. I want to replace each scalar with the index of its closest value in that row's vector. I'm grouping arbitrarily and performing the lookup row-wise in Python because, even though the same vector appears on many rows, it isn't clear how I could make the lookup scale otherwise.
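For a single row, the lookup I'm after is essentially what numpy's searchsorted does. A minimal standalone sketch with a toy quantile vector (the column names match my schema; the data is illustrative):

```python
import numpy as np
import pandas as pd

# Toy stand-ins for my real columns: one scalar per row, each row
# carrying the same sorted 1000-element quantile vector here.
quantiles = np.linspace(0.0, 1.0, 1000)
df = pd.DataFrame({
    "scalar_value": [0.25, 0.5, 0.99],
    "quantilelist": [quantiles] * 3,
})

# Index of each scalar within its row's sorted vector.
df["feature_value"] = df.apply(
    lambda row: int(np.searchsorted(row["quantilelist"], row["scalar_value"])),
    axis=1,
)
# df["feature_value"] -> [250, 500, 990]
```

This is the per-row operation; the open question is how to run it at scale, hence the pandas UDF below.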
My input data, the product of a join of hive tables, has the following
schema:
root
|-- scalar_value: float (nullable = true)
|-- quantilelist: array (nullable = true)
| |-- element: double (containsNull = true)
My UDF is at the bottom. I'm using a GROUPED_MAP UDF because I want to perform an operation on two columns at once, and because I want to take advantage of Arrow to avoid serialization overhead.
The schema my UDF returns is this:
pos_schema = T.StructType([
    T.StructField('feature_value', T.FloatType(), True),
    T.StructField('error', T.StringType())
])
...however when I try to apply my UDF, either with saveAsTable or show(), I
get the following exception:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
    at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
    at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
    at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
    at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
    at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
    at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
    at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
I assumed it was the result of some bad typing on my part, until I tested a degenerate UDF that only returns a column of 1s:
@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)
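As a local sanity check, the same function body behaves exactly as expected on a plain pandas DataFrame (outside Spark), which is why I suspect the Arrow write path rather than my return value:

```python
import pandas as pd

def grouped_percentile_int(df):
    # Same body as the UDF above, minus the decorator.
    return pd.DataFrame({"feature_value": [int(1)] * df.shape[0]}).reset_index(drop=True)

# A toy two-row frame shaped like my input schema.
sample = pd.DataFrame({
    "scalar_value": [0.1, 0.2],
    "quantilelist": [[0.0, 1.0]] * 2,
})
out = grouped_percentile_int(sample)
# out is a two-row frame with a single int column of 1s
```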
This clearly has only one return value, of type int, yet I get exactly the same exception with an identical stack trace (the OversizedAllocationException raised from Float8Vector.setSafe, as above).
What seems very strange is that it's still falling over on double types (Float8Vector, DoubleWriter), even though my output schema contains no DoubleType fields. I tried to look into the underlying Spark code, but I don't know Scala well enough to suss out the issue.
Is this a bug?
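One thing I haven't ruled out is batch sizing: Spark 2.3 has a conf controlling how many rows go into each Arrow record batch (default 10000), which I could try lowering in case the buffer is simply overflowing on large batches. A sketch, assuming an existing SparkSession named spark:

```python
# Lower the number of rows Spark packs into each Arrow record batch.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")
```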
My UDF:
@F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
def groupedPercentile(df):
    """
    Pandas UDF to apply binary search for a group of records.
    """
    def getPercentile(x):
        """
        Given a scalar v and a 1000-length vector of quantiles q,
        produce the percentile of the distribution most closely
        corresponding to v's position in q.
        """
        v = x['scalar_value']
        q = x['quantilelist']
        # The vector is length 1000, so for the sake of simplicity
        # pad it to 1024 so the search window always halves cleanly.
        q1024 = []
        q1024.extend(q.tolist())
        q1024.extend([q[-1]] * 24)
        start = 0
        end = 1024
        while start != end:
            half_len = int((end - start) / 2)
            if v > q1024[start + half_len]:
                start = end - half_len
            else:
                end = end - half_len
        if start > 1000:
            start = 1000
        return start
    try:
        df.loc[:, 'feature_value'] = df.apply(getPercentile, axis=1)
        df.loc[:, 'error'] = [None] * df.shape[0]
    except Exception as e:
        df.loc[:, 'feature_value'] = [None] * df.shape[0]
        df.loc[:, 'error'] = [str(e)] * df.shape[0]
    return df[['feature_value', 'error']]