Hi, thanks for your help. The checkpoint configuration is:
checkpoint.intervalMS=300000 and checkpoint.timeoutMS=300000 (i.e. 5 minutes each). The error call stack is from the JobManager (JM) log, and it happens at every checkpoint (cp). So far I haven't had a single successful checkpoint. On Fri, Jul 3, 2020 at 3:50 AM, Khachatryan Roman <khachatryan.ro...@gmail.com> wrote: > Hi, > > Thanks for the details. > However, I was not able to reproduce the issue. I used parallelism level > 4, the file system backend, and tried different timings for > checkpointing, windowing and source. > Do you encounter this problem deterministically? Is it always the 1st > checkpoint? > What checkpointing interval do you use? > > Regards, > Roman > > > On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote: > >> Hi, this is our production code, so I had to modify it a little bit, such >> as variable and function names. I think the 3 classes I provide here are >> enough. >> >> I'm trying to join two streams, but I don't want to use the default join >> function, because I want to send the joined log immediately and remove it >> from the window state immediately. My window is very long (20 >> minutes), so a window may be evaluated multiple times.
>> >> class JoinFunction extends >> ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{ >> >> var ueState: ValueState[RawLog] = _ >> @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _ >> val invalidCounter = new LongCounter() >> val processCounter = new LongCounter() >> val sendToKafkaCounter = new LongCounter() >> >> override def open(parameters: Configuration): Unit = { >> ueState = getRuntimeContext.getState( >> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >> ) >> gZipThriftSerializer = new GZipThriftSerializer[MyType]() >> getRuntimeContext.addAccumulator("processCounter", this.processCounter) >> getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter) >> getRuntimeContext.addAccumulator("sendToKafkaCounter", >> this.sendToKafkaCounter) >> } >> >> override def process(key: String, >> ctx: Context, >> logs: Iterable[RawLog], >> out: Collector[OutputLog]): Unit = { >> if (ueState.value() != null) { >> processCounter.add(1L) >> val bid = ueState.value() >> val bidLog = >> gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType]) >> logs.foreach( log => { >> if (log.eventType == SHOW) { >> val showLog = >> gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType]) >> sendToKafkaCounter.add(1L) >> out.collect(new OutputLog(ThriftUtils.serialize(showLog), >> Utils.getOutputTopic(showLog))) >> } >> }) >> } else { >> invalidCounter.add(1L) >> } >> } >> } >> >> class JoinTrigger extends Trigger[RawLog, TimeWindow] { >> >> override def onElement(log: RawLog, >> timestamp: Long, >> window: TimeWindow, >> ctx: Trigger.TriggerContext): TriggerResult = { >> val ueState: ValueState[RawLog] = ctx.getPartitionedState( >> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >> ) >> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState( >> new ValueStateDescriptor[Boolean](initState, classOf[Boolean])) >> >> if (!firstSeen.value()) { >> 
ctx.registerEventTimeTimer(window.getEnd) >> firstSeen.update(true) >> } >> val eventType = log.eventType >> if (eventType == BID) { >> ueState.update(log) >> TriggerResult.CONTINUE >> } else { >> if (ueState.value() == null) { >> TriggerResult.CONTINUE >> } else { >> TriggerResult.FIRE >> } >> } >> } >> >> override def onEventTime(timestamp: Long, >> window: TimeWindow, >> ctx: Trigger.TriggerContext): TriggerResult = { >> if (timestamp == window.getEnd) { >> TriggerResult.PURGE >> } >> TriggerResult.CONTINUE >> } >> >> override def onProcessingTime(timestamp: Long, >> window: TimeWindow, >> ctx: Trigger.TriggerContext): TriggerResult >> = { >> TriggerResult.CONTINUE >> } >> >> override def clear(window: TimeWindow, >> ctx: Trigger.TriggerContext): Unit = { >> val ueState: ValueState[RawLog] = ctx.getPartitionedState( >> new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]) >> ) >> ueState.clear() >> >> val firstSeen: ValueState[Boolean] = ctx.getPartitionedState( >> new ValueStateDescriptor[Boolean](initState, classOf[Boolean])) >> firstSeen.clear() >> >> ctx.deleteEventTimeTimer(window.getEnd) >> } >> } >> >> class JoinEvictor extends Evictor[RawLog, TimeWindow] { >> >> override def evictBefore(elements: JIterable[TimestampedValue[RawLog]], >> size: Int, >> window: TimeWindow, >> evictorContext: Evictor.EvictorContext): Unit = {} >> >> override def evictAfter(elements: JIterable[TimestampedValue[RawLog]], >> size: Int, >> window: TimeWindow, >> evictorContext: Evictor.EvictorContext): Unit = { >> val iter = elements.iterator() >> while (iter.hasNext) { >> iter.next() >> iter.remove() >> } >> } >> } >> >> >> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月2日周四 下午7:18写道: >> >>> Thanks for the clarification. >>> >>> Can you also share the code of other parts, particularly MyFunction? 
>>> >>> Regards, >>> Roman >>> >>> >>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote: >>> >>>> The RocksDB backend has the same problem. >>>> >>>> On Thu, Jul 2, 2020 at 6:11 PM, Khachatryan Roman <khachatryan.ro...@gmail.com> wrote: >>>> >>>>> Thanks for reporting this. >>>>> >>>>> It looks like the window namespace was replaced by VoidNamespace in a state >>>>> entry. >>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to >>>>> further investigate it. >>>>> >>>>> Regards, >>>>> Roman >>>>> >>>>> >>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote: >>>>> >>>>>> I'm using Flink 1.9 on Mesos and I'm trying to use my own trigger and >>>>>> evictor. The state is stored in memory. >>>>>> >>>>>> input.setParallelism(processParallelism) >>>>>> .assignTimestampsAndWatermarks(new UETimeAssigner) >>>>>> .keyBy(_.key) >>>>>> .window(TumblingEventTimeWindows.of(Time.minutes(20))) >>>>>> .trigger(new MyTrigger) >>>>>> .evictor(new MyEvictor) >>>>>> .process(new MyFunction).setParallelism(aggregateParallelism) >>>>>> .addSink(kafkaSink).setParallelism(sinkParallelism) >>>>>> .name("kafka-record-sink") >>>>>> >>>>>> Here is the exception stack trace. Could anyone help with this? Thanks! >>>>>> >>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator >>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, >>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5). >>>>>> at org.apache.flink.streaming.runtime.tasks. >>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException( >>>>>> StreamTask.java:1100) >>>>>> at org.apache.flink.streaming.runtime.tasks.
>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042) >>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker( >>>>>> ThreadPoolExecutor.java:1149) >>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run( >>>>>> ThreadPoolExecutor.java:624) >>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang. >>>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows. >>>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state. >>>>>> VoidNamespace >>>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >>>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >>>>>> at org.apache.flink.runtime.concurrent.FutureUtils >>>>>> .runIfNotDoneAndGet(FutureUtils.java:450) >>>>>> at org.apache.flink.streaming.api.operators. >>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47) >>>>>> at org.apache.flink.streaming.runtime.tasks. >>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) >>>>>> ... 3 more >>>>>> Caused by: java.lang.ClassCastException: >>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot >>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace >>>>>> at org.apache.flink.runtime.state.VoidNamespaceSerializer >>>>>> .serialize(VoidNamespaceSerializer.java:32) >>>>>> at org.apache.flink.runtime.state.heap. >>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot >>>>>> .java:114) >>>>>> at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot >>>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121) >>>>>> at org.apache.flink.runtime.state.heap. 
>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup( >>>>>> CopyOnWriteStateTableSnapshot.java:37) >>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1 >>>>>> .callInternal(HeapSnapshotStrategy.java:191) >>>>>> at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1 >>>>>> .callInternal(HeapSnapshotStrategy.java:158) >>>>>> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call( >>>>>> AsyncSnapshotCallable.java:75) >>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>>> at org.apache.flink.runtime.concurrent.FutureUtils >>>>>> .runIfNotDoneAndGet(FutureUtils.java:447) >>>>>> ... 5 more >>>>>> >>>>>> -- >>>>>> Best regards >>>>>> >>>>>> Sili Liu >>>>>> >>>>> >>>> >>>> -- >>>> Best regards >>>> >>>> Sili Liu >>>> >>> >> >> -- >> Best regards >> >> Sili Liu >> > -- Best regards Sili Liu