Sorry.

I can't reproduce the problem with reduce/aggregate/fold/apply, and due to
some limitations in my working environment, I can't use Flink 1.10 or 1.11.
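
For reference, the aggregate-based variant I tried looked roughly like this
(a minimal sketch: CountAggregate is just a stand-in that counts the logs in
each window instead of joining them, used only to exercise the aggregate()
path; RawLog is the type from the snippets below):

import org.apache.flink.api.common.functions.AggregateFunction

// Stand-in aggregate for the test: counts the logs per window.
class CountAggregate extends AggregateFunction[RawLog, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(log: RawLog, acc: Long): Long = acc + 1L
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Used in place of .process(new MyFunction) in the pipeline below:
// .aggregate(new CountAggregate).setParallelism(aggregateParallelism)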

Congxian Qiu <qcx978132...@gmail.com> wrote on Sun, Jul 5, 2020 at 6:21 PM:

> Hi
>
> First, could you please check whether this problem is still there when you
> use Flink 1.10 or 1.11?
>
> It seems strange. From the error message, there is an error when trying to
> convert a non-window state (VoidNamespace) to a window state (the
> serializer is the serializer of the window state, but the state itself is
> a non-window state).
> Could you please also try to replace MyFunction with a
> reduce/aggregate/fold/apply() function to see what happens? This would
> help narrow down the problem.
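>
> For illustration (just a sketch of where the two namespaces come from, not
> a confirmed diagnosis): state obtained inside a Trigger is scoped to the
> window, while state obtained from the runtime context of a rich function
> is scoped to VoidNamespace. Using the same descriptor name in both places,
> as in your snippets, could mix the two up:
>
> // Inside JoinTrigger: partitioned state is scoped to the current
> // window, i.e. the namespace is the TimeWindow.
> val windowScoped: ValueState[RawLog] = ctx.getPartitionedState(
>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))
>
> // Inside JoinFunction.open(): keyed state is scoped to VoidNamespace
> // (no window).
> val keyScoped: ValueState[RawLog] = getRuntimeContext.getState(
>   new ValueStateDescriptor[RawLog](bidState, classOf[RawLog]))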
>
> Best,
> Congxian
>
>
> Si-li Liu <unix...@gmail.com> wrote on Fri, Jul 3, 2020 at 6:44 PM:
>
>> Thanks for your help
>>
>> 1. I started the job from scratch, not a savepoint or externalized
>> checkpoint
>> 2. No job graph change
>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 4. My Flink version is 1.9.1
>>
>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 4:49 PM:
>>
>>> I still wasn't able to reproduce the issue.
>>>
>>> Can you also clarify:
>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>> - If yes, was the job graph changed?
>>> - What StreamTimeCharacteristic is set, if any?
>>> - What exact version of Flink do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>> Hi, thanks for your help.
>>>>
>>>> The checkpoint configuration is
>>>>
>>>> checkpoint.intervalMS=300000
>>>> checkpoint.timeoutMS=300000
>>>>
>>>> The error call stack is from the JM's log, and it happens on every
>>>> checkpoint. I don't have a successful checkpoint yet.
>>>>
>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 3:50 AM:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the details.
>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>> level 4, the file system state backend, and tried different timings for
>>>>> checkpointing, windowing and the source.
>>>>> Do you encounter this problem deterministically? Is it always the 1st
>>>>> checkpoint?
>>>>> What checkpointing interval do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Hi, this is our production code, so I had to modify it a little bit
>>>>>> (variable and function names). I think the 3 classes I provide here
>>>>>> are enough.
>>>>>>
>>>>>> I try to join two streams, but I don't want to use the default join
>>>>>> function, because I want to send the joined log immediately and remove
>>>>>> it from the window state right away. My window gap time is very long
>>>>>> (20 minutes), so the window may be evaluated multiple times.
>>>>>>
>>>>>> class JoinFunction extends
>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {
>>>>>>
>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>   val invalidCounter = new LongCounter()
>>>>>>   val processCounter = new LongCounter()
>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>
>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>   }
>>>>>>
>>>>>>   override def process(key: String,
>>>>>>                        ctx: Context,
>>>>>>                        logs: Iterable[RawLog],
>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>     if (ueState.value() != null) {
>>>>>>       processCounter.add(1L)
>>>>>>       val bid = ueState.value()
>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>       logs.foreach( log => {
>>>>>>         if (log.eventType == SHOW) {
>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>         }
>>>>>>       })
>>>>>>     } else {
>>>>>>       invalidCounter.add(1L)
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>
>>>>>>   override def onElement(log: RawLog,
>>>>>>                          timestamp: Long,
>>>>>>                          window: TimeWindow,
>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>
>>>>>>     if (!firstSeen.value()) {
>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>       firstSeen.update(true)
>>>>>>     }
>>>>>>     val eventType = log.eventType
>>>>>>     if (eventType == BID) {
>>>>>>       ueState.update(log)
>>>>>>       TriggerResult.CONTINUE
>>>>>>     } else {
>>>>>>       if (ueState.value() == null) {
>>>>>>         TriggerResult.CONTINUE
>>>>>>       } else {
>>>>>>         TriggerResult.FIRE
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>                            window: TimeWindow,
>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     // Purge the window contents when the end-of-window timer fires.
>>>>>>     if (timestamp == window.getEnd) {
>>>>>>       TriggerResult.PURGE
>>>>>>     } else {
>>>>>>       TriggerResult.CONTINUE
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>                                 window: TimeWindow,
>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     TriggerResult.CONTINUE
>>>>>>   }
>>>>>>
>>>>>>   override def clear(window: TimeWindow,
>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     ueState.clear()
>>>>>>
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>     firstSeen.clear()
>>>>>>
>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>
>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                            size: Int,
>>>>>>                            window: TimeWindow,
>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>
>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                           size: Int,
>>>>>>                           window: TimeWindow,
>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>     // Remove all elements from the window state after the window
>>>>>>     // function has fired.
>>>>>>     val iter = elements.iterator()
>>>>>>     while (iter.hasNext) {
>>>>>>       iter.next()
>>>>>>       iter.remove()
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 7:18 PM:
>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>>
>>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>
>>>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 6:11 PM:
>>>>>>>>
>>>>>>>>> Thanks for reporting this.
>>>>>>>>>
>>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>>> the state entry.
>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>> further investigate it.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'm using Flink 1.9 on Mesos with my own trigger and evictor. The
>>>>>>>>>> state is stored in memory.
>>>>>>>>>>
>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>
>>>>>>>>>> And the exception stack is below. Could anyone help with this?
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>>>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>>>>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>>>> java.lang.ClassCastException:
>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>>>>>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>     ... 3 more
>>>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>>>>>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>     ... 5 more
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Sili Liu
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu
