Thanks for your help.

1. I started the job from scratch, not from a savepoint or an externalized
checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) (sketch below)
4. My Flink version is 1.9.1
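
For reference, a minimal sketch of the environment setup behind answer 3, on
Flink 1.9.1 (the variable name here is a placeholder, not our exact code):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

// Event-time processing, as used by the windowed join below.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)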

Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 4:49 PM:

> I still wasn't able to reproduce the issue.
>
> Can you also clarify:
> - Are you starting the job from a savepoint or externalized checkpoint?
> - If yes, was the job graph changed?
> - What StreamTimeCharacteristic is set, if any?
> - What exact version of Flink do you use?
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Hi, thanks for your help.
>>
>> The checkpoint configuration is
>>
>> checkpoint.intervalMS=300000
>> checkpoint.timeoutMS=300000
>>
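>> For reference, a rough sketch of how those two values map onto Flink's
>> checkpoint settings (the checkpoint.* keys are our own naming; the calls
>> below are the standard StreamExecutionEnvironment API):
>>
>> env.enableCheckpointing(300000)                       // checkpoint.intervalMS
>> env.getCheckpointConfig.setCheckpointTimeout(300000)  // checkpoint.timeoutMS
>>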
>> The error call stack is from the JM's log, and it happens on every checkpoint.
>> I don't have a successful checkpoint yet.
>>
>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 3:50 AM:
>>
>>> Hi,
>>>
>>> Thanks for the details.
>>> However, I was not able to reproduce the issue. I used parallelism
>>> level 4, the file system backend, and tried different timings for
>>> checkpointing, windowing, and the source.
>>> Do you encounter this problem deterministically? Is it always the 1st
>>> checkpoint?
>>> What checkpointing interval do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Hi, this is our production code, so I had to modify it a little bit
>>>> (variable and function names). I think the three classes I provide here
>>>> are enough.
>>>>
>>>> I'm trying to join two streams, but I don't want to use the default join
>>>> function, because I want to emit the joined log immediately and remove it
>>>> from the window state right away. My window is very long (20 minutes), so
>>>> it may be evaluated multiple times.
>>>>
>>>> class JoinFunction extends ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>>
>>>>   var ueState: ValueState[RawLog] = _
>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>   val invalidCounter = new LongCounter()
>>>>   val processCounter = new LongCounter()
>>>>   val sendToKafkaCounter = new LongCounter()
>>>>
>>>>   override def open(parameters: Configuration): Unit = {
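>>>>     // bidState (and initState in the trigger below) are state-name String constants defined elsewhere, not shown in this excerpt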
>>>>     ueState = getRuntimeContext.getState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>   }
>>>>
>>>>   override def process(key: String,
>>>>                        ctx: Context,
>>>>                        logs: Iterable[RawLog],
>>>>                        out: Collector[OutputLog]): Unit = {
>>>>     if (ueState.value() != null) {
>>>>       processCounter.add(1L)
>>>>       val bid = ueState.value()
>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>       logs.foreach( log => {
>>>>         if (log.eventType == SHOW) {
>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>           sendToKafkaCounter.add(1L)
>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>         }
>>>>       })
>>>>     } else {
>>>>       invalidCounter.add(1L)
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>
>>>>   override def onElement(log: RawLog,
>>>>                          timestamp: Long,
>>>>                          window: TimeWindow,
>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>
>>>>     if (!firstSeen.value()) {
>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>       firstSeen.update(true)
>>>>     }
>>>>     val eventType = log.eventType
>>>>     if (eventType == BID) {
>>>>       ueState.update(log)
>>>>       TriggerResult.CONTINUE
>>>>     } else {
>>>>       if (ueState.value() == null) {
>>>>         TriggerResult.CONTINUE
>>>>       } else {
>>>>         TriggerResult.FIRE
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>>   override def onEventTime(timestamp: Long,
>>>>                            window: TimeWindow,
>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     // Purge the window state once the end-of-window timer fires.
>>>>     if (timestamp == window.getEnd) TriggerResult.PURGE else TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def onProcessingTime(timestamp: Long,
>>>>                                 window: TimeWindow,
>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def clear(window: TimeWindow,
>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     ueState.clear()
>>>>
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>     firstSeen.clear()
>>>>
>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>   }
>>>> }
>>>>
>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>
>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>                            size: Int,
>>>>                            window: TimeWindow,
>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>
>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>                            size: Int,
>>>>                            window: TimeWindow,
>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>     // Remove all elements once the window has fired, so joined logs are not kept in state.
>>>>     val iter = elements.iterator()
>>>>     while (iter.hasNext) {
>>>>       iter.next()
>>>>       iter.remove()
>>>>     }
>>>>   }
>>>> }
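>>>>
>>>> For completeness, these three classes are attached to the windowed stream
>>>> roughly like this (the stream, assigner and sink names below are placeholders
>>>> mirroring the snippet from my first mail, not our exact code):
>>>>
>>>> input
>>>>   .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>   .keyBy(_.key)
>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>   .trigger(new JoinTrigger)
>>>>   .evictor(new JoinEvictor)
>>>>   .process(new JoinFunction)
>>>>   .addSink(kafkaSink)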
>>>>
>>>>
>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 7:18 PM:
>>>>
>>>>> Thanks for the clarification.
>>>>>
>>>>> Can you also share the code of the other parts, particularly MyFunction?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> The RocksDB backend has the same problem.
>>>>>>
>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 6:11 PM:
>>>>>>
>>>>>>> Thanks for reporting this.
>>>>>>>
>>>>>>> It looks like the window namespace was replaced by VoidNamespace in the
>>>>>>> state entry.
>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>> further investigate it.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and
>>>>>>>> evictor. The state is stored in memory.
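>>>>>>>>
>>>>>>>> By "in memory" I mean the heap-based backend; a rough sketch of that part
>>>>>>>> of the setup (the exact call in our job may differ):
>>>>>>>>
>>>>>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend
>>>>>>>> env.setStateBackend(new MemoryStateBackend())  // heap state, checkpoint data held in memory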
>>>>>>>>
>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>         .keyBy(_.key)
>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>
>>>>>>>> The exception stack is below; could anyone help with this? Thanks!
>>>>>>>>
>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>     ... 3 more
>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>     ... 5 more
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu
