Re: Question on late data handling in Beam streaming mode

2021-04-26 Thread Tao Li
Thanks, folks. This is really informative!


Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Kenneth Knowles
Reuven's answer will result in a group by key (but not window) where no
data is dropped and you get deltas for each key. Downstream consumers can
recombine the deltas to get per-key aggregation. So instead of putting the
time interval into the window, you put it into the key, and then you get
the same grouped aggregation.
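
To make that keying idea concrete, here is a minimal sketch in Beam Java. The
element type KV<String, Long> and the 30-second bucket size are assumptions for
illustration, not details from this thread:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Hypothetical helper: extend each key with the 30-second processing-time bucket
// in which the element arrives, so a GroupByKey in the global window produces one
// delta per (original key, arrival interval) rather than per event-time window.
class KeyByArrivalBucketFn extends DoFn<KV<String, Long>, KV<String, Long>> {
  private static final long BUCKET_MILLIS = 30_000L; // assumed bucket size

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> element, OutputReceiver<KV<String, Long>> out) {
    // Instant.now() is the worker's processing time, i.e. the arrival time here.
    long bucketStart = (Instant.now().getMillis() / BUCKET_MILLIS) * BUCKET_MILLIS;
    out.output(KV.of(element.getKey() + "@" + bucketStart, element.getValue()));
  }
}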

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to
arrival time. I illustrated this in some older talks [2].

Kenn

[1]
https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2]
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134
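
A minimal sketch of option 2 in Beam Java, assuming a hypothetical element type
MyEvent: a ParDo that re-stamps each element with its arrival (processing) time,
so downstream event-time windows effectively become arrival-time windows.

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Hypothetical sketch: replace each element's event timestamp with the time it
// reaches this DoFn, so nothing downstream is considered "late".
class ToArrivalTimeFn extends DoFn<MyEvent, MyEvent> {
  @ProcessElement
  public void processElement(@Element MyEvent event, OutputReceiver<MyEvent> out) {
    // Timestamps can generally be moved forward; moving them backward is bounded
    // by getAllowedTimestampSkew(). Stamping with "now" normally moves them forward.
    out.outputWithTimestamp(event, Instant.now());
  }
}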



Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Reuven Lax
You can definitely group by processing time. The way to do this in Beam is as
follows:

Window.into(new GlobalWindows())
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are creating
a single event-time window that encompasses all time and “triggering” an
aggregation every 30 seconds based on processing time.



Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Tao Li
Thanks @Kenneth Knowles. I understand we need to specify a window for the
groupby so that the app knows when processing is “done” and can output a result.

Is it possible to specify an event arrival/processing-time-based window for the
groupby? The purpose is to avoid dropping late events. With an arrival-time-based
window, the app would periodically output results based on all events that
arrived in that window; a late-arriving event falls into whatever window covers
its arrival time, so that late data is not lost.

Does Beam support this kind of mechanism? Thanks.




Re: Question on late data handling in Beam streaming mode

2021-04-22 Thread Kenneth Knowles
Hello!

In a streaming app, you have two choices: wait forever and never have any
output OR use some method to decide that aggregation is "done".

In Beam, the way you decide that aggregation is "done" is the watermark. When
the watermark predicts no more data for an aggregation, then the aggregation is
done. For example, a GROUP BY over one-minute windows is "done" when no more
data will arrive for that minute. At this point, your result is produced. More
data may arrive, and it is ignored. The watermark is determined by the IO
connector as the best available heuristic. You can configure "allowed lateness"
for an aggregation to accept out-of-order data.

Kenn
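
To make "allowed lateness" concrete, here is a minimal sketch in Beam Java. The
one-minute window, the two-day lateness bound, and the input
PCollection<KV<String, Long>> named "events" are assumptions for illustration:

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// One-minute event-time windows that keep accepting data until the watermark is
// two days past the end of the window. Late elements within that bound are emitted
// in additional (late) panes instead of being dropped; anything later is discarded.
PCollection<KV<String, Long>> sums =
    events
        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
            .withAllowedLateness(Duration.standardDays(2))
            .accumulatingFiredPanes())
        .apply(Sum.longsPerKey());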

>


Question on late data handling in Beam streaming mode

2021-04-22 Thread Tao Li
Hi Beam community,

I am wondering if there is a risk of losing late data in a Beam streaming app
due to watermarking?

I just went through this design doc and noticed the “droppable” definition 
there: 
https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#

Can you please confirm whether it is possible for us to lose some data in a
streaming app in practice? If so, what would be the best practice to avoid
data loss? Thanks!