I am using Roaring64NavigableMap to compute uv. It is ok to us flink 
planner and not ok with blink planner. The SQL is as following:
SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, A, 
B, C, D,
        E, uv(bitmap(id)) as bmp
FROM person
GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E

      The udf is as following:
public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, 
Roaring64NavigableMap> {
   @Override
   public Roaring64NavigableMap createAccumulator() {
      return new Roaring64NavigableMap();
   }

   @Override
   public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
      return accumulator;
   }

   public void accumulate(Roaring64NavigableMap bitmap, long id) {
      bitmap.add(id);
   }
}
public static class UV extends ScalarFunction {
   public long eval(Roaring64NavigableMap bitmap) {
      return bitmap.getLongCardinality();
   }
}
      The error is as following:

2020-04-20 16:37:13,868 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
[flink-akka.actor.default-dispatcher-40]  - 
GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, 
appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, platform, 
channel, versionName, appMajorVersion, bitmap(id) AS $f5, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime]) -> Calc(select=[toLong(w$start) AS curTimestamp, brand, platform, 
channel, versionName, appMajorVersion, uv($f5) AS bmp]) -> 
SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink: Unnamed) (321/480) 
(8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
  at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
  at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
  at 
org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
  at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
  at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
  at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
  at 
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
  at 
org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
  at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
  at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
  at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
  at java.lang.Thread.run(Thread.java:745)

      Do I need register Roaring64NavigableMap somewhere? Anyone can help me? 
Thank you.

Reply via email to