Hi,

For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).

Enrico



Am 23.04.23 um 23:42 schrieb Thomas Wang:
Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass to |org.apache.spark.sql.expressions.Aggregator|). Now I'm trying to use it in a PySpark application, but for some reason, I'm not able to trigger the function. Here is what I'm doing, could someone help me take a look? Thanks.

spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
name="MyAggrator",
javaClassName="my.package.MyAggrator",
returnType=ArrayType(elementType=LongType()),
)

The above code runs successfully. However, to call it, I assume I should do something like the following.

df = df.groupBy().agg(
functions.expr("MyAggrator(input)").alias("output"),
)

But this one gives me the following error:

pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't 
implement any UDF interface

My question is how can I use the Spark Aggregator defined in a jar file in PySpark? Thanks.

Thomas

Reply via email to