Hi Si-li,

Thanks for the notice. I just want to double-check: has the original problem been solved? I found that the issue created for it, FLINK-18464, has been closed with the reason "can not reproduce". Am I missing something here?

Best,
Congxian

On Fri, Jul 10, 2020 at 6:06 PM Si-li Liu <unix...@gmail.com> wrote:

> Sorry
>
> I can't reproduce it with reduce/aggregate/fold/apply, and due to some
> limitations in my working environment, I can't use Flink 1.10 or 1.11.
>
> On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi
>>
>> First, could you please check whether the problem is still there if you
>> use Flink 1.10 or 1.11?
>>
>> It seems strange: from the error message, there is an error when trying to
>> convert a non-window state (VoidNamespace) to a window state (the
>> serializer is the serializer of the window state, but the state is a
>> non-window state).
>> Could you please try to replace MyFunction with a
>> reduce/aggregate/fold/apply() function to see what happens? -- this is to
>> narrow down the problem.
>>
>> Best,
>> Congxian
>>
>>
>> On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks for your help
>>>
>>> 1. I started the job from scratch, not from a savepoint or externalized
>>> checkpoint
>>> 2. No job graph change
>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> 4. My Flink version is 1.9.1
>>>
>>> On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>
>>>> I still wasn't able to reproduce the issue.
>>>>
>>>> Can you also clarify:
>>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>>> - If yes, was the job graph changed?
>>>> - What StreamTimeCharacteristic is set, if any?
>>>> - What exact version of Flink do you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Hi, thanks for your help.
>>>>>
>>>>> The checkpoint configuration is
>>>>>
>>>>> checkpoint.intervalMS=300000
>>>>> checkpoint.timeoutMS=300000
>>>>>
>>>>> The error call stack is from the JM's log, and it happened in every cp.
>>>>> Currently I don't have a successful cp yet.
>>>>>
>>>>> On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the details.
>>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>>> level 4, the file system backend, and tried different timings for
>>>>>> checkpointing, windowing and source.
>>>>>> Do you encounter this problem deterministically? Is it always the 1st
>>>>>> checkpoint?
>>>>>> What checkpointing interval do you use?
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, this is our production code so I have to modify it a little bit,
>>>>>>> such as variable names and function names. I think the 3 classes I
>>>>>>> provide here are enough.
>>>>>>>
>>>>>>> I try to join two streams, but I don't want to use the default join
>>>>>>> function, because I want to send the joined log immediately and remove
>>>>>>> it from window state immediately. And my window gap time is very long
>>>>>>> (20 minutes), so it may be evaluated multiple times.
>>>>>>>
>>>>>>> class JoinFunction extends
>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>>
>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>   val processCounter = new LongCounter()
>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>
>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>   }
>>>>>>>
>>>>>>>   override def process(key: String,
>>>>>>>                        ctx: Context,
>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>     if (ueState.value() != null) {
>>>>>>>       processCounter.add(1L)
>>>>>>>       val bid = ueState.value()
>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>       logs.foreach( log => {
>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>         }
>>>>>>>       })
>>>>>>>     } else {
>>>>>>>       invalidCounter.add(1L)
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def onElement(log: RawLog,
>>>>>>>                          timestamp: Long,
>>>>>>>                          window: TimeWindow,
>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>
>>>>>>>     if (!firstSeen.value()) {
>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>       firstSeen.update(true)
>>>>>>>     }
>>>>>>>     val eventType = log.eventType
>>>>>>>     if (eventType == BID) {
>>>>>>>       ueState.update(log)
>>>>>>>       TriggerResult.CONTINUE
>>>>>>>     } else {
>>>>>>>       if (ueState.value() == null) {
>>>>>>>         TriggerResult.CONTINUE
>>>>>>>       } else {
>>>>>>>         TriggerResult.FIRE
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>       TriggerResult.PURGE
>>>>>>>     }
>>>>>>>     TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>                                 window: TimeWindow,
>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     ueState.clear()
>>>>>>>
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>     firstSeen.clear()
>>>>>>>
>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                            size: Int,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>
>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                           size: Int,
>>>>>>>                           window: TimeWindow,
>>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>     val iter = elements.iterator()
>>>>>>>     while (iter.hasNext) {
>>>>>>>       iter.next()
>>>>>>>       iter.remove()
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the clarification.
>>>>>>>>
>>>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> The RocksDB backend has the same problem
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>
>>>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>>>> the state entry.
>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>>> further investigate it.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm using Flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>>>>>> evictor. The state is stored in memory.
>>>>>>>>>>>
>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>   .keyBy(_.key)
>>>>>>>>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>   .trigger(new MyTrigger)
>>>>>>>>>>>   .evictor(new MyEvictor)
>>>>>>>>>>>   .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>   .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>   .name("kafka-record-sink")
>>>>>>>>>>>
>>>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>     ... 3 more
>>>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>>
>>>>>>>>>>> Sili Liu
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>