Hi, thanks for your help.

The checkpoint configuration is:

checkpoint.intervalMS=300000
checkpoint.timeoutMS=300000
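
For reference, a minimal sketch of what I believe is the equivalent
programmatic setup (assuming these two properties map onto Flink's
checkpoint interval and timeout, both 5 minutes):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(300000L)                      // checkpoint.intervalMS
env.getCheckpointConfig.setCheckpointTimeout(300000L) // checkpoint.timeoutMS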

The error call stack is from the JM's log, and it appears on every
checkpoint. So far I don't have a single successful checkpoint.

On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Thanks for the details.
> However, I was not able to reproduce the issue. I used a parallelism of
> 4 and the filesystem state backend, and tried different timings for
> checkpointing, windowing, and the source.
> Do you encounter this problem deterministically? Is it always the 1st
> checkpoint?
> What checkpointing interval do you use?
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Hi, this is our production code, so I had to modify it a little, such as
>> variable and function names. I think the 3 classes I provide here are
>> enough.
>>
>> I'm trying to join two streams, but I don't want to use the default join
>> function, because I want to emit the joined log immediately and remove it
>> from the window state right away. My window is quite long (20 minutes),
>> so the window may be evaluated multiple times. The three classes below,
>> plus the wiring sketch after them, show the approach.
>>
>> class JoinFunction extends
>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>
>>   var ueState: ValueState[RawLog] = _
>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>   val invalidCounter = new LongCounter()
>>   val processCounter = new LongCounter()
>>   val sendToKafkaCounter = new LongCounter()
>>
>>   override def open(parameters: Configuration): Unit = {
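>>     // NOTE: state obtained via getRuntimeContext lives in the global
>>     // (VoidNamespace) scope of this operator. JoinTrigger below registers
>>     // state under the same name (bidState) in the window namespace, which
>>     // may be related to the ClassCastException seen at checkpoint time.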
>>     ueState = getRuntimeContext.getState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", 
>> this.sendToKafkaCounter)
>>   }
>>
>>   override def process(key: String,
>>                        ctx: Context,
>>                        logs: Iterable[RawLog],
>>                        out: Collector[OutputLog]): Unit = {
>>     if (ueState.value() != null) {
>>       processCounter.add(1L)
>>       val bid = ueState.value()
>>       val bidLog =
>>         gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>       logs.foreach( log => {
>>         if (log.eventType == SHOW) {
>>           val showLog =
>>             gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>           sendToKafkaCounter.add(1L)
>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog),
>>             Utils.getOutputTopic(showLog)))
>>         }
>>       })
>>     } else {
>>       invalidCounter.add(1L)
>>     }
>>   }
>> }
>>
>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>
>>   override def onElement(log: RawLog,
>>                          timestamp: Long,
>>                          window: TimeWindow,
>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>
>>     if (!firstSeen.value()) {
>>       ctx.registerEventTimeTimer(window.getEnd)
>>       firstSeen.update(true)
>>     }
>>     val eventType = log.eventType
>>     if (eventType == BID) {
>>       ueState.update(log)
>>       TriggerResult.CONTINUE
>>     } else {
>>       if (ueState.value() == null) {
>>         TriggerResult.CONTINUE
>>       } else {
>>         TriggerResult.FIRE
>>       }
>>     }
>>   }
>>
>>   override def onEventTime(timestamp: Long,
>>                            window: TimeWindow,
>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>     // Purge the window contents when the end-of-window timer fires;
>>     // otherwise keep waiting.
>>     if (timestamp == window.getEnd) TriggerResult.PURGE
>>     else TriggerResult.CONTINUE
>>   }
>>
>>   override def onProcessingTime(timestamp: Long,
>>                                 window: TimeWindow,
>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def clear(window: TimeWindow,
>>                      ctx: Trigger.TriggerContext): Unit = {
>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     ueState.clear()
>>
>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>     firstSeen.clear()
>>
>>     ctx.deleteEventTimeTimer(window.getEnd)
>>   }
>> }
>>
>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>
>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>                            size: Int,
>>                            window: TimeWindow,
>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>
>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>                            size: Int,
>>                            window: TimeWindow,
>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>     val iter = elements.iterator()
>>     while (iter.hasNext) {
>>       iter.next()
>>       iter.remove()
>>     }
>>   }
>> }
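>>
>> For completeness, a sketch of how these three classes are wired into the
>> windowed stream (assumed wiring; kafkaSink and the 20-minute tumbling
>> window are from the pipeline in my first mail):
>>
>> input
>>   .keyBy(_.key)
>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>   .trigger(new JoinTrigger)
>>   .evictor(new JoinEvictor)
>>   .process(new JoinFunction)
>>   .addSink(kafkaSink)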
>>
>>
>> On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Thanks for the clarification.
>>>
>>> Can you also share the code of other parts, particularly MyFunction?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> The RocksDB backend has the same problem.
>>>>
>>>> On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Thanks for reporting this.
>>>>>
>>>>> Looks like the window namespace was replaced by VoidNamespace in the
>>>>> state entry.
>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>> further investigate it.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and
>>>>>> evictor. The state is stored in memory.
>>>>>>
>>>>>> input.setParallelism(processParallelism)
>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>         .keyBy(_.key)
>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>         .trigger(new MyTrigger)
>>>>>>         .evictor(new MyEvictor)
>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>         .name("kafka-record-sink")
>>>>>>
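>>>>>> By "stored to memory" I mean the heap state backend. A minimal sketch of
>>>>>> the assumed setup (the exact backend configuration is not shown here):
>>>>>>
>>>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend
>>>>>>
>>>>>> env.setStateBackend(new MemoryStateBackend())
>>>>>>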
>>>>>> And the exception stack is below; could anyone help with this? Thanks!
>>>>>>
>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>     ... 3 more
>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>     ... 5 more
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu
