So, just an update.

When I used this code (My stateful watermark) on the original application
it seems that I can recover the latest watermark and further process the
join with stuck events on it.
I don't even have to create MyCoProcessFunction to implement a low-level
join. The available .coGroup(MyCoGroupFunction) works as a charm.

Thank you again for the clarifications!
Felipe

On Mon, Jun 21, 2021 at 5:18 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hello Piotr,
>
> Could you please help me to ensure that I am implementing it in the
> correct way?
>
> I created the WatermarkFunction [1] based on the FilterFunction from Flink
> and the WatermarkStreamOperator [2] and I am doing unit test [3]. Then
> there are things that I am not sure how to do.
>
> How to make the ListState singleton on all parallel operators?
>
> When my job restarts I don't even have to call "processWatermark(new
> Watermark(maxWatermark));" on the end of the "initializeState()". I can see
> that the job process the previous watermarks before it fails. Is it because
> the source is one that I created at the end of the unit test "MySource"? Or
> is it because I don't have a join on the stream pipeline? I have the output
> of my unit test below at this message in case you are not able to runt the
> test.
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113
>
> $ cd explore-flink/docker/ops-playground-image/java/explore-flink/
> $ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark
> test
>
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> initializeState... 0
> initializeState... 0
> initializeState... 0
> initializeState... 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 0
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 0
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 0
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 0
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> This exception will trigger until the reference time [2021-06-21
> 16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE
> THE JOB IS RESTARTING
> initializeState... 1
> initializeState... 1
> initializeState... 1
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 2
> initializeState... 1
> maxWatermark: 2 // HERE IS THE LATEST WATERMARK
> processing watermark: 2 // I PROCESS IT HERE
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 1
> watermarkList recovered: 2
> watermarkList recovered: 2
> watermarkList recovered: 2
> maxWatermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 0 // IS IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 1
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 1
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 1
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> This is a poison but we do NOT throw an exception because the reference
> time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
> Attempts restart: 1
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec
>
>
> On Fri, Jun 18, 2021 at 2:46 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> I'm glad I could help, I hope it will solve your problem :)
>>
>> Best,
>> Piotrek
>>
>> pt., 18 cze 2021 o 14:38 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>> napisał(a):
>>
>>> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Keep in mind that this is a quite low level approach to this problem.
>>>> It would be much better to make sure that after recovery watermarks are
>>>> still being emitted.
>>>>
>>>
>>> yes. Indeed it looks like a very low level. I did a small test to emit
>>> one watermark for the stream that was recovered and then it can process
>>> the join. It has the same behavior on using a CoGroupFunction nad a
>>> CoProcessFunction. So in the end I don't need to implement
>>> MyCoProcessFunction with checkpoint. I just need to emit a new watermark
>>> after the job recovers.
>>>
>>> In my case, I am using Kafka source. so, if I make Kafka
>>> keeping emitting watermarks I solve the problem. Otherwise, I have to
>>> implement this custom operator.
>>>
>>> Thanks for your answer!
>>> Felipe
>>>
>>>
>>>>
>>>> If you are using a built-in source, it's probably easier to do it in a
>>>> custom operator. I would try to implement a custom one based on
>>>> AbstractStreamOperator. Your class would also need to implement the
>>>> OneInputStreamOperator interface. `processElement` you could implement as
>>>> an identity function (just pass down the stream element unchanged). In
>>>> `processWatermark` you would need to store the latest watermark on the
>>>> `ListState<Long>` field (you can declare it inside
>>>> `AbstractStreamOperator#initializeState` via `context.getListState(new
>>>> ListStateDescriptor<>("your-field-name", Long.class));`). During normal
>>>> processing (`processWatermark`) make sure it's a singleton list. During
>>>> recovery (`AbstractStreamOpeartor#initializeState()`) without rescaling,
>>>> you would just access this state field and re-emit the only element on that
>>>> list. However during recovery, depending if you are scaling up (a) or down
>>>> (b), you could have a case where you sometimes have either (a) empty list
>>>> (in that case you can not emit anything), or (b) many elements on the list
>>>> (in that case you would need to calculate a minimum of all elements).
>>>>
>>>> As operator API is not a very oficial one, it's not well documented.
>>>> For an example you would need to take a look in the Flink code itself by
>>>> finding existing implementations of the `AbstractStreamOperator` or
>>>> `OneInputStreamOperator`.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> pt., 18 cze 2021 o 12:49 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>>> napisał(a):
>>>>
>>>>> Hello Piotrek,
>>>>>
>>>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As far as I can tell timers should be checkpointed and recovered.
>>>>>> What may be happening is that the state of the last seen watermarks by
>>>>>> operators on different inputs and different channels inside an input is 
>>>>>> not
>>>>>> persisted. Flink is assuming that after the restart, watermark assigners
>>>>>> will emit newer watermarks after the recovery. However if one of your
>>>>>> inputs is dormant and it has already emitted some very high watermark 
>>>>>> long
>>>>>> time before the failure, after recovery if no new watermark is emitted,
>>>>>> this input/input channel might be preventing timers from firing. Can you
>>>>>> check if that's what's happening in your case?
>>>>>>
>>>>>
>>>>> I think you are correct. at least when I reproduce the bug it is like
>>>>> you said.
>>>>>
>>>>>
>>>>>> If so you would have to make sure one way or another that some
>>>>>> watermarks will be emitted after recovery. As a last resort, you could
>>>>>> manually store the watermarks in the operators/sources state and re-emit
>>>>>> last seen watermark during recovery.
>>>>>>
>>>>>
>>>>> Could you please point how I can checkpoint the watermarks on a source
>>>>> operator? Is it done by this code below from here (
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>> )?
>>>>>
>>>>> FlinkKafkaConsumer<MyType> kafkaSource = new
>>>>> FlinkKafkaConsumer<>("myTopic", schema, props);
>>>>> kafkaSource.assignTimestampsAndWatermarks(
>>>>>         WatermarkStrategy.
>>>>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>>>
>>>>> Thanks,
>>>>> Felipe
>>>>>
>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Piotrek
>>>>>>
>>>>>> czw., 17 cze 2021 o 13:46 Felipe Gutierrez <
>>>>>> felipe.o.gutier...@gmail.com> napisał(a):
>>>>>>
>>>>>>> Hi community,
>>>>>>>
>>>>>>> I have implemented a join function using CoProcessFunction with
>>>>>>> CheckpointedFunction to recover from failures. I added some debug lines 
>>>>>>> to
>>>>>>> check if it is restoring and it does. Before the crash, I process events
>>>>>>> that fall at processElement2. I create snapshots at snapshotState(), the
>>>>>>> application comes back and restores the events. That is fine.
>>>>>>>
>>>>>>> After the restore, I process events that fall on processElement1. I
>>>>>>> register event timers for them as I did on processElement2 before the
>>>>>>> crash. But the onTimer() is never called. The point is that I don't have
>>>>>>> any events to send to processElement2() to make the CoProcessFunction
>>>>>>> register a time for them. They were sent before the crash.
>>>>>>>
>>>>>>> I suppose that the onTimer() is called only when there are
>>>>>>> "timerService.registerEventTimeTimer(endOfWindow);" for processElement1 
>>>>>>> and
>>>>>>> processElement2. Because when I test the same application without 
>>>>>>> crashing
>>>>>>> and the CoProcessFunction triggers the onTimer() method.
>>>>>>>
>>>>>>> But if I have a crash in the middle the CoProcessFunction does not
>>>>>>> call onTimer(). Why is that? Is that normal? What do I have to do to 
>>>>>>> make
>>>>>>> the CoProcessFunction trigger the onTime() method even if only one 
>>>>>>> stream
>>>>>>> is processed let's say at the processElement2() method and the other 
>>>>>>> stream
>>>>>>> is restored from a snapshot? I imagine that I have to register a time
>>>>>>> during the recovery (initializeState()). But how?
>>>>>>>
>>>>>>> thanks,
>>>>>>> Felipe
>>>>>>>
>>>>>>

Reply via email to