Timo,

Will do! I have been locally patching in a change that I have a PR [1] out for, 
so if this fix ends up in the next 1.12 patch release, I may add mine in along 
with it once it has been approved and merged.

On a side note, that PR has been out since the end of October (it looks like I 
need to do a rebase to accommodate the code reformatting change that happened 
since). Is there a process for getting somebody to review it? I'm not sure 
whether it just got lost in the commotion of the New Year and the 1.12 release 
and follow-up.

Regards,
Dylan Forciea

[1] https://github.com/apache/flink/pull/13787

On 1/21/21, 8:50 AM, "Timo Walther" <twal...@apache.org> wrote:

    I opened a PR. Feel free to try it out.

    https://github.com/apache/flink/pull/14720

    Btw:

     >> env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
     >>
     >> env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])

    don't make a difference. The generics are type-erased in the bytecode, and 
    only the class name matters.
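
    For example, a quick check in a Scala REPL illustrates the erasure (a 
    minimal sketch, independent of any Flink API):

        // The type parameter is erased at compile time, so both classOf
        // expressions evaluate to the very same java.lang.Class instance.
        val longClass = classOf[LatestNonNull[Long]]
        val stringClass = classOf[LatestNonNull[String]]
        println(longClass == stringClass) // prints: true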

    Thanks,
    Timo

    On 21.01.21 11:36, Timo Walther wrote:
    > Hi Dylan,
    > 
    > thanks for the investigation. I can now also reproduce it in my code. 
    > Yes, this is a bug. I opened
    > 
    > https://issues.apache.org/jira/browse/FLINK-21070
    > 
    > and will try to fix this asap.
    > 
    > Thanks,
    > Timo
    > 
    > On 20.01.21 17:52, Dylan Forciea wrote:
    >> Timo,
    >>
    >> I converted what I had to Java and ended up with the exact same issue 
    >> as before: it works if I only ever use it on one type, but not if I 
    >> use it on multiple types. Maybe this is a bug?
    >>
    >> Dylan
    >>
    >> On 1/20/21, 10:06 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
    >>
    >>      Oh, I think I might have a clue as to what is going on. I noticed 
    >> that it works properly when I only call it on Long. I think it is 
    >> reusing the generated Converter code for whichever type was called 
    >> first.
    >>
    >>      Since in Scala I can't declare an object as static within the 
    >> class itself, I wonder if it won't generate appropriate Converter code 
    >> per subtype. I tried creating a subclass that is specific to the type 
    >> within my class and returning that as the accumulator, but that didn't 
    >> help. And I can't refer to that class in the TypeInference since it 
    >> isn't static, and I get an error from Flink because of that. I'm going 
    >> to see whether writing this UDF in Java with an embedded public static 
    >> class, like you have, will solve my problem. I'll report back to let 
    >> you know what I find. If that works, I'm not quite sure how to make it 
    >> work in Scala.
    >>
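    >>      One thing I might try on the Scala side is nesting the 
    >> accumulator in the companion object. This is a rough sketch, assuming 
    >> a class nested in a companion object compiles down to a JVM-static 
    >> nested class:
    >>
    >>          object LatestNonNull {
    >>            // Nested in the companion object, so the compiled class
    >>            // LatestNonNull$Acc has no $outer field and behaves like
    >>            // Java's "public static class".
    >>            class Acc {
    >>              var value: AnyRef = _
    >>              var date: java.time.LocalDate = _
    >>            }
    >>          }
    >>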
    >>      Regards,
    >>      Dylan Forciea
    >>
    >>      On 1/20/21, 9:34 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
    >>
    >>          As a side note, I also just tried to unify these into a 
    >> single function registration, using _ as the type parameter in the 
    >> classOf calls there and within the TypeInference definition for the 
    >> accumulator, and still ended up with the exact same stack trace.
    >>
    >>          Dylan
    >>
    >>          On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
    >>
    >>              Timo,
    >>
    >>              I appreciate it! I am using Flink 1.12.0 right now with 
    >> the Blink planner. What you proposed is roughly what I had come up 
    >> with the first time around, which resulted in the stack trace with the 
    >> ClassCastException I had originally included. I saw that you used a 
    >> Row instead of just the value in your example, but changing it that 
    >> way didn't seem to help, which makes sense since the problem seems to 
    >> be in the code generated for the accumulator Converter and not the 
    >> output.
    >>
    >>              Here is the exact code that caused that error (while 
    >> calling LatestNonNullLong):
    >>
    >>              The registration of the class below:
    >>
    >>                  env.createTemporarySystemFunction("LatestNonNullLong",
    >>                    classOf[LatestNonNull[Long]])
    >>                  env.createTemporarySystemFunction("LatestNonNullString",
    >>                    classOf[LatestNonNull[String]])
    >>
    >>              The class itself:
    >>
    >>              import java.time.LocalDate
    >>              import java.util.Optional
    >>              import org.apache.flink.table.api.DataTypes
    >>              import org.apache.flink.table.catalog.DataTypeFactory
    >>              import org.apache.flink.table.functions.AggregateFunction
    >>              import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}
    >>
    >>              case class LatestNonNullAccumulator[T](
    >>                  var value: T = null.asInstanceOf[T],
    >>                  var date: LocalDate = null)
    >>
    >>              class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {
    >>
    >>                override def createAccumulator(): LatestNonNullAccumulator[T] = {
    >>                  LatestNonNullAccumulator[T]()
    >>                }
    >>
    >>                override def getValue(acc: LatestNonNullAccumulator[T]): T = {
    >>                  acc.value
    >>                }
    >>
    >>                def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
    >>                  if (value != null) {
    >>                    Option(acc.date).fold {
    >>                      acc.value = value
    >>                      acc.date = date
    >>                    } { accDate =>
    >>                      if (date != null && date.isAfter(accDate)) {
    >>                        acc.value = value
    >>                        acc.date = date
    >>                      }
    >>                    }
    >>                  }
    >>                }
    >>
    >>                def merge(
    >>                    acc: LatestNonNullAccumulator[T],
    >>                    it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
    >>                  val iter = it.iterator()
    >>                  while (iter.hasNext) {
    >>                    val a = iter.next()
    >>                    if (a.value != null) {
    >>                      Option(acc.date).fold {
    >>                        acc.value = a.value
    >>                        acc.date = a.date
    >>                      } { accDate =>
    >>                        Option(a.date).map { curDate =>
    >>                          if (curDate.isAfter(accDate)) {
    >>                            acc.value = a.value
    >>                            acc.date = a.date
    >>                          }
    >>                        }
    >>                      }
    >>                    }
    >>                  }
    >>                }
    >>
    >>                def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
    >>                  acc.value = null.asInstanceOf[T]
    >>                  acc.date = null
    >>                }
    >>
    >>                override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
    >>                  TypeInference
    >>                    .newBuilder()
    >>                    .inputTypeStrategy(InputTypeStrategies
    >>                      .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
    >>                    .accumulatorTypeStrategy { callContext =>
    >>                      val accDataType = DataTypes.STRUCTURED(
    >>                        classOf[LatestNonNullAccumulator[T]],
    >>                        DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
    >>                        DataTypes.FIELD("date", DataTypes.DATE()))
    >>
    >>                      Optional.of(accDataType)
    >>                    }
    >>                    .outputTypeStrategy { callContext =>
    >>                      val outputDataType = callContext.getArgumentDataTypes().get(0);
    >>                      Optional.of(outputDataType);
    >>                    }
    >>                    .build()
    >>                }
    >>              }
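    >>
    >>              For context, a hypothetical invocation of the registered 
    >> functions (table and column names made up for illustration):
    >>
    >>                  env.sqlQuery(
    >>                    """SELECT id,
    >>                      |  LatestNonNullLong(amount, updated) AS latest_amount,
    >>                      |  LatestNonNullString(label, updated) AS latest_label
    >>                      |FROM input_table
    >>                      |GROUP BY id""".stripMargin)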
    >>
    >>              Regards,
    >>              Dylan Forciea
    >>
    >>              On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:
    >>
    >>                  Hi Dylan,
    >>
    >>                  I'm assuming you are using Flink 1.12 and the Blink 
    >>                  planner?
    >>
    >>                  Beginning with 1.12 you can use the "new" aggregate 
    >>                  functions with better type inference, so 
    >>                  TypeInformation will not be used in this stack.
    >>
    >>                  I tried to come up with an example that should 
    >>                  explain the rough design. I will include this example 
    >>                  in the Flink code base. I hope this helps:
    >>
    >>                  import org.apache.flink.table.types.inference.InputTypeStrategies;
    >>
    >>                  public static class LastIfNotNull<T>
    >>                          extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {
    >>
    >>                      public static class Accumulator<T> {
    >>                          public T value;
    >>                          public LocalDate date;
    >>                      }
    >>
    >>                      public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
    >>                          if (input != null) {
    >>                              acc.value = input;
    >>                              acc.date = date;
    >>                          }
    >>                      }
    >>
    >>                      @Override
    >>                      public Row getValue(Accumulator<T> acc) {
    >>                          return Row.of(acc.value, acc.date);
    >>                      }
    >>
    >>                      @Override
    >>                      public Accumulator<T> createAccumulator() {
    >>                          return new Accumulator<>();
    >>                      }
    >>
    >>                      @Override
    >>                      public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    >>                          return TypeInference.newBuilder()
    >>                                  .inputTypeStrategy(
    >>                                          InputTypeStrategies.sequence(
    >>                                                  InputTypeStrategies.ANY,
    >>                                                  InputTypeStrategies.explicit(DataTypes.DATE())))
    >>                                  .accumulatorTypeStrategy(
    >>                                          callContext -> {
    >>                                              DataType accDataType =
    >>                                                      DataTypes.STRUCTURED(
    >>                                                              Accumulator.class,
    >>                                                              DataTypes.FIELD(
    >>                                                                      "value",
    >>                                                                      callContext.getArgumentDataTypes().get(0)),
    >>                                                              DataTypes.FIELD("date", DataTypes.DATE()));
    >>                                              return Optional.of(accDataType);
    >>                                          })
    >>                                  .outputTypeStrategy(
    >>                                          callContext -> {
    >>                                              DataType argDataType =
    >>                                                      callContext.getArgumentDataTypes().get(0);
    >>                                              DataType outputDataType =
    >>                                                      DataTypes.ROW(
    >>                                                              DataTypes.FIELD("value", argDataType),
    >>                                                              DataTypes.FIELD("date", DataTypes.DATE()));
    >>                                              return Optional.of(outputDataType);
    >>                                          })
    >>                                  .build();
    >>                      }
    >>                  }
    >>
    >>                  Regards,
    >>                  Timo
    >>
    >>
    >>
    >>                  On 20.01.21 01:04, Dylan Forciea wrote:
    >>                  > I am attempting to create an aggregate UDF that 
    >>                  > takes a generic parameter T, but for the life of 
    >>                  > me, I can't seem to get it to work.
    >>                  >
    >>                  > The UDF I'm trying to implement takes two input 
    >>                  > arguments, a value that is generic, and a date. It 
    >>                  > will choose the non-null value with the latest 
    >>                  > associated date. I had originally done this with 
    >>                  > separate Top 1 queries connected with a left join, 
    >>                  > but the memory usage seems far higher than doing 
    >>                  > this with a custom aggregate function.
    >>                  >
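    >>                  > The Top 1 flavor I was replacing looked roughly 
    >>                  > like this (an illustrative sketch with made-up 
    >>                  > table and column names, simplified from the real 
    >>                  > job):
    >>                  >
    >>                  >     env.sqlQuery(
    >>                  >       """SELECT id, v, dt
    >>                  >         |FROM (
    >>                  >         |  SELECT *, ROW_NUMBER() OVER (
    >>                  >         |      PARTITION BY id ORDER BY dt DESC) AS rn
    >>                  >         |  FROM t
    >>                  >         |  WHERE v IS NOT NULL)
    >>                  >         |WHERE rn = 1""".stripMargin)
    >>                  >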
    >>                  > As a first attempt, I tried to use custom type 
    >>                  > inference to have it validate that the first 
    >>                  > argument type is the output type and have a single 
    >>                  > function, and also used DataTypes.STRUCTURED to try 
    >>                  > to define the shape of my accumulator. However, 
    >>                  > that resulted in an exception like this whenever I 
    >>                  > tried to use a non-string value as the first 
    >>                  > argument:
    >>                  >
    >>                  > [error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    >>                  > [error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
    >>                  > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
    >>                  > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
    >>                  > [error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
    >>                  > [error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
    >>                  > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
    >>                  > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
    >>                  > [error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
    >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
    >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
    >>                  > [error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    >>                  > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    >>                  > [error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    >>                  > [error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    >>                  > [error]   at java.lang.Thread.run(Thread.java:748)
    >>                  >
    >>                  > Figuring that I can't do something of that sort, I 
    >>                  > tried to follow the general approach of the Sum 
    >>                  > accumulator [1] in the Flink source code, where 
    >>                  > separate classes are derived from a base class and 
    >>                  > each advertises its accumulator shape, but I ended 
    >>                  > up with the exact same stack trace as above when I 
    >>                  > tried to create and use a function specifically for 
    >>                  > a non-string type like Long.
    >>                  >
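    >>                  > Roughly, the per-type subclassing I tried looked 
    >>                  > like this (an illustrative sketch; the real classes 
    >>                  > also advertised their accumulator shapes):
    >>                  >
    >>                  >     // Each subclass pins the type parameter so that
    >>                  >     // no generic T is left for type inference.
    >>                  >     class LongLatestNonNull extends LatestNonNull[java.lang.Long]
    >>                  >     class StringLatestNonNull extends LatestNonNull[String]
    >>                  >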
    >>                  > Is there something I'm missing as far as how this 
    >>                  > is supposed to be done? Everything I try results 
    >>                  > either in a stack trace like the above or in type 
    >>                  > erasure issues when trying to get type information 
    >>                  > for the accumulator. If I just copy the generic 
    >>                  > code multiple times and directly use Long or String 
    >>                  > rather than using subclassing, then it works just 
    >>                  > fine. I appreciate any help I can get on this!
    >>                  >
    >>                  > Regards,
    >>                  >
    >>                  > Dylan Forciea
    >>                  >
    >>                  > [1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
    >>
    > 

