Re: Buffering when connecting streams

2022-01-18 Thread Piotr Nowojski
Hi Alexis,

I believe you should be able to use the `ConnectedStreams#transform()`
method.

Best, Piotrek

RE: Buffering when connecting streams

2022-01-18 Thread Alexis Sarda-Espinosa
Hi again everyone,

It’s been a while, so first of all happy new year :)

I was revisiting this discussion and started looking at the code. However, it 
seems that all of the overloads of ConnectedStreams#process expect a 
CoProcessFunction or the Keyed counterpart, so I don’t think I can inject a 
custom TwoInputStreamOperator.

After a quick glance at the joining documentation, I wonder if I could 
accomplish what I want with a window/interval join of streams. If so, I might 
be able to avoid using state in the join function, but if I can’t avoid it, is 
it possible to use managed state in a (Process)JoinFunction? The join needs 
keys, but I don’t know if the resulting stream counts as keyed from the state’s 
point of view.

Regards,
Alexis.

Re: Buffering when connecting streams

2021-12-05 Thread Piotr Nowojski
Hi Alexis and David,

This actually cannot happen. There are mechanisms in the code to make sure
that neither input is starved IF there is some data to be read.

The only time an input can be blocked is during the alignment phase of
aligned checkpoints under back pressure. If there was back pressure in
your job, it could easily have happened that checkpoint barriers flowed
through the job graph to the KeyedCoProcessFunction on one of the
paths much quicker than on the other, causing this faster path to be blocked
until the other side caught up. But that would happen only during the
alignment phase of the checkpoint, so without back pressure only for a very
short period of time.

Piotrek
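
The alignment behaviour described above can be sketched with a small
plain-Java model. It does not use Flink's API and is not how Flink is
implemented: the class `AlignmentModel`, the `BARRIER` marker and the
strictly alternating read order are assumptions made only for illustration.
The point it shows is that an input is skipped only between the moment its
barrier arrives and the moment the barrier arrives on the other input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of aligned-checkpoint barrier alignment on two inputs. */
final class AlignmentModel {
    static final String BARRIER = "BARRIER";

    /** Drains both inputs, alternating where possible, and returns the processing order. */
    static List<String> run(List<String> input1, List<String> input2) {
        List<Deque<String>> inputs = new ArrayList<>();
        inputs.add(new ArrayDeque<>(input1));
        inputs.add(new ArrayDeque<>(input2));
        boolean[] blocked = new boolean[2];
        List<String> processed = new ArrayList<>();
        int preferred = 0;
        while (true) {
            // Read from the preferred input unless it is blocked or empty.
            int i = readable(inputs, blocked, preferred) ? preferred
                    : readable(inputs, blocked, 1 - preferred) ? 1 - preferred : -1;
            if (i < 0) {
                break; // nothing left that may be read
            }
            String element = inputs.get(i).poll();
            if (BARRIER.equals(element)) {
                blocked[i] = true; // this input waits for the other barrier
                if (blocked[0] && blocked[1]) {
                    // Both barriers arrived: alignment is over, unblock both inputs.
                    blocked[0] = false;
                    blocked[1] = false;
                    processed.add("<checkpoint>");
                }
            } else {
                processed.add(element);
            }
            preferred = 1 - i;
        }
        return processed;
    }

    private static boolean readable(List<Deque<String>> inputs, boolean[] blocked, int i) {
        return !blocked[i] && !inputs.get(i).isEmpty();
    }
}
```

With input 1 = [a1, BARRIER, a2, a3] and input 2 = [b1, b2, b3, BARRIER, b4],
the model processes a1, b1, b2, b3, takes the checkpoint, and then continues
with a2, b4, a3. Input 1 is held back only while input 2 has not yet
delivered its barrier.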

Re: Buffering when connecting streams

2021-12-02 Thread David Morávek
I think this could happen, but I have very limited knowledge of how
the input gates work internally. @Piotr could definitely provide some more
insight here.

D.

RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
I do have some logic with timers today, but it’s indeed not ideal. I guess I’ll 
have a look at TwoInputStreamOperator, but I do have related questions. You 
mentioned a sample scenario of "processing backlog" where windows fire very 
quickly; could it happen that, in such a situation, the framework calls the 
operator’s processElement1 continuously (even for several minutes) before 
calling processElement2 a single time? How does the framework decide when to 
switch the stream processing when the streams are connected?

Regards,
Alexis.

Re: Buffering when connecting streams

2021-12-02 Thread David Morávek
Even with the TwoInputStreamOperator you cannot "halt" the processing. You
need to buffer these elements, for example in a ListState, for later
processing. At the time the watermark of the second stream arrives, you can
process all buffered elements that satisfy the condition.

You could probably also implement a similar (less optimized) solution
with KeyedCoProcessFunction using event-time timers.

Best,
D.
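
A minimal plain-Java sketch of the buffering idea above. It is only a model
and does not use Flink's API: `BufferingTwoInputModel` and `Event` are
hypothetical names, the in-memory list stands in for a ListState, and the
release condition (timestamp not greater than the second input's watermark)
is an assumption about what "satisfy the condition" means here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of a two-input operator that buffers input 1 until input 2 catches up. */
final class BufferingTwoInputModel {

    /** A value together with its event-time timestamp in milliseconds. */
    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final List<Event> buffer = new ArrayList<>();  // stands in for ListState
    private final List<Event> emitted = new ArrayList<>(); // stands in for the output
    private long watermark2 = Long.MIN_VALUE;

    /** Input 1 is never halted; its elements are only stored for later. */
    void processElement1(Event event) {
        buffer.add(event);
    }

    /** A watermark on input 2 releases every buffered element it has passed. */
    void processWatermark2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            Event event = it.next();
            if (event.timestamp <= watermark2) {
                emitted.add(event);
                it.remove();
            }
        }
    }

    List<Event> emitted() {
        return emitted;
    }

    int buffered() {
        return buffer.size();
    }
}
```

For example, after buffering events with timestamps 1000 and 5000, a
watermark of 2000 on the second input releases only the first event; the
second one stays buffered until a watermark of at least 5000 arrives.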

RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t tell 
Flink to "halt" processElement1 and switch to the other stream depending on 
watermarks. I could look into TwoInputStreamOperator if you think that’s the 
best approach.

Regards,
Alexis.

From: David Morávek 
Sent: Thursday, December 2, 2021 16:59
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

I think this would require using the lower-level API and implementing a custom 
`TwoInputStreamOperator`. Then you can hook into the `processWatermark{1,2}` 
methods.

Let's also make sure we're on the same page about what the watermark is. You can 
think of the watermark as an event-time clock. It basically tells you that no 
more events with a timestamp lower than the watermark should appear in your 
stream.

You simply delay emitting the window result from your "connect" operator 
until the watermark from the second (side output) stream passes the window's max 
timestamp (the maximum timestamp that is included in the window).

Does that make sense?

Best,
D.

On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
Could you elaborate on what you mean by "synchronize"? Buffering in the state 
would be fine, but I haven’t been able to come up with a good way of ensuring 
that all data from the side stream for a given minute is processed by 
processElement2 before all data for the same (windowed) minute reaches 
processElement1, even when considering watermarks.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 15:45
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

You cannot rely on the order of the two streams that easily. If, for example, 
you are processing a backlog and the windows fire quickly, the windowed branch 
can actually be faster than the second branch, even though the second branch 
has less work to do. This makes the pipeline non-deterministic.

What you can do is "synchronize" the watermarks of both streams in your 
"connect" operator, but that of course involves buffering events in state.

Best,
D.
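
One way to picture "synchronizing" the watermarks is to track the latest
watermark of each input and only act on the smaller of the two. The sketch
below is a plain-Java model under that assumption; `SynchronizedWatermark`
and its method names are hypothetical and not part of Flink's API.

```java
/** Hypothetical tracker that combines the watermarks of two inputs by taking their minimum. */
final class SynchronizedWatermark {
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    /** Records event-time progress on input 1 and returns the combined watermark. */
    long onWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp);
        return combined();
    }

    /** Records event-time progress on input 2 and returns the combined watermark. */
    long onWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
        return combined();
    }

    /** Event time has only advanced as far as the slower input has. */
    long combined() {
        return Math.min(watermark1, watermark2);
    }
}
```

Buffered events would then be processed only up to `combined()`, so the
faster branch cannot get ahead of the slower one in event time.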

On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
Hi David,

A watermark step simply refers to assigning timestamps and watermarks; my 
source doesn’t do that.

I have a test system with only a couple of data points per day, so there’s 
definitely no back pressure. I basically have a browser where I can see the 
results from the sink, and I found one result that should have been discarded 
but wasn’t, which makes me think that the operator processed the "open" state 
but waited too long and didn’t process the "close" state before the window 
fired. I can also see that the closure (going from open to close) triggered on 
second 17, and my windows are evaluated every minute, so it wasn’t a race 
condition.

Regards,
Alexis.

From: David Morávek <d...@apache.org>
Sent: Thursday, December 2, 2021 14:52
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Buffering when connecting streams

Hi Alexis,

I'm not sure what the "watermark" step refers to in your graph, but in general 
I'd say your intuition is correct.

For the "buffering" part: each sub-task that needs to send data via a data 
exchange (the last operator in a chain) has an output buffer, and the operator 
that consumes this data (possibly on a different machine) has an input buffer 
(the buffer debloating feature can help to mitigate excessive buffering in case 
of back pressure).

> but I’m not sure if this actually happens

How are you trying to verify this? Also can you check whether the operators are 
not back-pressured?

Best,
D.

On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa 
<alexis.sarda-espin...@microfocus.com> wrote:
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
            |                                                               /
            (side output) -> keyBy -> watermark ---------------------------/


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFun

Re: Buffering when connecting streams

2021-12-02 Thread David Morávek
I think this would require using the lower-level API and implementing a
custom `TwoInputStreamOperator`. Then you can hook into the
`processWatermark{1,2}` methods.

Let's also make sure we're on the same page about what the watermark is. You
can think of the watermark as an event-time clock. It basically tells you
that *no more events with a timestamp lower than the watermark should appear
in your stream*.

You simply delay emitting the window result from your "connect" operator
until the watermark from the second (side output) stream passes the window's
max timestamp (the maximum timestamp that is included in the window).
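
The delay described above can be sketched roughly as follows. This is a
framework-free Python simulation, not Flink code: `WindowResult`,
`DelayedEmitter` and their methods are hypothetical names made up for
illustration, and a plain list stands in for operator state. It assumes that
a side watermark equal to the window's max timestamp is enough to release the
result.

```python
from dataclasses import dataclass, field


@dataclass
class WindowResult:
    key: str
    window_max_ts: int  # maximum timestamp included in the window
    value: float


@dataclass
class DelayedEmitter:
    """Holds window results until the side-stream watermark passes them."""

    side_watermark: int = -1
    pending: list = field(default_factory=list)

    def on_window_result(self, result):
        # First input: a result coming from the windowed branch is buffered.
        self.pending.append(result)
        return self._release()

    def on_side_watermark(self, watermark):
        # Second input: watermarks only move forward.
        self.side_watermark = max(self.side_watermark, watermark)
        return self._release()

    def _release(self):
        # Assumption: a result is safe to emit once the side watermark has
        # reached the window's max timestamp.
        ready = [r for r in self.pending
                 if r.window_max_ts <= self.side_watermark]
        self.pending = [r for r in self.pending
                        if r.window_max_ts > self.side_watermark]
        return ready
```

A result for the window ending at 59999 stays in `pending` while the side
watermark is at 30000 and is returned only by the call that moves the side
watermark to 59999 or later.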

Does that make sense?

Best,
D.



RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Could you elaborate on what you mean by "synchronize"? Buffering in the state
would be fine, but I haven’t been able to come up with a good way of ensuring
that all data from the side stream for a given minute is processed by
processElement2 before all data for the same (windowed) minute reaches
processElement1, even when considering watermarks.

Regards,
Alexis.




Re: Buffering when connecting streams

2021-12-02 Thread David Morávek
You cannot rely on the order of the two streams that easily. If, for
example, you are processing a backlog and the windows fire quickly, the
windowed branch can actually be faster than the second branch, which has
less work to do. This makes the pipeline non-deterministic.

What you can do is "synchronize" the watermarks of both streams in your
"connect" operator, but that of course involves buffering events in the
state.
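
One way to picture this synchronization is the rough sketch below. It is a
plain Python simulation, not Flink code: `SynchronizedBuffer`, `SIDE` and
`MAIN` are hypothetical names, and a heap stands in for operator state.
Elements from both inputs are buffered and released in timestamp order only
once the smaller of the two input watermarks has passed them, so the result
no longer depends on which branch happens to be faster. Letting the side
stream win on equal timestamps is an assumption made for this use case.

```python
import heapq

SIDE, MAIN = 0, 1  # hypothetical input ids; SIDE sorts first on equal timestamps


class SynchronizedBuffer:
    """Buffers both inputs and releases elements by the combined watermark."""

    def __init__(self):
        self.watermarks = [float("-inf"), float("-inf")]
        self._heap = []
        self._seq = 0  # tie-breaker so payloads are never compared

    def on_element(self, input_id, timestamp, payload):
        # Buffer the element instead of processing it on arrival.
        heapq.heappush(self._heap, (timestamp, input_id, self._seq, payload))
        self._seq += 1

    def on_watermark(self, input_id, watermark):
        # Each input's watermark only moves forward.
        self.watermarks[input_id] = max(self.watermarks[input_id], watermark)
        # The combined event-time clock is the slower of the two inputs.
        combined = min(self.watermarks)
        released = []
        while self._heap and self._heap[0][0] <= combined:
            timestamp, source, _, payload = heapq.heappop(self._heap)
            released.append((timestamp, source, payload))
        return released
```

If a window result with timestamp 60 arrives before a "close" event with
timestamp 17, nothing is released until both watermarks reach 60, and the
"close" event then comes out first.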

Best,
D.



RE: Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Hi David,

A watermark step simply refers to assigning timestamps and watermarks; my
source doesn’t do that.

I have a test system with only a couple of data points per day, so there’s
definitely no back pressure. I basically have a browser where I can see the
results from the sink, and I found one result that should have been discarded
but wasn’t, which makes me think that the operator processed the "open" state
but waited too long and didn’t process the "close" state before the window
fired. I can also see that the closure (going from open to close) was
triggered at second 17, and my windows are evaluated every minute, so it
wasn’t a race condition.

Regards,
Alexis.




Re: Buffering when connecting streams

2021-12-02 Thread David Morávek
Hi Alexis,

I'm not sure what the "watermark" step refers to in your graph, but in
general I'd say your intuition is correct.

For the "buffering" part, each sub-task that needs to send data via a data
exchange (the last operator in a chain) has an output buffer, and the
operator that consumes this data (maybe on a different machine) has an input
buffer (the buffer de-bloating feature can help to mitigate excessive
buffering in case of back pressure).

> but I’m not sure if this actually happens

How are you trying to verify this? Also, can you check whether the operators
are back-pressured?

Best,
D.



Buffering when connecting streams

2021-12-02 Thread Alexis Sarda-Espinosa
Hello,

I have a use case with event-time processing that ends up with a DAG roughly 
like this:

source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 ->
connect (KeyedCoProcessFunction) -> sink

filter (side output) -> keyBy -> watermark -> connect (second input)


(In case the text gets mangled in the e-mail, the side output comes from the 
filter and joins back with the connect operation)

The filter takes all data and its main output is all _valid_ data with state 
"open"; the side output is all _valid_ data regardless of state.

The goal of the KeyedCoProcessFunction is to check the results of the (sliding) 
window. The window only receives open states, but KeyedCoProcessFunction 
receives all valid data and should discard results from the main stream if 
states changed from "open" to something else before the window was evaluated.
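
The check described above can be sketched roughly as follows. This is plain
Python rather than Flink code: `OpenStateCheck` is a hypothetical name, its
methods only mirror the `processElement1`/`processElement2` split, and a
dict stands in for keyed state.

```python
class OpenStateCheck:
    """Forwards a window result only if the key's latest state is "open"."""

    def __init__(self):
        self.latest_state = {}  # key -> most recent state seen on the side stream

    def process_element2(self, key, state):
        # Side stream: every valid event, regardless of state.
        self.latest_state[key] = state

    def process_element1(self, key, window_result):
        # Main stream: a window result; drop it if the key is no longer open.
        if self.latest_state.get(key) == "open":
            return window_result
        return None
```

Written like this, the outcome depends on whether the "close" event for a key
reaches `process_element2` before or after the window result for that key
reaches `process_element1`.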

I would have expected all data from the side output to be processed roughly
immediately in KeyedCoProcessFunction's processElement2 because there's no
windowing in the side stream, but I'm not sure if this actually happens.
Maybe the side stream (or both streams) buffers some data before passing it
to the connected stream? If yes, is there any way I could tune this?

Regards,
Alexis.