I don't understand how I can save the state of a window in a
RichCoGroupFunction if events are passed to
RichCoGroupFunction.coGroup() only when the window closes. If the job fails
before that, I will not recover the events that were already in the window.
This is why I think the right approach to this problem is a CoProcessFunction,
where I can update the state as events arrive
at CoProcessFunction.processElement1() and CoProcessFunction.processElement2().


*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Wed, Jun 16, 2021 at 4:28 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Robert,
>
> 1 - I am using Kafka010 as the data source.
> 2 - No, I am not using any kind of ListState. I think that is what I must use.
> 3 - Good. I am going to use CheckpointedFunction.
>
> Just a follow-up question. I was reimplementing it using a CoProcessFunction
> to save the state and trigger the window myself. Based on your answer, I think
> I am overcomplicating it. If I just use a RichCoGroupFunction, save the
> state in a ListState, and implement CheckpointedFunction, will that do
> everything I need? Then I would not have to trigger the event-time window
> in onTimer() myself and could just use Flink's regular window. Is that correct?
>
> Thanks
>
>
> On Wed, Jun 16, 2021 at 2:16 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Felipe,
>>
>> Which data source are you using?
>>
>> > Then, in the MyCoGroupFunction there are only events of stream02
>>
>> Are you storing events in your state?
>>
>> > Is this the case where I have to use RichCoGroupFunction and save the
>> > state by implementing the CheckpointedFunction?
>>
>> If you want your state to be persisted with each checkpoint and
>> recovered after a failure, yes.
>>
>> On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a problem in my streaming pipeline where the events in a
>>> CoGroupFunction window are not restored after the application crashes. The
>>> application looks like this:
>>>
>>> stream01.coGroup(stream02)
>>>     .where(...).equalTo(...)
>>>     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>     .apply(new MyCoGroupFunction())
>>>     .process(new MyProcessFunction())
>>>     .addSink(new MySinkFunction());
>>>
>>> The checkpoint interval is 20 seconds and the window size is 1
>>> minute. I follow this sequence to reproduce the error:
>>> 1 - send 6 events to stream01
>>> 2 - after 25 seconds, send an event that makes the application crash
>>> 3 - in the meantime, the application recovers
>>> 4 - after 25 seconds, send 6 events to stream02
>>> After that, MyCoGroupFunction receives only the events of stream02. Is
>>> this the case where I have to use RichCoGroupFunction and save the state by
>>> implementing CheckpointedFunction? I am confused because
>>> CoGroupFunction.coGroup() is called only when the window closes,
>>> and only then do I see this operator's output events, that is, when
>>> Collector.collect() is called.
>>>
>>> What I think is that the events are held in memory and, when the window
>>> closes, CoGroupFunction.coGroup() is called. So I have to snapshot the
>>> state in an operator before the CoGroupFunction. Is that correct? If
>>> anyone has a toy example of this (a CoGroupFunction with checkpointing,
>>> tested in a unit test), could you please send me the link?
>>>
>>> Thanks,
>>> Felipe
>>>
>>>
