Hi Si-li

Thanks for the update.
I just want to double-check: has the original problem been solved? I noticed
that the issue FLINK-18464 has been closed with resolution "Cannot
Reproduce". Am I missing something here?

Best,
Congxian


Si-li Liu <unix...@gmail.com> wrote on Fri, Jul 10, 2020 at 6:06 PM:

> Sorry
>
> I can't reproduce it with reduce/aggregate/fold/apply, and due to some
> limitations in my working environment I can't use Flink 1.10 or 1.11.
>
> Congxian Qiu <qcx978132...@gmail.com> wrote on Sun, Jul 5, 2020 at 6:21 PM:
>
>> Hi
>>
>> First, could you please check whether the problem is still there on Flink
>> 1.10 or 1.11?
>>
>> It seems strange. From the error message, the snapshot fails while
>> serializing a state entry whose namespace does not match the registered
>> serializer: the serializer is for non-window state (VoidNamespace), but
>> the entry carries a window namespace (TimeWindow).
>> Could you please try replacing MyFunction with a reduce/aggregate/fold/apply()
>> function to see what happens? This would help narrow down the problem;
>> see the sketch below.
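>>
>> A minimal sketch of what I mean (assuming the same RawLog stream and keyBy
>> from your earlier snippet; the merge function just keeps the first log and
>> is only a placeholder):
>>
>> // Swap the custom ProcessWindowFunction for a built-in reduce() to see
>> // whether checkpointing still fails with the same trigger and evictor.
>> input.keyBy(_.key)
>>   .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>   .trigger(new MyTrigger)
>>   .evictor(new MyEvictor)
>>   .reduce((a: RawLog, _: RawLog) => a) // placeholder: keep the first log
>>   .print() // stand-in sink, just for this experiment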
>>
>> Best,
>> Congxian
>>
>>
> Si-li Liu <unix...@gmail.com> wrote on Fri, Jul 3, 2020 at 6:44 PM:
>>
>>> Thanks for your help.
>>>
>>> 1. I started the job from scratch, not a savepoint or externalized
>>> checkpoint
>>> 2. No job graph change
>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> 4. My Flink version is 1.9.1
>>>
>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 4:49 PM:
>>>
>>>> I still wasn't able to reproduce the issue.
>>>>
>>>> Can you also clarify:
>>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>>> - If yes, was the job graph changed?
>>>> - What StreamTimeCharacteristic is set, if any?
>>>> - What exact version of Flink do you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Hi, thanks for your help.
>>>>>
>>>>> The checkpoint configuration is:
>>>>>
>>>>> checkpoint.intervalMS=300000
>>>>> checkpoint.timeoutMS=300000
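>>>>>
>>>>> (These are our own config keys, not Flink's; a sketch of how they map
>>>>> onto the environment:)
>>>>>
>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> // checkpoint.intervalMS: trigger a checkpoint every 5 minutes
>>>>> env.enableCheckpointing(300000L)
>>>>> // checkpoint.timeoutMS: abort any checkpoint that takes longer than 5 minutes
>>>>> env.getCheckpointConfig.setCheckpointTimeout(300000L)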
>>>>>
>>>>> The error call stack is from the JM's log, and it happens on every
>>>>> checkpoint. I don't have a successful checkpoint yet.
>>>>>
>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Fri, Jul 3, 2020 at 3:50 AM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the details.
>>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>>> level 4, the file system backend, and tried different timings for
>>>>>> checkpointing, windowing, and the source.
>>>>>> Do you encounter this problem deterministically? Is it always the 1st
>>>>>> checkpoint?
>>>>>> What checkpointing interval do you use?
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, this is our production code, so I had to modify it a little
>>>>>>> (variable and function names). I think the 3 classes I provide here
>>>>>>> are enough.
>>>>>>>
>>>>>>> I'm trying to join two streams, but I don't want to use the default join
>>>>>>> function, because I want to emit the joined log immediately and remove it
>>>>>>> from the window state right away. My window is very long (20
>>>>>>> minutes), so the window may be evaluated multiple times.
>>>>>>>
>>>>>>> class JoinFunction extends
>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>>
>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>   val processCounter = new LongCounter()
>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>
>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>   }
>>>>>>>
>>>>>>>   override def process(key: String,
>>>>>>>                        ctx: Context,
>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>     if (ueState.value() != null) {
>>>>>>>       processCounter.add(1L)
>>>>>>>       val bid = ueState.value()
>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>       logs.foreach( log => {
>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>         }
>>>>>>>       })
>>>>>>>     } else {
>>>>>>>       invalidCounter.add(1L)
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def onElement(log: RawLog,
>>>>>>>                          timestamp: Long,
>>>>>>>                          window: TimeWindow,
>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>
>>>>>>>     if (!firstSeen.value()) {
>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>       firstSeen.update(true)
>>>>>>>     }
>>>>>>>     val eventType = log.eventType
>>>>>>>     if (eventType == BID) {
>>>>>>>       ueState.update(log)
>>>>>>>       TriggerResult.CONTINUE
>>>>>>>     } else {
>>>>>>>       if (ueState.value() == null) {
>>>>>>>         TriggerResult.CONTINUE
>>>>>>>       } else {
>>>>>>>         TriggerResult.FIRE
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     // purge the window once the end-of-window timer fires
>>>>>>>     if (timestamp == window.getEnd) TriggerResult.PURGE
>>>>>>>     else TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>                                 window: TimeWindow,
>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     ueState.clear()
>>>>>>>
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>     firstSeen.clear()
>>>>>>>
>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                            size: Int,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>
>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                           size: Int,
>>>>>>>                           window: TimeWindow,
>>>>>>>                           evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>     // drop every element once the window function has run, so the
>>>>>>>     // window contents don't linger in state for the full 20 minutes
>>>>>>>     val iter = elements.iterator()
>>>>>>>     while (iter.hasNext) {
>>>>>>>       iter.next()
>>>>>>>       iter.remove()
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 7:18 PM:
>>>>>>>
>>>>>>>> Thanks for the clarification.
>>>>>>>>
>>>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <unix...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>>
>>>>>>>>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Thu, Jul 2, 2020 at 6:11 PM:
>>>>>>>>>
>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>
>>>>>>>>>> Looks like the window namespace and VoidNamespace got mixed up in a
>>>>>>>>>> state entry: the snapshot serializes a TimeWindow namespace with the
>>>>>>>>>> VoidNamespace serializer.
>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>>> further investigate it.
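>>>>>>>>>>
>>>>>>>>>> For background, a sketch only (the state name "myState" is made up):
>>>>>>>>>> keyed state inside a window operator can live in two different
>>>>>>>>>> namespaces, and the serializer used at snapshot time must match the
>>>>>>>>>> namespace of each entry.
>>>>>>>>>>
>>>>>>>>>> // Inside a rich window function's open(): per-key "global" state,
>>>>>>>>>> // whose entries are written with the VoidNamespace serializer.
>>>>>>>>>> val globalState = getRuntimeContext.getState(
>>>>>>>>>>   new ValueStateDescriptor[String]("myState", classOf[String]))
>>>>>>>>>>
>>>>>>>>>> // Inside Trigger.onElement(): per-key, per-window state, whose
>>>>>>>>>> // entries are written with the window (TimeWindow) namespace serializer.
>>>>>>>>>> val windowState = ctx.getPartitionedState(
>>>>>>>>>>   new ValueStateDescriptor[String]("myState", classOf[String]))
>>>>>>>>>>
>>>>>>>>>> // If an entry with a TimeWindow namespace ends up in a state table whose
>>>>>>>>>> // registered namespace serializer is VoidNamespaceSerializer, the snapshot
>>>>>>>>>> // fails with exactly this ClassCastException.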
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger
>>>>>>>>>>> and evictor. The state is stored in memory.
>>>>>>>>>>>
>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>>
>>>>>>>>>>> And the exception stack is below. Could anyone help with this?
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>     ... 3 more
>>>>>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>>
>>>>>>>>>>> Sili Liu
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>
