So, just an update. When I used this code (my stateful watermark operator) on the original application, it seems that I can recover the latest watermark and continue processing the join with the events that were stuck on it. I don't even have to create MyCoProcessFunction to implement a low-level join. The built-in .coGroup(MyCoGroupFunction) works like a charm.
Thank you again for the clarifications!
Felipe

On Mon, Jun 21, 2021 at 5:18 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> Hello Piotr,
>
> Could you please help me make sure that I am implementing it correctly?
>
> I created the WatermarkFunction [1], based on Flink's FilterFunction, and the WatermarkStreamOperator [2], and I am testing them in a unit test [3]. There are still some things I am not sure how to do.
>
> How do I make the ListState a singleton on all parallel operators?
>
> When my job restarts, I don't even have to call "processWatermark(new Watermark(maxWatermark));" at the end of "initializeState()". I can see that the job processes again the watermarks it had processed before it failed. Is it because the source is one that I created at the end of the unit test ("MySource")? Or is it because I don't have a join in the stream pipeline? I have included the output of my unit test below in case you are not able to run the test.
>
> [1] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
> [2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
> [3] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113
>
> $ cd explore-flink/docker/ops-playground-image/java/explore-flink/
> $ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark test
>
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> initializeState... 0
> initializeState... 0
> initializeState... 0
> initializeState... 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> maxWatermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 0
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 0
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 0
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 0
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> This exception will trigger until the reference time [2021-06-21 16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE THE JOB IS RESTARTING
> initializeState... 1
> initializeState... 1
> initializeState... 1
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> WatermarkStreamOperator.initializeState
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 2
> initializeState... 1
> maxWatermark: 2 // HERE IS THE LATEST WATERMARK
> processing watermark: 2 // I PROCESS IT HERE
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 0
> watermarkList recovered: 0
> watermarkList recovered: 1
> watermarkList recovered: 1
> watermarkList recovered: 2
> watermarkList recovered: 2
> watermarkList recovered: 2
> maxWatermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 2
> maxWatermark: 2
> processing watermark: 2
> processing watermark: 0 // IT IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
> processing watermark: 0
> processing watermark: 0
> processing watermark: 0
> Attempts restart: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> processing watermark: 1
> Attempts restart: 1
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> processing watermark: 2
> Attempts restart: 1
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> processing watermark: 3
> Attempts restart: 1
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> processing watermark: 9223372036854775807
> This is a poison but we do NOT throw an exception because the reference time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
> Attempts restart: 1
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec
>
>
> On Fri, Jun 18, 2021 at 2:46 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> I'm glad I could help, I hope it will solve your problem :)
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jun 18, 2021 at 2:38 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>
>>> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Keep in mind that this is quite a low-level approach to this problem.
>>>> It would be much better to make sure that watermarks are still being emitted after recovery.
>>>>
>>>
>>> Yes. Indeed it looks very low-level. I did a small test that emits one watermark for the stream that was recovered, and then the join can be processed. It behaves the same with a CoGroupFunction and with a CoProcessFunction. So in the end I don't need to implement MyCoProcessFunction with checkpointing. I just need to emit a new watermark after the job recovers.
>>>
>>> In my case, I am using a Kafka source. So, if I make Kafka keep emitting watermarks, I solve the problem. Otherwise, I have to implement this custom operator.
>>>
>>> Thanks for your answer!
>>> Felipe
>>>
>>>
>>>>
>>>> If you are using a built-in source, it's probably easier to do it in a custom operator. I would try to implement a custom one based on AbstractStreamOperator. Your class would also need to implement the OneInputStreamOperator interface. `processElement` you could implement as an identity function (just pass the stream element down unchanged). In `processWatermark` you would need to store the latest watermark in a `ListState<Long>` field (you can declare it inside `AbstractStreamOperator#initializeState` via `context.getOperatorStateStore().getListState(new ListStateDescriptor<>("your-field-name", Long.class));`). During normal processing (`processWatermark`) make sure it's a singleton list. During recovery (`AbstractStreamOperator#initializeState()`) without rescaling, you would just access this state field and re-emit the only element on that list. However, during recovery with rescaling, depending on whether you are scaling up (a) or down (b), you could end up with either (a) an empty list (in that case you cannot emit anything), or (b) many elements on the list (in that case you would need to calculate the minimum of all elements).
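The restore rule described above can be sketched in plain Java, independent of Flink. The three cases are: re-emit the single stored value, emit nothing for an empty list, or take the minimum of several values. This is a minimal model under stated assumptions, not Flink code. The class `WatermarkRecovery` and its methods are hypothetical. In a real operator, the list would come from the `ListState<Long>` restored in `initializeState()`, and the result would be emitted as a `Watermark`.

```java
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical helper modelling the watermark restore rule; it uses no Flink API. */
public final class WatermarkRecovery {

    private WatermarkRecovery() {}

    /** What one parallel subtask would write to its ListState<Long> on checkpoint: a singleton list. */
    static List<Long> snapshot(long latestWatermark) {
        return Collections.singletonList(latestWatermark);
    }

    /**
     * What a subtask would re-emit after restore, given the elements it received.
     * No rescaling: exactly one element, re-emitted as-is.
     * Scale-up (a): possibly an empty list, so there is nothing to emit.
     * Scale-down (b): several elements, so the minimum is emitted.
     */
    static OptionalLong watermarkToEmit(List<Long> restored) {
        return restored.stream().mapToLong(Long::longValue).min();
    }

    public static void main(String[] args) {
        System.out.println(watermarkToEmit(snapshot(2L)));         // same parallelism
        System.out.println(watermarkToEmit(List.of()));            // scaled up, empty list
        System.out.println(watermarkToEmit(List.of(3L, 1L, 2L)));  // scaled down, several values
    }
}
```

Taking the minimum is the conservative choice. A watermark promises that no earlier events will follow, so a subtask that took over the state of several former subtasks can only promise what the slowest of them promised.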
>>>>
>>>> As the operator API is not a very official one, it's not well documented. For examples, you would need to look at the Flink code itself and find existing implementations of `AbstractStreamOperator` or `OneInputStreamOperator`.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Fri, Jun 18, 2021 at 12:49 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hello Piotrek,
>>>>>
>>>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the state of the last watermarks seen by operators on different inputs, and on different channels inside an input, is not persisted. Flink assumes that after the restart, watermark assigners will emit newer watermarks. However, if one of your inputs is dormant and had already emitted some very high watermark long before the failure, then after recovery, if no new watermark is emitted, this input/input channel might be preventing timers from firing. Can you check if that's what's happening in your case?
>>>>>>
>>>>>
>>>>> I think you are correct. At least when I reproduce the bug, it is as you said.
>>>>>
>>>>>
>>>>>> If so, you would have to make sure one way or another that some watermarks will be emitted after recovery. As a last resort, you could manually store the watermarks in the operators'/sources' state and re-emit the last seen watermark during recovery.
>>>>>>
>>>>>
>>>>> Could you please point out how I can checkpoint the watermarks on a source operator? Is it done by the code below, from here (https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector)?
>>>>>
>>>>> FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
>>>>> kafkaSource.assignTimestampsAndWatermarks(
>>>>>         WatermarkStrategy
>>>>>                 .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>>>
>>>>> Thanks,
>>>>> Felipe
>>>>>
>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Piotrek
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 1:46 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi community,
>>>>>>>
>>>>>>> I have implemented a join function using a CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it is restoring, and it does. Before the crash, I process events that fall into processElement2. I create snapshots in snapshotState(), then the application comes back and restores the events. That is fine.
>>>>>>>
>>>>>>> After the restore, I process events that fall into processElement1. I register event-time timers for them, as I did in processElement2 before the crash. But onTimer() is never called. The point is that I don't have any events to send to processElement2() to make the CoProcessFunction register a timer for them. They were sent before the crash.
>>>>>>>
>>>>>>> I suppose that onTimer() is called only when "timerService.registerEventTimeTimer(endOfWindow);" has been called from both processElement1 and processElement2, because when I test the same application without a crash, the CoProcessFunction triggers the onTimer() method.
>>>>>>>
>>>>>>> But if I have a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTimer() method even if only one stream is processed (let's say in the processElement2() method) and the other stream is restored from a snapshot? I imagine that I have to register a timer during recovery (initializeState()). But how?
>>>>>>>
>>>>>>> thanks,
>>>>>>> Felipe
>>>>>>>
>>>>>>