Use Spark Aggregator in PySpark

2023-04-23 Thread Thomas Wang
Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass of
org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use it in a
PySpark application, but for some reason I'm not able to invoke the
function. Here is what I'm doing; could someone help me take a look? Thanks.

spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
    name="MyAggrator",
    javaClassName="my.package.MyAggrator",
    returnType=ArrayType(elementType=LongType()),
)

The above code runs successfully. However, to call it, I assume I should do
something like the following.

df = df.groupBy().agg(
    functions.expr("MyAggrator(input)").alias("output"),
)

But this one gives me the following error:

pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator
doesn't implement any UDF interface


My question is: how can I use a Spark Aggregator defined in a jar file from
PySpark? Thanks.

Thomas


Re: Spark Aggregator with ARRAY<BOOLEAN> input and ARRAY<LONG> output

2023-04-23 Thread Thomas Wang
Thanks Raghavendra,

Could you be more specific about how I can use ExpressionEncoder()? More
specifically, how can I conform to the return type of Encoder<List<Long>>?

Thomas

On Sun, Apr 23, 2023 at 9:42 AM Raghavendra Ganesh wrote:

> For simple array types, setting the encoder to ExpressionEncoder() should work.
> --
> Raghavendra
>
>
> On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang wrote:
>
>> [original message quoted in full; trimmed here. See the original post below.]


Re: Spark Aggregator with ARRAY<BOOLEAN> input and ARRAY<LONG> output

2023-04-23 Thread Raghavendra Ganesh
For simple array types, setting the encoder to ExpressionEncoder() should work.
--
Raghavendra


On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang wrote:

> [original message quoted in full; trimmed here. See the original post below.]


Spark Aggregator with ARRAY<BOOLEAN> input and ARRAY<LONG> output

2023-04-23 Thread Thomas Wang
Hi Spark Community,

I'm trying to implement a custom Spark Aggregator (a subclass of
org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong, but
I'm assuming I will be able to use it as an aggregation function like SUM.

What I'm trying to do is this: I have a column of ARRAY<BOOLEAN>, and I would
like to GROUP BY another column and perform an element-wise SUM where the
boolean flag is set to True. The result of such an aggregation should return
ARRAY<LONG>.
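To make the intended semantics concrete, here is a plain-Python sketch of the
zero/reduce/merge/finish lifecycle I have in mind (illustration only, not Spark
code; the helper names are mine and simply mirror the Aggregator methods):

```python
from typing import List

def zero() -> List[int]:
    """Empty buffer; counts grow as longer rows arrive."""
    return []

def reduce(b: List[int], a: List[bool]) -> List[int]:
    """Fold one input row (boolean flags) into the count buffer."""
    if a is None:
        return b
    b.extend([0] * (len(a) - len(b)))  # pad buffer up to the row's length
    for i, flag in enumerate(a):
        if flag:
            b[i] += 1
    return b

def merge(b1: List[int], b2: List[int]) -> List[int]:
    """Combine two partial buffers element-wise."""
    longer, shorter = (b1, b2) if len(b1) > len(b2) else (b2, b1)
    for i, v in enumerate(shorter):
        longer[i] += v
    return longer

def finish(reduction: List[int]) -> List[int]:
    return reduction

# One group with three rows of unequal length, aggregated the way Spark
# would: reduce per row, then finish.
rows = [[True, False, True], [True, True], [False, False, True, True]]
buf = zero()
for row in rows:
    buf = reduce(buf, row)
print(finish(buf))  # → [2, 1, 2, 1]
```

The padding in reduce and the longer/shorter handling in merge are what let
rows and partial buffers of different lengths combine safely.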

Here is my implementation so far:

package mypackage.udf;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;

import java.util.ArrayList;
import java.util.List;

public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>, List<Long>> {

    @Override
    public List<Long> zero() {
        return new ArrayList<>();
    }

    @Override
    public List<Long> reduce(List<Long> b, List<Boolean> a) {
        if (a == null) return b;
        int diff = a.size() - b.size();
        for (int i = 0; i < diff; i++) {
            b.add(0L);
        }
        for (int i = 0; i < a.size(); i++) {
            if (a.get(i)) b.set(i, b.get(i) + 1);
        }
        return b;
    }

    @Override
    public List<Long> merge(List<Long> b1, List<Long> b2) {
        List<Long> longer;
        List<Long> shorter;
        if (b1.size() > b2.size()) {
            longer = b1;
            shorter = b2;
        } else {
            longer = b2;
            shorter = b1;
        }
        for (int i = 0; i < shorter.size(); i++) {
            longer.set(i, longer.get(i) + shorter.get(i));
        }
        return longer;
    }

    @Override
    public List<Long> finish(List<Long> reduction) {
        return reduction;
    }

    @Override
    public Encoder<List<Long>> bufferEncoder() {
        return null;
    }

    @Override
    public Encoder<List<Long>> outputEncoder() {
        return null;
    }
}

The part I'm not quite sure about is how to override bufferEncoder and
outputEncoder. The default Encoders class does not provide an encoder for List<Long>.

Can someone point me in the right direction? Thanks!


Thomas


State of GraphX and GraphFrames

2023-04-23 Thread g
Hello,

I am currently working on my Master's thesis on data provenance in Apache Spark
and would like to extend the provenance capabilities to include GraphX/GraphFrames.
I am curious what the current status of both GraphX and GraphFrames is. It
seems that GraphX is no longer being updated (but is still supported), as noted in
the excellent High Performance Spark book by Rachel Warren & Holden Karau. As
for GraphFrames, it seems to be in a similar situation, given that the pace of
commits to the repo has also been quite low over the last few years.

Could anyone enlighten me on what the current state of graph processing is in 
Apache Spark?

Kind regards,
Gilles Magalhaes