I am attempting to create an aggregate UDF that takes a generic parameter T, 
but for the life of me, I can’t seem to get it to work.

The UDF I’m trying to implement takes two input arguments: a value of generic 
type T and a date. It returns the non-null value with the latest associated 
date. I had originally done this with separate Top 1 queries connected with a 
left join, but that approach seems to use far more memory than a custom 
aggregate function would.
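
For reference, the selection rule described above can be sketched in plain 
Java with no Flink dependencies. The class and method names here are 
hypothetical and are not my actual UDF code. The sketch also assumes that rows 
with a null date are ignored and that ties on the date keep the first value 
seen; neither choice is essential to the question.

```java
import java.time.LocalDate;

/** Hypothetical accumulator: keeps the non-null value with the latest date. */
final class LatestNonNull<T> {
    private T value;        // latest non-null value seen so far
    private LocalDate date; // date associated with that value

    /** Mirrors what an aggregate's accumulate step would do for one row. */
    void accumulate(T candidate, LocalDate candidateDate) {
        // Assumption: a null value or a null date never replaces the current state.
        if (candidate == null || candidateDate == null) {
            return;
        }
        // Strictly later dates win; ties keep the value seen first (assumption).
        if (date == null || candidateDate.isAfter(date)) {
            value = candidate;
            date = candidateDate;
        }
    }

    /** Returns the current result, or null if no non-null value was seen. */
    T getValue() {
        return value;
    }

    public static void main(String[] args) {
        LatestNonNull<Long> acc = new LatestNonNull<>();
        acc.accumulate(1L, LocalDate.of(2021, 1, 1));
        acc.accumulate(null, LocalDate.of(2021, 3, 1)); // ignored: null value
        acc.accumulate(2L, LocalDate.of(2021, 2, 1));
        System.out.println(acc.getValue()); // prints 2
    }
}
```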

As a first attempt, I wrote a single function that uses custom type inference 
to validate that the output type matches the type of the first argument, and 
used DataTypes.STRUCTURED to define the shape of my accumulator. However, that 
resulted in an exception like this whenever I passed a non-string value as the 
first argument:

[error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
[error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
[error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
[error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
[error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
[error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
[error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
[error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
[error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
[error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
[error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
[error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[error]   at java.lang.Thread.run(Thread.java:748)

Figuring that I can’t do something of that sort, I tried to follow the general 
approach in the Sum accumulator[1] in the Flink source code where separate 
classes are derived from a base class, and each advertises its accumulator 
shape, but ended up with the exact same stack trace as above when I tried to 
create and use a function specifically for a non-string type like Long.

Is there something I’m missing as far as how this is supposed to be done? 
Everything I try results in either a stack trace like the one above or type 
erasure issues when trying to get type information for the accumulator. If I 
copy the generic code multiple times and use Long or String directly rather 
than subclassing, then it works just fine. I appreciate any help I can get on 
this!
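
To illustrate what I mean by type erasure, here is a small plain-Java sketch 
with hypothetical class names (not my real accumulator classes). It only shows 
that a field declared with a type variable has the runtime type Object, even 
when it is reached through a subclass that binds T to Long, while a 
hand-written copy reports Long. I am not claiming this is what causes the 
ClassCastException inside Flink.

```java
// Hypothetical generic accumulator: the field's declared type is the type variable T.
class GenericAcc<T> {
    public T value;
}

// Hypothetical hand-written copy for Long: the field type is concrete.
class LongAcc {
    public Long value;
}

// Subclass that binds T to Long; the inherited field is still declared as T.
class LongSubAcc extends GenericAcc<Long> {
}

final class ErasureDemo {
    /** Returns the runtime (erased) type of the public field "value" on cls. */
    static Class<?> valueFieldType(Class<?> cls) {
        try {
            // getField also finds public fields inherited from superclasses.
            return cls.getField("value").getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(valueFieldType(GenericAcc.class)); // class java.lang.Object
        System.out.println(valueFieldType(LongSubAcc.class)); // class java.lang.Object
        System.out.println(valueFieldType(LongAcc.class));    // class java.lang.Long
    }
}
```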

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala


