Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Kenneth Knowles
Reuven's answer will result in a group by key (but not window) where no
data is dropped and you get deltas for each key. Downstream consumers can
recombine the deltas to get per-key aggregation. So instead of putting the
time interval into the window, you put it into the key, and then you get
the same grouped aggregation.

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to
arrival time. I illustrated this in some older talks [2].

Kenn

[1]
https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2]
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134
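Kenn's first paragraph says downstream consumers can recombine the per-pane deltas to recover the per-key aggregation. A rough pure-Python sketch of that recombination step (not Beam API; the keys, intervals, and values are made up for illustration):

```python
from collections import defaultdict

# Hypothetical panes emitted by a discarding-mode trigger: each firing
# produces a partial (delta) count per (key, processing-time interval).
panes = [
    (("user-a", "12:00"), 5),
    (("user-b", "12:00"), 3),
    (("user-a", "12:01"), 2),
]

# A downstream consumer recombines the deltas to recover the per-key total.
totals = defaultdict(int)
for (key, _interval), delta in panes:
    totals[key] += delta

print(dict(totals))  # {'user-a': 7, 'user-b': 3}
```

Because the interval lives in the key rather than the window, no pane is ever "late", and summing the deltas downstream gives the same grouped result.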

On Fri, Apr 23, 2021 at 8:32 AM Reuven Lax  wrote:

> [quoted text trimmed; the message appears in full below in this thread]


Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Reuven Lax
You can definitely group by processing time. The way to do this in Beam is
as follows

Window.into(new GlobalWindows())
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are
creating a single event-time window that encompasses all time, and
"triggering" an aggregation every 30 seconds based on processing time.

On Fri, Apr 23, 2021 at 8:14 AM Tao Li  wrote:

> [quoted text trimmed; the message appears in full below in this thread]


Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Tao Li
Thanks @Kenneth Knowles. I understand we need to specify a window for the 
groupby so that the app knows when processing is “done” and can output a 
result.

Is it possible to specify an event-arrival/processing-time-based window for 
the groupby? The purpose is to avoid dropping late events. With a 
processing-time-based window, the app would periodically output the result 
based on all events that arrived in that window; a late-arriving event falls 
into whatever window covers its arrival time, so that late data would not 
get lost.

Does Beam support this kind of mechanism? Thanks.

From: Kenneth Knowles 
Reply-To: "user@beam.apache.org" 
Date: Thursday, April 22, 2021 at 1:49 PM
To: user 
Cc: Kelly Smith , Lian Jiang 
Subject: Re: Question on late data handling in Beam streaming mode

Hello!

In a streaming app, you have two choices: wait forever and never have any 
output OR use some method to decide that aggregation is "done".

In Beam, the way you decide that aggregation is "done" is the watermark. When 
the watermark predicts no more data for an aggregation, then the aggregation is 
done. For example, a GROUP BY over a one-minute window is "done" when no more 
data will arrive for that minute. At this point, your result is produced. More 
data may arrive, and it is ignored. The watermark is determined by the IO 
connector to be the best heuristic available. You can configure "allowed 
lateness" for an aggregation to accept out-of-order data.

Kenn
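The watermark rule described above can be sketched in a few lines of plain Python (a toy model, not the Beam API; the two-minute allowed lateness and integer "minute" timestamps are made-up values for illustration):

```python
ALLOWED_LATENESS = 2  # minutes, a hypothetical configured value

def classify(window_end, watermark):
    # Toy model of Beam's late-data handling: once the watermark passes a
    # window's end the window is "done"; elements arriving within allowed
    # lateness still update the result, beyond it they are droppable.
    if watermark < window_end:
        return "on-time"
    if watermark < window_end + ALLOWED_LATENESS:
        return "late-but-kept"
    return "droppable"

print(classify(window_end=10, watermark=9))   # on-time
print(classify(window_end=10, watermark=11))  # late-but-kept
print(classify(window_end=10, watermark=13))  # droppable
```

This is why raising allowed lateness (or keying by processing time, as discussed elsewhere in the thread) prevents data from ever landing in the "droppable" bucket.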

On Thu, Apr 22, 2021 at 1:26 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I am wondering if there is a risk of losing late data from a Beam stream app 
due to watermarking?

I just went through this design doc and noticed the “droppable” definition 
there: 
https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#

Can you please confirm if it’s possible for us to lose some data in a stream 
app in practice? If that’s possible, what would be the best practice to avoid 
data loss? Thanks!



Re: How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
Hmm in my somewhat limited experience, I was not able to combine state and
Splittable DoFn. Definitely could be user error on my part though.

RE sequence numbers, could it work to embed those numbers in the CSV itself?

Thanks,
Evan

On Fri, Apr 23, 2021 at 07:55 Simon Gauld  wrote:

> [quoted text trimmed; the message appears in full below in this thread]


Re: How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Simon Gauld
Thank you, I will have a look; however, I have some concerns:

- the gzip itself is not splittable as such
- I need to apply a sequence number 1..n, so I believe the read *must* be
sequential

However, what I am looking to achieve is handing off each newly decorated row
as soon as the sequence number is applied to it. The issue is that the entire
step of applying the sequence number appears to be blocking. Also of note, I
am using a @DoFn.StateId.

I'll look at SplittableDoFns, thanks.
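The desired shape of the hand-off, sequential numbering that yields each row immediately so the parallelizable transform can start before the full read completes, can be sketched in plain Python (not Beam; the row values and transform are made up for illustration):

```python
# Sketch of lazy, streaming sequence-numbering: the sequential step yields
# each decorated row as soon as its number is assigned, rather than
# materializing the whole numbered collection first.
def number_rows(rows):
    for seq, row in enumerate(rows, start=1):
        yield seq, row  # hand off immediately; no buffering of all rows

def transform(seq, row):
    return f"{seq}:{row.upper()}"  # stand-in for the real per-row work

rows = iter(["a", "b", "c"])  # stands in for a streamed (compressed) source
out = [transform(seq, row) for seq, row in number_rows(rows)]
print(out)  # ['1:A', '2:B', '3:C']
```

Whether a runner actually overlaps the two steps this way depends on fusion and checkpointing behavior, which is the blocking issue being discussed in this thread.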


On Fri, Apr 23, 2021 at 12:50 PM Evan Galpin  wrote:

> [quoted text trimmed; the message appears in full below in this thread]
>>


Re: How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
I could be wrong but I believe that if your large file is being read by a
DoFn, it’s likely that the file is being processed atomically inside that
DoFn, which cannot be parallelized further by the runner.

One purpose-built way around that constraint is by using Splittable
DoFn[1][2] which could be used to allow each split to read a portion of the
file. I don’t know, however, how this might (or might not) work with
compression.

[1]
https://beam.apache.org/blog/splittable-do-fn-is-available/
[2]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

Thanks,
Evan
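The core Splittable DoFn idea Evan describes, treating the input as a restriction (e.g. a byte range) that the runner can split so workers read disjoint pieces in parallel, can be sketched conceptually in plain Python (not the Beam SDK; the chunk size is an arbitrary illustrative value):

```python
# Conceptual sketch of restriction splitting: a file's byte range is
# divided into sub-ranges that separate workers could claim in parallel.
def split_restriction(start, end, desired_chunk):
    chunks, pos = [], start
    while pos < end:
        chunks.append((pos, min(pos + desired_chunk, end)))
        pos += desired_chunk
    return chunks

print(split_restriction(0, 100, 30))
# [(0, 30), (30, 60), (60, 90), (90, 100)]
```

The catch for this thread: a plain gzip stream cannot be decoded starting from an arbitrary byte offset, so such sub-ranges are not independently readable, which is why gzip input tends to defeat this kind of splitting.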

On Fri, Apr 23, 2021 at 07:34 Simon Gauld  wrote:

> [quoted text trimmed; the message appears in full below in this thread]
>


How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Simon Gauld
Hello,

I am trying to apply a transformation to each row in a reasonably large
(~1 billion row) gzip-compressed CSV.

The first operation is to assign a sequence number, in this case 1,2,3..

The second operation is the actual transformation.

I would like to apply the sequence number *as* each row is read from the
compressed source and then hand off the 'real' transformation work in
parallel, using DataFlow to autoscale the workers for the transformation.

I don't seem to be able to scale *until* all rows have been read; this
appears to block the pipeline until decompression of the entire file is
completed. At that point Dataflow autoscaling works as expected: it scales
up and throughput is then high. The issue is that the decompression step
appears to block.

My question: in Beam, is it possible to stream records from a compressed
source without blocking the pipeline?

thank you

.s
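At the level of plain file I/O (leaving aside how a Beam runner fuses and schedules the stage), a gzip stream can itself be read incrementally; decompressed lines come out as they are decoded, without inflating the whole file first. A small self-contained Python sketch using only the standard library (the in-memory file and row contents are made up for illustration):

```python
import gzip
import io

# Build a small gzip "file" in memory so the example is self-contained.
buf = io.BytesIO()
with gzip.open(buf, "wt") as w:
    w.write("row1\nrow2\nrow3\n")
buf.seek(0)

# Stream decompressed lines one at a time, attaching sequence numbers as
# each line is decoded rather than after the whole file is inflated.
with gzip.open(buf, "rt") as r:
    numbered = [(i, line.strip()) for i, line in enumerate(r, start=1)]

print(numbered)  # [(1, 'row1'), (2, 'row2'), (3, 'row3')]
```

So the decompression itself need not be all-or-nothing; whether downstream stages can start consuming those records early is a separate question about the runner's fusion and autoscaling behavior, which is what this thread goes on to discuss.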