Thanks for reporting this.

It looks like the window namespace was replaced by VoidNamespace in a state
entry, so the snapshot tries to serialize a TimeWindow with the
VoidNamespace serializer.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to
investigate this further.
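
To illustrate why a mismatch like this only surfaces when the checkpoint is
materialized, here is a minimal sketch that does not use Flink at all. It is
a toy model, not Flink's implementation: TimeWindow, VoidNamespace,
NamespaceSerializer and StateTable below are hypothetical stand-ins that only
borrow the names from the stack trace. The assumption is that the state table
holds namespaces untyped at runtime (generic types are erased), so the write
succeeds and the cast fails later, inside the serializer.

```scala
// Hypothetical stand-ins for the two namespace types; not Flink's classes.
final case class TimeWindow(start: Long, end: Long)
case object VoidNamespace

// A serializer that is typed for exactly one kind of namespace.
trait NamespaceSerializer[N] {
  def serialize(namespace: N): String
}

object VoidNamespaceSerializer extends NamespaceSerializer[VoidNamespace.type] {
  def serialize(namespace: VoidNamespace.type): String = "void"
}

// A toy state table. N is erased at runtime, so put() cannot check
// that the namespace it receives matches the serializer's type.
final class StateTable[N](val serializer: NamespaceSerializer[N]) {
  private val entries = scala.collection.mutable.Map.empty[(String, Any), Long]

  def put(key: String, namespace: Any, value: Long): Unit =
    entries((key, namespace)) = value

  // The mismatch is only detected here, when each namespace is serialized.
  def snapshot(): Seq[String] =
    entries.keys.toSeq.map { case (_, ns) =>
      serializer.serialize(ns.asInstanceOf[N]) // unchecked cast: no error yet
    }
}

object Demo extends App {
  val table = new StateTable(VoidNamespaceSerializer)

  // Writing an entry under a window namespace succeeds silently.
  table.put("user-1", TimeWindow(0L, 1200000L), 42L)

  // Snapshotting fails inside the serializer with a ClassCastException.
  try table.snapshot()
  catch {
    case e: ClassCastException => println(s"snapshot failed: ${e.getMessage}")
  }
}
```

In this model the write path never fails; the exception is thrown from the
serializer during snapshot(), which matches the shape of the reported trace
(VoidNamespaceSerializer.serialize called from the snapshot code).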

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[email protected]> wrote:

> I'm using Flink 1.9 on Mesos with my own trigger and evictor.
> The state is kept in memory.
>
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink")
>
> The exception stack trace is below. Could anyone help with this? Thanks!
>
> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>     ... 3 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>     ... 5 more
>
> --
> Best regards
>
> Sili Liu
>
