As a side note, I also tried unifying everything into a single function
registration, using _ as the type parameter in the classOf calls both there and
in the TypeInference definition for the accumulator. I still ended up with the
exact same stack trace.
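
For what it's worth, this is consistent with JVM type erasure: the class object
passed at registration carries no type argument, so the Long and String
variants are the same Class at runtime. A minimal plain-Java sketch of that
point follows (no Flink involved; ErasureDemo and its Accumulator are
hypothetical stand-ins, not the classes from my job). It does not show where
the generated Converter picks up String. It only shows that the class alone
cannot tell the two registrations apart.

```java
import java.lang.reflect.Field;
import java.time.LocalDate;

class ErasureDemo {

    // Hypothetical stand-in for a generic accumulator; not a Flink class.
    public static class Accumulator<T> {
        public T value;
        public LocalDate date;
    }

    // Both parameterizations share one runtime class after erasure.
    static boolean sameRuntimeClass() {
        Accumulator<Long> longAcc = new Accumulator<>();
        Accumulator<String> stringAcc = new Accumulator<>();
        return longAcc.getClass() == stringAcc.getClass();
    }

    // The type reflection reports for the generic field "value".
    static Class<?> erasedValueType() {
        try {
            Field field = Accumulator.class.getDeclaredField("value");
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());          // prints true
        System.out.println(erasedValueType().getName()); // prints java.lang.Object
    }
}
```

Anything that needs the concrete field type therefore has to get it from
somewhere other than the class object, for example from the argument data
types supplied through type inference.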

Dylan

On 1/20/21, 9:22 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

    Timo,

    I appreciate it! I am using Flink 1.12.0 right now with the Blink planner.
    What you proposed is roughly what I had come up with the first time around,
    which resulted in the stack trace with the ClassCastException I had
    originally included. I saw that you had used a Row instead of just the value
    in your example, but changing it that way didn't seem to help, which makes
    sense since the problem seems to be in the code generated for the
    accumulator Converter and not the output.

    Here is the exact code that caused that error (while calling
    LatestNonNullLong):

    The registration of the class below:

        env.createTemporarySystemFunction("LatestNonNullLong", classOf[LatestNonNull[Long]])
        env.createTemporarySystemFunction("LatestNonNullString", classOf[LatestNonNull[String]])


    The class itself:

    import java.time.LocalDate
    import java.util.Optional
    import org.apache.flink.table.api.DataTypes
    import org.apache.flink.table.catalog.DataTypeFactory
    import org.apache.flink.table.functions.AggregateFunction
    import org.apache.flink.table.types.inference.{InputTypeStrategies, TypeInference}

    case class LatestNonNullAccumulator[T](
        var value: T = null.asInstanceOf[T],
        var date: LocalDate = null)

    class LatestNonNull[T] extends AggregateFunction[T, LatestNonNullAccumulator[T]] {

      override def createAccumulator(): LatestNonNullAccumulator[T] = {
        LatestNonNullAccumulator[T]()
      }

      override def getValue(acc: LatestNonNullAccumulator[T]): T = {
        acc.value
      }

      def accumulate(acc: LatestNonNullAccumulator[T], value: T, date: LocalDate): Unit = {
        if (value != null) {
          Option(acc.date).fold {
            acc.value = value
            acc.date = date
          } { accDate =>
            if (date != null && date.isAfter(accDate)) {
              acc.value = value
              acc.date = date
            }
          }
        }
      }

      def merge(
          acc: LatestNonNullAccumulator[T],
          it: java.lang.Iterable[LatestNonNullAccumulator[T]]): Unit = {
        val iter = it.iterator()
        while (iter.hasNext) {
          val a = iter.next()
          if (a.value != null) {
            Option(acc.date).fold {
              acc.value = a.value
              acc.date = a.date
            } { accDate =>
              Option(a.date).foreach { curDate =>
                if (curDate.isAfter(accDate)) {
                  acc.value = a.value
                  acc.date = a.date
                }
              }
            }
          }
        }
      }

      def resetAccumulator(acc: LatestNonNullAccumulator[T]): Unit = {
        acc.value = null.asInstanceOf[T]
        acc.date = null
      }

      override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
        TypeInference
          .newBuilder()
          .inputTypeStrategy(InputTypeStrategies
            .sequence(InputTypeStrategies.ANY, InputTypeStrategies.explicit(DataTypes.DATE())))
          .accumulatorTypeStrategy { callContext =>
            val accDataType = DataTypes.STRUCTURED(
              classOf[LatestNonNullAccumulator[T]],
              DataTypes.FIELD("value", callContext.getArgumentDataTypes.get(0)),
              DataTypes.FIELD("date", DataTypes.DATE()))

            Optional.of(accDataType)
          }
          .outputTypeStrategy { callContext =>
            val outputDataType = callContext.getArgumentDataTypes.get(0)
            Optional.of(outputDataType)
          }
          .build()
      }
    }
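
The selection rule that accumulate implements above can be restated as a
standalone sketch, independent of Flink and of the accumulator conversion
problem. LatestNonNullDemo and Acc are hypothetical names used only for
illustration. The sketch assumes the same semantics as the Scala code: null
values are ignored, any value is accepted while the accumulator has no date,
and afterwards a value replaces the stored one only if its date is strictly
later.

```java
import java.time.LocalDate;

class LatestNonNullDemo {

    // Hypothetical plain-Java accumulator; not a Flink class.
    static final class Acc<T> {
        T value;
        LocalDate date;
    }

    // Ignore null values. While no date is stored, accept the value;
    // otherwise accept it only when its date is strictly later.
    static <T> void accumulate(Acc<T> acc, T value, LocalDate date) {
        if (value == null) {
            return;
        }
        if (acc.date == null || (date != null && date.isAfter(acc.date))) {
            acc.value = value;
            acc.date = date;
        }
    }

    // Feeds four sample rows and returns the surviving value.
    static Long latestOfSample() {
        Acc<Long> acc = new Acc<>();
        accumulate(acc, 5L, LocalDate.of(2021, 1, 1));
        accumulate(acc, null, LocalDate.of(2021, 1, 5)); // skipped: null value
        accumulate(acc, 7L, LocalDate.of(2021, 1, 3));   // later date wins
        accumulate(acc, 9L, LocalDate.of(2021, 1, 2));   // older than stored date
        return acc.value;
    }

    public static void main(String[] args) {
        System.out.println(latestOfSample()); // prints 7
    }
}
```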

    Regards,
    Dylan Forciea

    On 1/20/21, 2:37 AM, "Timo Walther" <twal...@apache.org> wrote:

        Hi Dylan,

        I'm assuming you are using Flink 1.12 and the Blink planner?

        Starting with 1.12, you can use the "new" aggregate functions with
        better type inference, so TypeInformation will not be used in this stack.

        I tried to come up with an example that should explain the rough design.
        I will include this example in the Flink code base. I hope this helps:



        import java.time.LocalDate;
        import java.util.Optional;

        import org.apache.flink.table.api.DataTypes;
        import org.apache.flink.table.catalog.DataTypeFactory;
        import org.apache.flink.table.functions.AggregateFunction;
        import org.apache.flink.table.types.DataType;
        import org.apache.flink.table.types.inference.InputTypeStrategies;
        import org.apache.flink.table.types.inference.TypeInference;
        import org.apache.flink.types.Row;

        public static class LastIfNotNull<T>
                extends AggregateFunction<Row, LastIfNotNull.Accumulator<T>> {

            public static class Accumulator<T> {
                public T value;
                public LocalDate date;
            }

            public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
                if (input != null) {
                    acc.value = input;
                    acc.date = date;
                }
            }

            @Override
            public Row getValue(Accumulator<T> acc) {
                return Row.of(acc.value, acc.date);
            }

            @Override
            public Accumulator<T> createAccumulator() {
                return new Accumulator<>();
            }

            @Override
            public TypeInference getTypeInference(DataTypeFactory typeFactory) {
                return TypeInference.newBuilder()
                        .inputTypeStrategy(
                                InputTypeStrategies.sequence(
                                        InputTypeStrategies.ANY,
                                        InputTypeStrategies.explicit(DataTypes.DATE())))
                        .accumulatorTypeStrategy(
                                callContext -> {
                                    DataType accDataType =
                                            DataTypes.STRUCTURED(
                                                    Accumulator.class,
                                                    DataTypes.FIELD(
                                                            "value",
                                                            callContext.getArgumentDataTypes().get(0)),
                                                    DataTypes.FIELD("date", DataTypes.DATE()));
                                    return Optional.of(accDataType);
                                })
                        .outputTypeStrategy(
                                callContext -> {
                                    DataType argDataType = callContext.getArgumentDataTypes().get(0);
                                    DataType outputDataType =
                                            DataTypes.ROW(
                                                    DataTypes.FIELD("value", argDataType),
                                                    DataTypes.FIELD("date", DataTypes.DATE()));
                                    return Optional.of(outputDataType);
                                })
                        .build();
            }
        }

        Regards,
        Timo



        On 20.01.21 01:04, Dylan Forciea wrote:
        > I am attempting to create an aggregate UDF that takes a generic
        > parameter T, but for the life of me, I can’t seem to get it to work.
        > 
        > The UDF I’m trying to implement takes two input arguments: a value that
        > is generic, and a date. It will choose the non-null value with the
        > latest associated date. I had originally done this with separate Top 1
        > queries connected with a left join, but the memory usage seems far
        > higher than doing this with a custom aggregate function.
        > 
        > As a first attempt, I tried to use custom type inference to have it
        > validate that the first argument type is the output type and have a
        > single function, and also used DataTypes.STRUCTURED to try to define the
        > shape of my accumulator. However, that resulted in an exception like
        > this whenever I tried to use a non-string value as the first argument:
        > 
        > [error] Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
        > [error]   at io$oseberg$flink$udf$LatestNonNullAccumulator$Converter.toInternal(Unknown Source)
        > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
        > [error]   at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
        > [error]   at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
        > [error]   at GroupAggsHandler$777.getAccumulators(Unknown Source)
        > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
        > [error]   at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
        > [error]   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        > [error]   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
        > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
        > [error]   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
        > [error]   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
        > [error]   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
        > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
        > [error]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
        > [error]   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        > [error]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        > [error]   at java.lang.Thread.run(Thread.java:748)
        > 
        > Figuring that I can’t do something of that sort, I tried to follow the
        > general approach in the Sum accumulator[1] in the Flink source code,
        > where separate classes are derived from a base class and each
        > advertises its accumulator shape, but ended up with the exact same stack
        > trace as above when I tried to create and use a function specifically
        > for a non-string type like Long.
        > 
        > Is there something I’m missing as far as how this is supposed to be
        > done? Everything I try either results in a stack trace like the above,
        > or type erasure issues when trying to get type information for the
        > accumulator. If I just copy the generic code multiple times and
        > directly use Long or String rather than using subclassing, then it works
        > just fine. I appreciate any help I can get on this!
        > 
        > Regards,
        > 
        > Dylan Forciea
        > 
        > [1] https://github.com/apache/flink/blob/release-1.12.0/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala
        > 


