Sorry, I can't reproduce it with reduce/aggregate/fold/apply, and due to some limitations in my working environment I can't use Flink 1.10 or 1.11.
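What I tried, roughly, was keeping the pipeline from the quoted report below unchanged and swapping the ProcessWindowFunction for a plain apply(). A simplified sketch, not our exact production code -- the pass-through body and the "placeholder-topic" string are placeholders:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Same source, trigger and evictor as in the original job; only the window
// function differs, to check whether ProcessWindowFunction itself is the cause.
input
  .assignTimestampsAndWatermarks(new UETimeAssigner)
  .keyBy(_.key)
  .window(TumblingEventTimeWindows.of(Time.minutes(20)))
  .trigger(new JoinTrigger)
  .evictor(new JoinEvictor)
  .apply { (key: String, window: TimeWindow, logs: Iterable[RawLog], out: Collector[OutputLog]) =>
    // placeholder join logic: forward every buffered payload unchanged
    logs.foreach(log => out.collect(new OutputLog(log.payload, "placeholder-topic")))
  }
  .addSink(kafkaSink)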
On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi
>
> First, could you please check whether this problem is still there if you
> use Flink 1.10 or 1.11?
>
> It seems strange. From the error message, there is an error when trying
> to convert a non-window state (VoidNamespace) to a window state (the
> serializer is the serializer of a window state, but the state is a
> non-window state). Could you please try to replace MyFunction with a
> reduce/aggregate/fold/apply() function to see what happens? -- this is to
> narrow down the problem.
>
> Best,
> Congxian
>
>
> On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your help.
>>
>> 1. I started the job from scratch, not from a savepoint or an
>> externalized checkpoint.
>> 2. No job graph change.
>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 4. My Flink version is 1.9.1.
>>
>> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman
>> <khachatryan.ro...@gmail.com> wrote:
>>
>>> I still wasn't able to reproduce the issue.
>>>
>>> Can you also clarify:
>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>> - If yes, was the job graph changed?
>>> - What StreamTimeCharacteristic is set, if any?
>>> - What exact version of Flink do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Hi, thanks for your help.
>>>>
>>>> The checkpoint configuration is:
>>>>
>>>> checkpoint.intervalMS=300000
>>>> checkpoint.timeoutMS=300000
>>>>
>>>> The error call stack is from the JM's log, and it happens on every
>>>> checkpoint. I don't have a successful checkpoint yet.
>>>>
>>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman
>>>> <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the details. However, I was not able to reproduce the
>>>>> issue. I used a parallelism of 4, the file system backend, and tried
>>>>> different timings for checkpointing, windowing and the source.
>>>>> Do you encounter this problem deterministically? Is it always the
>>>>> 1st checkpoint? What checkpointing interval do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Hi, this is our production code, so I had to modify it a little,
>>>>>> such as variable names and function names. I think the 3 classes I
>>>>>> provide here are enough.
>>>>>>
>>>>>> I try to join two streams, but I don't want to use the default join
>>>>>> function, because I want to send each joined log immediately and
>>>>>> remove it from the window state immediately. And my window gap time
>>>>>> is very long (20 minutes), so the window may be evaluated multiple
>>>>>> times.
>>>>>>
>>>>>> // Imports used by the three classes below (restored here for
>>>>>> // completeness). bidState and initState are String constants whose
>>>>>> // real names were removed from our production code.
>>>>>> import java.lang.{Iterable => JIterable}
>>>>>>
>>>>>> import org.apache.flink.api.common.accumulators.LongCounter
>>>>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>>>>> import org.apache.flink.configuration.Configuration
>>>>>> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
>>>>>> import org.apache.flink.streaming.api.windowing.evictors.Evictor
>>>>>> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>> import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
>>>>>> import org.apache.flink.util.Collector
>>>>>>
>>>>>> class JoinFunction
>>>>>>   extends ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>>>>
>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>   val invalidCounter = new LongCounter()
>>>>>>   val processCounter = new LongCounter()
>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>
>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))
>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>   }
>>>>>>
>>>>>>   override def process(key: String,
>>>>>>                        ctx: Context,
>>>>>>                        logs: Iterable[RawLog],
>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>     if (ueState.value() != null) {
>>>>>>       processCounter.add(1L)
>>>>>>       // Decompress the buffered bid side, then join every SHOW log with it.
>>>>>>       val bid = ueState.value()
>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>       logs.foreach { log =>
>>>>>>         if (log.eventType == SHOW) {
>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>         }
>>>>>>       }
>>>>>>     } else {
>>>>>>       invalidCounter.add(1L)
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>
>>>>>>   override def onElement(log: RawLog,
>>>>>>                          timestamp: Long,
>>>>>>                          window: TimeWindow,
>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>
>>>>>>     // Register the end-of-window timer once per window.
>>>>>>     if (!firstSeen.value()) {
>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>       firstSeen.update(true)
>>>>>>     }
>>>>>>     // Buffer the BID side; fire as soon as a matching non-BID log arrives.
>>>>>>     if (log.eventType == BID) {
>>>>>>       ueState.update(log)
>>>>>>       TriggerResult.CONTINUE
>>>>>>     } else if (ueState.value() == null) {
>>>>>>       TriggerResult.CONTINUE
>>>>>>     } else {
>>>>>>       TriggerResult.FIRE
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>                            window: TimeWindow,
>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     // Purge the window contents when the end-of-window timer fires.
>>>>>>     if (timestamp == window.getEnd) TriggerResult.PURGE
>>>>>>     else TriggerResult.CONTINUE
>>>>>>   }
>>>>>>
>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>                                 window: TimeWindow,
>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult =
>>>>>>     TriggerResult.CONTINUE
>>>>>>
>>>>>>   override def clear(window: TimeWindow,
>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))
>>>>>>     ueState.clear()
>>>>>>
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>     firstSeen.clear()
>>>>>>
>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>
>>>>>>   // Nothing to evict before the window function runs.
>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                            size: Int,
>>>>>>                            window: TimeWindow,
>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>
>>>>>>   // Remove every element after evaluation, so the window keeps no
>>>>>>   // elements between firings.
>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                           size: Int,
>>>>>>                           window: TimeWindow,
>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>     val iter = elements.iterator()
>>>>>>     while (iter.hasNext) {
>>>>>>       iter.next()
>>>>>>       iter.remove()
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman
>>>>>> <khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>>
>>>>>>> Can you also share the code of the other parts, particularly
>>>>>>> MyFunction?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman
>>>>>>>> <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for reporting this.
>>>>>>>>>
>>>>>>>>> It looks like the window namespace was replaced by VoidNamespace
>>>>>>>>> in the state entry. I've created
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-18464 to further
>>>>>>>>> investigate it.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm using Flink 1.9 on Mesos, and I try to use my own trigger
>>>>>>>>>> and evictor. The state is stored in memory.
>>>>>>>>>>
>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>   .keyBy(_.key)
>>>>>>>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>   .trigger(new MyTrigger)
>>>>>>>>>>   .evictor(new MyEvictor)
>>>>>>>>>>   .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>   .name("kafka-record-sink")
>>>>>>>>>>
>>>>>>>>>> And the exception stack is here. Could anyone help with this?
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>     ... 3 more
>>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>     ... 5 more
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Sili Liu

--
Best regards

Sili Liu