Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
This sounds similar to the "Kafka Commit" in
https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
 and also to how PubsubIO ACKs messages in the
finalizer. I don't know much about KinesisIO or how Kinesis works. I was
just asking to clarify, in case other folks know more, like +Alexey
Romanenko  and +Ismaël Mejía , who have
modified KinesisIO. If the feature does not exist today, perhaps we can
identify the best practices around this pattern.
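
For reference, the general shape of that pattern (acknowledging records back
to the source system only after the runner has finalized a checkpoint) is
roughly the sketch below. It is simplified and hypothetical, not the actual
PubsubIO or KafkaIO code; the MessagingClient type and its acknowledge method
are placeholders:

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

// Placeholder for whatever client talks to the source system.
interface MessagingClient extends Serializable {
  void acknowledge(List<String> ackIds) throws IOException;
}

// Simplified sketch of an ACK-on-finalize checkpoint mark. The runner calls
// finalizeCheckpoint() only after the checkpoint has been durably committed,
// so records are never acknowledged before their data is safely persisted.
class AckingCheckpointMark implements UnboundedSource.CheckpointMark {
  private final List<String> ackIds; // records read since the last checkpoint
  private final MessagingClient client;

  AckingCheckpointMark(List<String> ackIds, MessagingClient client) {
    this.ackIds = ackIds;
    this.client = client;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Safe point: the checkpoint is persisted, acknowledge upstream.
    client.acknowledge(ackIds);
  }
}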

Kenn

On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  wrote:

> Hi Kenn,
>
> yes, resuming reading at the proper timestamp is exactly the issue we are
> currently struggling with. E.g. with the Kinesis Client Library we could
> store the last read position in some DynamoDB table. This mechanism is not
> used with Beam; as we understand it, the runner is responsible for tracking
> that checkpoint mark.
>
> Now, obviously when restarting the pipeline, e.g. after a non-compatible
> upgrade where a pipeline update is just not feasible, there must be some
> mechanism in place for how Dataflow knows where to continue. Is that simply
> the pipeline name? Or is there more involved? So how does checkpointing
> actually work here?
>
> If it is based on the 'name', wouldn't that imply that something like
> (example taken from
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
> )
>
>   export REGION="us-central1"
>
>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>     --template-file-gcs-location "$TEMPLATE_PATH" \
>     --parameters inputSubscription="$SUBSCRIPTION" \
>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>     --region "$REGION"
>
> will not resume from the last read position on rerun, because the name
> obviously changes here?
>
> best,
>
> michel
>
>
>
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>
>> I would assume the main issue is resuming reading from the Kinesis stream
>> at the last read position? In the case of Pubsub (just as another example
>> of the idea), this is part of the internal state of a pre-created
>> subscription.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>> wrote:
>>
>>> Hi list,
>>>
>>> in our current project we are implementing our streaming pipeline
>>> based on Google Dataflow.
>>>
>>> Essentially we receive input via Kinesis, do some filtering, enrichment
>>> and sessionizing, and output to PubSub and/or Google Cloud Storage.
>>>
>>> After a short investigation it is not clear to us how checkpointing will
>>> work when running on Dataflow in connection with KinesisIO. Is there any
>>> documentation or discussion that would give us a better understanding of
>>> how it works? Especially if we are forced to restart our pipelines, how
>>> can we ensure we do not lose any events?
>>>
>>> As far as I understand it currently, it should work 'auto-magically', but
>>> it is not yet clear to us how it will actually behave. Before we start
>>> testing our expectations or even try to implement some watermark tracking
>>> ourselves, we hoped to get some insights from other users here.
>>>
>>> Any help appreciated.
>>>
>>> Best,
>>>
>>> michel
>>>
>>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi Kenn,

yes, resuming reading at the proper timestamp is exactly the issue we are
currently struggling with. E.g. with the Kinesis Client Library we could
store the last read position in some DynamoDB table. This mechanism is not
used with Beam; as we understand it, the runner is responsible for tracking
that checkpoint mark.
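
(For illustration only, the kind of Kinesis read we mean is roughly the
sketch below; the stream name is made up, credentials and client
configuration are omitted, and the initial position only matters for a fresh
start without prior state:)

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.values.PCollection;

public class KinesisReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // On a fresh start the read begins at the configured initial position;
    // while the job is running, the runner tracks the per-shard checkpoint
    // marks for us.
    PCollection<KinesisRecord> records =
        pipeline.apply("ReadFromKinesis",
            KinesisIO.read()
                .withStreamName("my-input-stream") // hypothetical stream name
                // AWS credentials / client configuration omitted
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON));

    // ... filtering / enrichment / outputs would follow ...
    pipeline.run();
  }
}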

Now, obviously when restarting the pipeline, e.g. after a non-compatible
upgrade where a pipeline update is just not feasible, there must be some
mechanism in place for how Dataflow knows where to continue. Is that simply
the pipeline name? Or is there more involved? So how does checkpointing
actually work here?

If it is based on the 'name', wouldn't that imply that something like
(example taken from
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
)

  export REGION="us-central1"

  gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters inputSubscription="$SUBSCRIPTION" \
    --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
    --region "$REGION"

will not resume from the last read position on rerun, because the name
obviously changes here?

best,

michel



On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:

> I would assume the main issue is resuming reading from the Kinesis stream
> at the last read position? In the case of Pubsub (just as another example
> of the idea), this is part of the internal state of a pre-created
> subscription.
>
> Kenn
>
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:
>
>> Hi list,
>>
>> in our current project we are implementing our streaming pipeline based
>> on Google Dataflow.
>>
>> Essentially we receive input via Kinesis, do some filtering, enrichment
>> and sessionizing, and output to PubSub and/or Google Cloud Storage.
>>
>> After a short investigation it is not clear to us how checkpointing will
>> work when running on Dataflow in connection with KinesisIO. Is there any
>> documentation or discussion that would give us a better understanding of
>> how it works? Especially if we are forced to restart our pipelines, how
>> can we ensure we do not lose any events?
>>
>> As far as I understand it currently, it should work 'auto-magically', but
>> it is not yet clear to us how it will actually behave. Before we start
>> testing our expectations or even try to implement some watermark tracking
>> ourselves, we hoped to get some insights from other users here.
>>
>> Any help appreciated.
>>
>> Best,
>>
>> michel
>>
>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
I would assume the main issue is resuming reading from the Kinesis stream
at the last read position? In the case of Pubsub (just as another example of
the idea), this is part of the internal state of a pre-created subscription.
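
(For comparison, a sketch of a Pubsub read against a pre-created
subscription; project and subscription names are made up. The subscription
itself holds the unacknowledged backlog, so a newly started pipeline that
attaches to the same subscription resumes from whatever has not been ACKed
yet, independent of the Dataflow job name:)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.values.PCollection;

public class PubsubReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // The pre-created subscription tracks delivery state on the Pubsub side,
    // which is the "internal state" referred to above.
    PCollection<PubsubMessage> messages =
        pipeline.apply("ReadFromPubsub",
            PubsubIO.readMessages()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    pipeline.run();
  }
}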

Kenn

On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:

> Hi list,
>
> in our current project we are implementing our streaming pipeline based
> on Google Dataflow.
>
> Essentially we receive input via Kinesis, do some filtering, enrichment
> and sessionizing, and output to PubSub and/or Google Cloud Storage.
>
> After a short investigation it is not clear to us how checkpointing will
> work when running on Dataflow in connection with KinesisIO. Is there any
> documentation or discussion that would give us a better understanding of
> how it works? Especially if we are forced to restart our pipelines, how
> can we ensure we do not lose any events?
>
> As far as I understand it currently, it should work 'auto-magically', but
> it is not yet clear to us how it will actually behave. Before we start
> testing our expectations or even try to implement some watermark tracking
> ourselves, we hoped to get some insights from other users here.
>
> Any help appreciated.
>
> Best,
>
> michel
>


Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi list,

in our current project we are implementing our streaming pipeline based
on Google Dataflow.

Essentially we receive input via Kinesis, do some filtering, enrichment
and sessionizing, and output to PubSub and/or Google Cloud Storage.

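(For illustration, a rough skeleton of such a pipeline; transform names, the
stream and topic, and the session gap are made up, and the actual filtering,
enrichment and keyed sessionization steps are only indicated:)

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class StreamingPipelineSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadKinesis",
            KinesisIO.read().withStreamName("my-stream")) // credentials omitted
        .apply("ExtractPayload",
            MapElements.into(TypeDescriptors.strings())
                .via((KinesisRecord r) ->
                    new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
        // ... filtering and enrichment DoFns would go here ...
        .apply("Sessionize",
            Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(30))))
        // ... keying + GroupByKey for the actual per-user sessions omitted ...
        .apply("WriteToPubsub",
            PubsubIO.writeStrings().to("projects/my-project/topics/my-topic"));

    p.run();
  }
}
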
After a short investigation it is not clear to us how checkpointing will
work when running on Dataflow in connection with KinesisIO. Is there any
documentation or discussion that would give us a better understanding of how
it works? Especially if we are forced to restart our pipelines, how can we
ensure we do not lose any events?

As far as I understand it currently, it should work 'auto-magically', but it
is not yet clear to us how it will actually behave. Before we start testing
our expectations or even try to implement some watermark tracking ourselves,
we hoped to get some insights from other users here.

Any help appreciated.

Best,

michel


Re: Triggering partway through a window

2021-04-06 Thread Raman Gupta
On Mon, Mar 29, 2021 at 1:17 PM Kenneth Knowles  wrote:

> That's a neat example!
>
> The trigger you have there will emit a ton of output. What is your
> accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
> you would not actually have access to the prior 6 days of input.
>

Yup, accumulating.


>
> The only trigger that is based on completeness of data is the
> AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
> capture the 6 days of data:
>
> prior6days = input.apply(Window.into(<6 day windows sliding one
> day>).triggering(AfterWatermark.pastEndOfWindow()))
>
> Now if you GBK this collection, each group will have a timestamp that is
> the end of the 6 day period. You can use ParDo with outputWithTimestamp to
> move the timestamp up to any timestamp in the following day, yielding a
> PCollection of 6-day groupings of data with a timestamp in the last day of
> the 7. If the 6 days of data is large you may hit size limits (either hard
> limits or perf problems) and have to do something fancier.
>
> Flatten this with the input PCollection, window into FixedWindows(<1 day>),
> and trigger however you like, again with accumulatingFiredPanes().
> There is no guarantee that the 6 days of past data arrives prior to
> elements in the last day. In fact since it will be delayed by an extra
> shuffle you would expect it to often show up later. So this is a heuristic
> approach equivalent to what it sounds like you are already doing that
> should lower the cost.
>

Ah interesting. Yes, this would likely have worked for me.


>
> If you want a guarantee that the 6 day buffer arrives prior to the other
> elements you will need to do something else. You could write a WindowFn
> that assigned all 7 days of data to a window that only spanned the first 6
> days, then trigger at end of window plus allowing late data (no early
> firings). Then every firing would be guaranteed by the watermark to have
> the first 6 days of data plus whatever else has shown up. (I assume part of
> your spec is that you do want data to be processed as it arrives, versus
> waiting until the end of the 7 day window).
>

I was curious about this option, and tried it. One issue I ran into was
that the downstream logic had some "odd" windows to deal with, because the
window interval did not properly reflect its contents; this resulted in
downstream logic that wasn't as encapsulated as it should have been.
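
For concreteness, a rough sketch of the WindowFn idea described above (each
element is assigned to every 7-day period that contains it, but the window
itself spans only the first 6 days of that period) might look like the
following. This is hypothetical and untested, aligned to epoch days rather
than calendar days for brevity:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Each element falls into every 7-day period (sliding by 1 day) containing
// its timestamp, but is assigned to a window spanning only the FIRST 6 days
// of that period. The end-of-window firing therefore happens once the first
// 6 days are complete; elements from the 7th day arrive in the same window
// as late data, so allowed lateness must cover at least that final day.
class FirstSixOfSevenDaysWindowFn<T> extends NonMergingWindowFn<T, IntervalWindow> {
  private static final Duration PERIOD = Duration.standardDays(1);
  private static final Duration SPAN = Duration.standardDays(7);
  private static final Duration WINDOW_SIZE = Duration.standardDays(6);

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    List<IntervalWindow> windows = new ArrayList<>();
    long ts = c.timestamp().getMillis();
    // Start of the latest 1-day-aligned period at or before the timestamp.
    long lastStart = ts - Math.floorMod(ts, PERIOD.getMillis());
    // Walk back over every 7-day period [start, start + 7d) containing ts,
    // emitting a window that covers only [start, start + 6d).
    for (long start = lastStart; start > ts - SPAN.getMillis(); start -= PERIOD.getMillis()) {
      windows.add(new IntervalWindow(new Instant(start), WINDOW_SIZE));
    }
    return windows;
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof FirstSixOfSevenDaysWindowFn;
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Not intended for use with side inputs");
  }
}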

I therefore created a PTransform "DailyWindowsWithContext" that first does
the contextual windowing ("ContextualCalendarDayWindow") and GBK. It then
"re-windows" the elements + their context by filtering out the
"context-only" groups, sets the timestamps of the remaining groups based on
the max of the element timestamps, and then outputs them into a fixed
daily window, followed by another GBK and flatten.

This seems to work quite well with my set of unit tests, though I haven't
used it extensively yet. If anyone is curious about the code (written in
Kotlin), see here:

https://gist.github.com/rocketraman/543f066813fc89590f23ff5dacf43f01

Feedback on this code would be more than welcome.

Regards,
Raman



>
> I am just writing this without coding, so I could certainly have missed
> something or gotten it wrong.
>
> Kenn
>
> On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta  wrote:
>
>> I have a 7-day sliding calendar window, sliding by 1 day. The intent is
>> to process only elements that fall into the last day of a window, but still
>> have access to the elements from the preceding six days.
>>
>> I created a sliding calendar window function, and trigger it like this:
>>
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>>
>> Downstream of this pipeline I have a GBK and a DoFn that basically
>> ignores elements until at least some of them are in the last day of
>> the window.
>>
>> The above trigger works and the pipeline produces the expected output,
>> but runs the GBK and downstream logic many more times than is necessary.
>>
>> Is there a way I can optimize the triggering here such that the early
>> firings begin only when the watermark moves into the last day of the 7-day
>> window?
>>
>> Thanks,
>> Raman
>>
>>


[Meetup] Wednesday 14th April 9am PDT / 6pm CEST - David Sabater Dinter on Multi-Language Pipelines

2021-04-06 Thread Max King

Hi Everyone,

Next Wednesday there will be an online Beam meetup with David Sabater 
Dinter.


Join David at 9am PDT / 6pm CEST, where he will introduce you to a new
feature in the Beam Portability Framework for making your data pipelines
shareable with stakeholders in your organization, regardless of their
preferred programming language.


Register for this free online meetup here: 
https://pretix.eu/plainschwarz/meetup-Apr-14/ 



Thanks

Max King

--
Max King
Plain Schwarz
Schönhauser Allee 6-7
10119 Berlin

t: +49 30 92105-978
e: m...@plainschwarz.com
https://plainschwarz.com/

Registered office: Berlin
Charlottenburg Local Court (Amtsgericht Charlottenburg)
HRB 102015 B
Managing Director: Andreas Gebhard