Thanks for reporting this. It looks like the window namespace was replaced by VoidNamespace in the state entry. I've created https://issues.apache.org/jira/browse/FLINK-18464 to investigate it further.
Regards,
Roman

On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[email protected]> wrote:

> I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor.
> The state is stored to memory.
>
> input.setParallelism(processParallelism)
>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>   .keyBy(_.key)
>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>   .trigger(new MyTrigger)
>   .evictor(new MyEvictor)
>   .process(new MyFunction).setParallelism(aggregateParallelism)
>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>   .name("kafka-record-sink")
>
> And the exception stack is here, could anyone help with this? Thanks!
>
> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>     ... 3 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     ... 5 more
>
> --
> Best regards
>
> Sili Liu
