Thanks for your help.

1. I started the job from scratch, not from a savepoint or externalized checkpoint.
2. No job graph change.
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1.
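Separately from the checkpoint failure, the `JoinTrigger.onEventTime` method quoted in this thread never actually returns `TriggerResult.PURGE`. In Scala, an `if` without an `else` used as a statement has its value discarded, so the method always falls through to its last expression, `TriggerResult.CONTINUE`. The sketch below reproduces that control flow with hypothetical stand-in types (`Result`, `Continue`, `Purge` are not Flink classes) so it runs without Flink on the classpath.

```scala
// Hypothetical stand-ins for Flink's TriggerResult, for illustration only.
sealed trait Result
case object Continue extends Result
case object Purge extends Result

object OnEventTimeSketch {
  // Same shape as the quoted JoinTrigger.onEventTime: the `if` has no `else`,
  // so Purge is computed and then thrown away (scalac warns about this).
  def asWritten(timestamp: Long, windowEnd: Long): Result = {
    if (timestamp == windowEnd) {
      Purge
    }
    Continue
  }

  // Corrected shape: the if/else is the last expression, so its value is returned.
  def corrected(timestamp: Long, windowEnd: Long): Result =
    if (timestamp == windowEnd) Purge else Continue
}
```

In the real trigger, the corresponding change would be `if (timestamp == window.getEnd) TriggerResult.PURGE else TriggerResult.CONTINUE`. Whether purging at that point changes observable behavior in this job is unclear, since `JoinEvictor` already empties the window after every firing.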
On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> I still wasn't able to reproduce the issue.
>
> Can you also clarify:
> - Are you starting the job from a savepoint or externalized checkpoint?
> - If yes, was the job graph changed?
> - What StreamTimeCharacteristic is set, if any?
> - What exact version of Flink do you use?
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Hi, thanks for your help.
>>
>> The checkpoint configuration is
>>
>> checkpoint.intervalMS=300000
>> checkpoint.timeoutMS=300000
>>
>> The error call stack is from the JM log, and it happens on every checkpoint.
>> Currently I don't have a successful checkpoint yet.
>>
>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the details.
>>> However, I was not able to reproduce the issue. I used parallelism
>>> 4, the file system backend, and tried different timings for
>>> checkpointing, windowing and source.
>>> Do you encounter this problem deterministically? Is it always the 1st
>>> checkpoint?
>>> What checkpointing interval do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Hi, this is our production code, so I had to modify it a little bit,
>>>> such as variable names and function names. I think the 3 classes I
>>>> provide here are enough.
>>>>
>>>> I try to join two streams, but I don't want to use the default join
>>>> function, because I want to send the joined log immediately and remove it
>>>> from window state immediately. And my window gap time is very long (20
>>>> minutes), so it may be evaluated multiple times.
>>>>
>>>> class JoinFunction extends
>>>>     ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>>
>>>>   var ueState: ValueState[RawLog] = _
>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>   val invalidCounter = new LongCounter()
>>>>   val processCounter = new LongCounter()
>>>>   val sendToKafkaCounter = new LongCounter()
>>>>
>>>>   override def open(parameters: Configuration): Unit = {
>>>>     ueState = getRuntimeContext.getState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter",
>>>>       this.sendToKafkaCounter)
>>>>   }
>>>>
>>>>   override def process(key: String,
>>>>                        ctx: Context,
>>>>                        logs: Iterable[RawLog],
>>>>                        out: Collector[OutputLog]): Unit = {
>>>>     if (ueState.value() != null) {
>>>>       processCounter.add(1L)
>>>>       val bid = ueState.value()
>>>>       val bidLog =
>>>>         gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>       logs.foreach( log => {
>>>>         if (log.eventType == SHOW) {
>>>>           val showLog =
>>>>             gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>           sendToKafkaCounter.add(1L)
>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog),
>>>>             Utils.getOutputTopic(showLog)))
>>>>         }
>>>>       })
>>>>     } else {
>>>>       invalidCounter.add(1L)
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>
>>>>   override def onElement(log: RawLog,
>>>>                          timestamp: Long,
>>>>                          window: TimeWindow,
>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>
>>>>     if (!firstSeen.value()) {
>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>       firstSeen.update(true)
>>>>     }
>>>>     val eventType = log.eventType
>>>>     if (eventType == BID) {
>>>>       ueState.update(log)
>>>>       TriggerResult.CONTINUE
>>>>     } else {
>>>>       if (ueState.value() == null) {
>>>>         TriggerResult.CONTINUE
>>>>       } else {
>>>>         TriggerResult.FIRE
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>>   override def onEventTime(timestamp: Long,
>>>>                            window: TimeWindow,
>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     if (timestamp == window.getEnd) {
>>>>       TriggerResult.PURGE
>>>>     }
>>>>     TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def onProcessingTime(timestamp: Long,
>>>>                                 window: TimeWindow,
>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def clear(window: TimeWindow,
>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     ueState.clear()
>>>>
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>     firstSeen.clear()
>>>>
>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>   }
>>>> }
>>>>
>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>
>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>                            size: Int,
>>>>                            window: TimeWindow,
>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>
>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>                           size: Int,
>>>>                           window: TimeWindow,
>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>     val iter = elements.iterator()
>>>>     while (iter.hasNext) {
>>>>       iter.next()
>>>>       iter.remove()
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Thanks for the clarification.
>>>>>
>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> The RocksDB backend has the same problem.
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for reporting this.
>>>>>>>
>>>>>>> It looks like the window namespace was replaced by VoidNamespace in
>>>>>>> a state entry.
>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>> investigate it further.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm using Flink 1.9 on Mesos and I'm trying to use my own trigger and
>>>>>>>> evictor. The state is stored in memory.
>>>>>>>>
>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>   .keyBy(_.key)
>>>>>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>   .trigger(new MyTrigger)
>>>>>>>>   .evictor(new MyEvictor)
>>>>>>>>   .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>   .name("kafka-record-sink")
>>>>>>>>
>>>>>>>> And the exception stack is below. Could anyone help with this?
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>     ... 3 more
>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>     ... 5 more
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
--
Best regards

Sili Liu
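One reading of the stack trace above, consistent with the quoted code, is that the state name `bidState` is registered with two different namespaces. `JoinFunction.open` obtains it through `getRuntimeContext.getState`, which in a window operator scopes state to the key only (`VoidNamespace`), while `JoinTrigger` obtains it through `ctx.getPartitionedState`, which scopes it to the `TimeWindow`. Assuming the backend keeps a single table per state name and fixes that table's namespace serializer at first registration, entries the trigger writes under a window namespace would later be snapshotted with `VoidNamespaceSerializer`, which is exactly the cast that fails. The toy model below illustrates this hypothesis only; every type in it (`ToyBackend`, `VoidNsSerializer`, `WindowNs`, and so on) is hypothetical and none of it is Flink code.

```scala
import scala.collection.mutable

// Toy model only: none of these types exist in Flink.
sealed trait Namespace
case object VoidNs extends Namespace
final case class WindowNs(start: Long, end: Long) extends Namespace

trait NamespaceSerializer {
  def serialize(ns: Namespace): String
}

// Accepts only the key-only ("void") namespace, loosely like VoidNamespaceSerializer.
object VoidNsSerializer extends NamespaceSerializer {
  def serialize(ns: Namespace): String = ns match {
    case VoidNs => "void"
    case other  => throw new ClassCastException(s"$other cannot be cast to VoidNs")
  }
}

// Accepts only window namespaces.
object WindowNsSerializer extends NamespaceSerializer {
  def serialize(ns: Namespace): String = ns match {
    case WindowNs(start, end) => s"window[$start,$end)"
    case other                => throw new ClassCastException(s"$other cannot be cast to WindowNs")
  }
}

// One table per state name; its namespace serializer is fixed by the first registration.
final class ToyTable(val nsSerializer: NamespaceSerializer) {
  val entries = mutable.LinkedHashMap.empty[(String, Namespace), String]
}

final class ToyBackend {
  private val tables = mutable.LinkedHashMap.empty[String, ToyTable]

  // Assumed behavior: a later caller's serializer is silently ignored if the name already exists.
  def register(name: String, nsSerializer: NamespaceSerializer): ToyTable =
    tables.getOrElseUpdate(name, new ToyTable(nsSerializer))

  def put(name: String, nsSerializer: NamespaceSerializer,
          key: String, ns: Namespace, value: String): Unit = {
    val table = register(name, nsSerializer)
    table.entries((key, ns)) = value
  }

  // Serialize every entry with its table's registered namespace serializer.
  def snapshot(): List[String] =
    tables.toList.flatMap { case (name, table) =>
      table.entries.toList.map { case ((key, ns), value) =>
        s"$name/$key/${table.nsSerializer.serialize(ns)}=$value"
      }
    }
}

object NamespaceSketch {
  private val window = WindowNs(0L, 1200000L) // one 20-minute window, as in the quoted job

  private def trySnapshot(backend: ToyBackend): Either[String, List[String]] =
    try Right(backend.snapshot())
    catch { case e: ClassCastException => Left(e.getMessage) }

  // Order as in the quoted code: open() registers "bidState" key-only, then the trigger writes per window.
  def asQuoted(): Either[String, List[String]] = {
    val backend = new ToyBackend
    backend.register("bidState", VoidNsSerializer)
    backend.put("bidState", WindowNsSerializer, "key-1", window, "bid")
    trySnapshot(backend)
  }

  // Both sides window-scoped: the registered serializer matches the stored namespaces.
  def windowScopedOnly(): Either[String, List[String]] = {
    val backend = new ToyBackend
    backend.register("bidState", WindowNsSerializer)
    backend.put("bidState", WindowNsSerializer, "key-1", window, "bid")
    trySnapshot(backend)
  }
}
```

If the hypothesis holds, one option for the real job is to read the bid inside `process` through `ctx.windowState.getState(new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))`, which is window-scoped like the trigger's state, instead of caching a `getRuntimeContext.getState` handle in `open`. Treat this as a suggestion to verify against FLINK-18464, not a confirmed fix.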