Re: Checkpointing Dataflow Pipeline

2021-04-08 Thread Michael Luckey
Hi,

thanks for your help here. Really appreciated!

@alexey We already found your answer (it magically showed up in some search
engine results) and - iirc - while we were still using Spark as the underlying
runner on another project, I do think we relied on that mechanism. But since
we wanted to use Google Dataflow for our current project, we got stuck trying
to understand how checkpointing is going to work there. As Vincent mentioned,
the documentation is sparse and confusing, and from the Google-provided
examples it is difficult to understand how checkpointing is supposed to work,
i.e. what requirements we (as users) have to meet to get checkpointing on e.g.
Kinesis.

As far as I understand now, it does not seem to be supported, which
unfortunately is a show-stopper for us and might force us onto another runner
implementation.

Regarding external checkpointing with Kinesis, iiuc there is some support in
the Kinesis Client Library, which relies on DynamoDB. That works for us in a
different context, but it is not exposed by KinesisIO, which - of course - led
me to assume there would be support in Beam itself, or, as it might be, in the
underlying runner. That seems to be provided by Spark (and possibly Flink), so
we might check those out. We would still prefer to get something working on
Dataflow, without being required to roll our own checkpointing implementation.
Any Dataflow experts around who could shed some light on this? I'd love to get
this better documented for others running into the same issue.

Best,

michel

On Wed, Apr 7, 2021 at 10:46 PM Vincent Marquez wrote:

>
>
>
> On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles  wrote:
>
>> [I think this has graduated to a +dev  thread]
>>
>> Yea, in Beam it is left up to the IOs primarily, hence the bundle
>> finalization step, or allowed runners to have their own features of course.
>> Dataflow also does have in-place pipeline update that restores the
>> persisted checkpoints from one pipeline to another - same basic
>> mechanism/idea as Spark Structured Streaming but different overall
>> workflow. +Reuven Lax  has put a lot of thought into
>> updating, checkpointing, resuming, etc. Runners differ a lot in these
>> areas. Is there something that should graduate from runner-specific to the
>> Beam model?
>>
>> Kenn
>>
>
> In some ways, having an external checkpoint mechanism makes programming
> IOs simpler.  Let's use Redis Streams as an example, as our company
> recently implemented a RedisStreamIO internally so the details are fresh.
>
> One requirement was the need to be able to shut down a streaming Beam
> Pipeline, and then restart it from a later point in time without lost data
> and without starting from the beginning of time.
>
> This meant that I need to ensure only elements that are finished
> processing in a bundle are committed as 'processed' back to the redis
> server, which I accomplished by keeping track of all the elements that are
> outputted, then on finalizeCheckpoint, which I *assume* happens at the end
> of a bundle but I'm fuzzy on details, send those element IDs back to the
> server as consumed (with Redis XACK).
>
> If instead Beam would let you persist checkpoints externally and allow a
> pipeline to bootstrap off of the already existing checkpoint, I simply have
> to keep track *in the checkpoint* of the last element ID read, and can use
> that as the starting offset.  I would then be able to 'eager ack' read
> messages and not worry about delaying commits until elements are outputted
> further down the pipeline etc, since if an element is read into a
> checkpoint, we know it is recoverable.
>
> This also makes life a lot easier for anything regarding Kinesis since the
> Kinesis servers don't have a way of managing offsets/last element read
> (from when I used it?, maybe changed), unlike Kafka, Pubsub, Redis Streams,
> etc.
>
> Hopefully this makes sense, and if I have some misunderstandings I'd love
> to learn more.  This general subject has come up a few times in the beam
> slack so I think at the very least some extra documentation on these types
> of use cases might be welcome.
>
>
>
>
>>
>> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>>
>>> Looks like this is a common source of confusion, I had similar questions
>>> about checkpointing in the beam slack.
>>>
>>> In Spark Structured Streaming, checkpoints are saved to an *external*
>>> HDFS location and persist *beyond* each run, so in the event of a stream
>>> crashing, you can just point your next execution of the stream to the
>>> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
>>> persisted in the checkpoint, so the stream would resume off of the last
>>> committed checkpoint location.
>>>
>>> It doesn't seem Beam has an external checkpoint that persists beyond a
>>> single stream execution, so in Beam with Kinesis I believe you'll have to
>>> manage your own offsets deliberately with an external source if 

Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Vincent Marquez
On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles  wrote:

> [I think this has graduated to a +dev  thread]
>
> Yea, in Beam it is left up to the IOs primarily, hence the bundle
> finalization step, or allowed runners to have their own features of course.
> Dataflow also does have in-place pipeline update that restores the
> persisted checkpoints from one pipeline to another - same basic
> mechanism/idea as Spark Structured Streaming but different overall
> workflow. +Reuven Lax  has put a lot of thought into
> updating, checkpointing, resuming, etc. Runners differ a lot in these
> areas. Is there something that should graduate from runner-specific to the
> Beam model?
>
> Kenn
>

In some ways, having an external checkpoint mechanism makes programming IOs
simpler.  Let's use Redis Streams as an example, as our company recently
implemented a RedisStreamIO internally so the details are fresh.

One requirement was the need to be able to shut down a streaming Beam
pipeline and then restart it from a later point in time without losing data
and without starting from the beginning of time.

This meant that I needed to ensure that only elements which have finished
processing in a bundle are committed as 'processed' back to the Redis server.
I accomplished this by keeping track of all the elements that are outputted
and then, on finalizeCheckpoint - which I *assume* happens at the end of a
bundle, but I'm fuzzy on the details - sending those element IDs back to the
server as consumed (with Redis XACK).
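
To make that concrete, here is a rough sketch of the shape of such a
CheckpointMark (not our actual code - the class and field names are made up,
and it assumes Jedis as the Redis client):

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

import org.apache.beam.sdk.io.UnboundedSource;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;

// Hypothetical sketch only; connection details and names are placeholders.
class RedisStreamCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

  private final String streamKey;
  private final String groupName;
  // IDs of elements output by the reader since the last checkpoint.
  private final List<String> pendingEntryIds;

  RedisStreamCheckpointMark(String streamKey, String groupName, List<String> pendingEntryIds) {
    this.streamKey = streamKey;
    this.groupName = groupName;
    this.pendingEntryIds = pendingEntryIds;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Called by the runner once the bundle that produced these elements is durably
    // committed; only then are they XACKed back to the Redis consumer group.
    try (Jedis jedis = new Jedis("localhost", 6379)) {
      StreamEntryID[] ids =
          pendingEntryIds.stream().map(StreamEntryID::new).toArray(StreamEntryID[]::new);
      if (ids.length > 0) {
        jedis.xack(streamKey, groupName, ids);
      }
    }
  }
}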

If instead Beam let you persist checkpoints externally and allowed a pipeline
to bootstrap off an already existing checkpoint, I would simply have to keep
track *in the checkpoint* of the last element ID read and could use that as
the starting offset. I would then be able to 'eager ack' read messages and
not worry about delaying commits until elements are outputted further down
the pipeline, since if an element has been read into a checkpoint we know it
is recoverable.
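
In that world the checkpoint would shrink to something like the following
(purely hypothetical - it relies on externally persisted, restorable
checkpoints that Beam/Dataflow do not expose today; imports as in the sketch
above):

// Purely hypothetical alternative if a new pipeline could bootstrap from a persisted checkpoint.
class EagerAckCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

  // The only state needed to resume reading the stream.
  final String lastEntryIdRead;

  EagerAckCheckpointMark(String lastEntryIdRead) {
    this.lastEntryIdRead = lastEntryIdRead;
  }

  @Override
  public void finalizeCheckpoint() {
    // Nothing to do: elements were already acked at read time ('eager ack'),
    // because a persisted checkpoint containing lastEntryIdRead makes them recoverable.
  }
}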

This also makes life a lot easier for anything regarding Kinesis, since the
Kinesis servers don't have a way of managing offsets/the last element read
(at least as of when I used it - that may have changed), unlike Kafka, Pubsub,
Redis Streams, etc.

Hopefully this makes sense, and if I have some misunderstandings I'd love to
learn more. This general subject has come up a few times in the Beam Slack, so
I think at the very least some extra documentation on these types of use cases
would be welcome.




>
> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez 
> wrote:
>
>> Looks like this is a common source of confusion, I had similar questions
>> about checkpointing in the beam slack.
>>
>> In Spark Structured Streaming, checkpoints are saved to an *external*
>> HDFS location and persist *beyond* each run, so in the event of a stream
>> crashing, you can just point your next execution of the stream to the
>> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
>> persisted in the checkpoint, so the stream would resume off of the last
>> committed checkpoint location.
>>
>> It doesn't seem Beam has an external checkpoint that persists beyond a
>> single stream execution, so in Beam with Kinesis I believe you'll have to
>> manage your own offsets deliberately with an external source if you want to
>> achieve 'exactly once' semantics in the event of shutting down a stream and
>>  resuming it at a later point.
>>
>> In Kafka you don't need this since as long as we ensure our offsets are
>> committed in finalization of a bundle, the offsets for a particular group
>> id are stored on the server.
>>
>>
>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:
>>
>>> This sounds similar to the "Kafka Commit" in
>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>>  and also to how PubsubIO ACKs messages in the
>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>> just asking to clarify, in case other folks know more, like +Alexey
>>> Romanenko  and +Ismaël Mejía
>>>  have modified KinesisIO. If the feature does not
>>> exist today, perhaps we can identify the best practices around this pattern.
>>>
>>> Kenn
>>>
>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey 
>>> wrote:
>>>
 Hi Kenn,

 yes, resuming reading at the proper timestamp is exactly the issue we
 are currently struggling with. E.g. with Kinesis Client Lib we could store
 the last read within some dynamo table. This mechanism is not used with
 beam, as we understand, the runner is responsible to track that checkpoint
 mark.

 Now, obviously on restarting the pipeline, e.g. on non compatible
 upgrade, that is, an pipeline update is just not feasible, there must be
 some mechanism in place on how Dataflow will know where to continue. Is
 that simply the pipeline name? Or is there more involved? So how does
 checkpointing actually work here?

 Based on 'name', wouldn't that imply that something like (example taken
 from
 

Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Kenneth Knowles
[I think this has graduated to a +dev  thread]

Yea, in Beam it is left up to the IOs primarily - hence the bundle
finalization step - or runners are allowed to have their own features, of
course. Dataflow also has an in-place pipeline update that restores the
persisted checkpoints from one pipeline to another - the same basic
mechanism/idea as Spark Structured Streaming, but a different overall
workflow. +Reuven Lax has put a lot of thought into updating, checkpointing,
resuming, etc. Runners differ a lot in these areas. Is there something that
should graduate from runner-specific to the Beam model?
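
For concreteness, the in-place update path mentioned above looks roughly like
this from the Java side (a sketch only; the job name is a placeholder, and the
new pipeline must be update-compatible with the running one):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UpdateExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    options.setJobName("my-streaming-job"); // must match the name of the running job
    options.setUpdate(true);                // update in place instead of starting a fresh job

    Pipeline pipeline = Pipeline.create(options);
    // ... rebuild the (update-compatible) pipeline graph here ...
    pipeline.run();
  }
}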

Kenn

On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez wrote:

> Looks like this is a common source of confusion, I had similar questions
> about checkpointing in the beam slack.
>
> In Spark Structured Streaming, checkpoints are saved to an *external* HDFS
> location and persist *beyond* each run, so in the event of a stream
> crashing, you can just point your next execution of the stream to the
> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
> persisted in the checkpoint, so the stream would resume off of the last
> committed checkpoint location.
>
> It doesn't seem Beam has an external checkpoint that persists beyond a
> single stream execution, so in Beam with Kinesis I believe you'll have to
> manage your own offsets deliberately with an external source if you want to
> achieve 'exactly once' semantics in the event of shutting down a stream and
>  resuming it at a later point.
>
> In Kafka you don't need this since as long as we ensure our offsets are
> committed in finalization of a bundle, the offsets for a particular group
> id are stored on the server.
>
>
> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:
>
>> This sounds similar to the "Kafka Commit" in
>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>  and also to how PubsubIO ACKs messages in the
>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>> just asking to clarify, in case other folks know more, like +Alexey
>> Romanenko  and +Ismaël Mejía
>>  have modified KinesisIO. If the feature does not
>> exist today, perhaps we can identify the best practices around this pattern.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey 
>> wrote:
>>
>>> Hi Kenn,
>>>
>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>> are currently struggling with. E.g. with Kinesis Client Lib we could store
>>> the last read within some dynamo table. This mechanism is not used with
>>> beam, as we understand, the runner is responsible to track that checkpoint
>>> mark.
>>>
>>> Now, obviously on restarting the pipeline, e.g. on non compatible
>>> upgrade, that is, an pipeline update is just not feasible, there must be
>>> some mechanism in place on how Dataflow will know where to continue. Is
>>> that simply the pipeline name? Or is there more involved? So how does
>>> checkpointing actually work here?
>>>
>>> Based on 'name', wouldn't that imply that something like (example taken
>>> from
>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>> )
>>>
>>>   export REGION="us-central1"
>>>
>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
>>> +%Y%m%d-%H%M%S`" \
>>> --template-file-gcs-location "$TEMPLATE_PATH" \
>>> --parameters inputSubscription="$SUBSCRIPTION" \
>>> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>> --region "$REGION"
>>>
>>> will not resume on last read on rerun, because the name obviously
>>> changes here?
>>>
>>> best,
>>>
>>> michel
>>>
>>>
>>>
>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>>>
 I would assume the main issue is resuming reading from the Kinesis
 stream from the last read? In the case for Pubsub (just as another example
 of the idea) this is part of the internal state of a pre-created
 subscription.

 Kenn

 On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
 wrote:

> Hi list,
>
> with our current project we are implementing our streaming pipeline
> based on Google Dataflow.
>
> Essentially we receive input via Kinesis, doing some filtering,
> enrichment and sessionizing and output to PubSub and/or google storage.
>
> After short investigations it is not clear to us, how checkpointing
> will work running on Dataflow in connection with KinesisIO. Is there any
> documentation/discussions to get a better understanding on how that will 
> be
> working? Especially if we are forced to restart our pipelines, how could 
> we
> ensure not to loose any events?
>
> As far as I understand currently, it should work 'auto-magically' but
> it is not yet clear to us, how it will actually behave. Before we try to
> start testing our expectations or even try to implement some
> watermark-tracking by ourself we hoped to get some insights from other
> users 

Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Vincent Marquez
Looks like this is a common source of confusion, I had similar questions
about checkpointing in the beam slack.

In Spark Structured Streaming, checkpoints are saved to an *external* HDFS
location and persist *beyond* each run, so in the event of a stream crashing
you can just point the next execution of the stream at the checkpoint
location. Kafka (or Kinesis/Redis Streams, etc.) offsets are persisted in the
checkpoint, so the stream resumes from the last committed checkpoint.

It doesn't seem that Beam has an external checkpoint that persists beyond a
single stream execution, so in Beam with Kinesis I believe you'll have to
manage your own offsets deliberately in an external store if you want to
achieve 'exactly once' semantics when shutting down a stream and resuming it
at a later point.

In Kafka you don't need this since, as long as we ensure our offsets are
committed in the finalization of a bundle, the offsets for a particular group
id are stored on the server.
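
For reference, that Kafka behaviour is just a flag on the read in Beam's
KafkaIO (a sketch; broker, topic and group names are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.collect.ImmutableMap;

class KafkaReadSketch {
  // Offsets are committed back to the broker for the consumer group when a bundle is finalized.
  static PCollection<KV<String, String>> readEvents(Pipeline pipeline) {
    return pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092")
            .withTopic("events")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
            .commitOffsetsInFinalize() // commit offsets on bundle finalization
            .withoutMetadata());
  }
}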


On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:

> This sounds similar to the "Kafka Commit" in
> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>  and also to how PubsubIO ACKs messages in the
> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
> just asking to clarify, in case other folks know more, like +Alexey
> Romanenko  and +Ismaël Mejía  
> have
> modified KinesisIO. If the feature does not exist today, perhaps we can
> identify the best practices around this pattern.
>
> Kenn
>
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  wrote:
>
>> Hi Kenn,
>>
>> yes, resuming reading at the proper timestamp is exactly the issue we are
>> currently struggling with. E.g. with Kinesis Client Lib we could store the
>> last read within some dynamo table. This mechanism is not used with beam,
>> as we understand, the runner is responsible to track that checkpoint mark.
>>
>> Now, obviously on restarting the pipeline, e.g. on non compatible
>> upgrade, that is, an pipeline update is just not feasible, there must be
>> some mechanism in place on how Dataflow will know where to continue. Is
>> that simply the pipeline name? Or is there more involved? So how does
>> checkpointing actually work here?
>>
>> Based on 'name', wouldn't that imply that something like (example taken
>> from
>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>> )
>>
>>   export REGION="us-central1"
>>
>>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
>> +%Y%m%d-%H%M%S`" \
>> --template-file-gcs-location "$TEMPLATE_PATH" \
>> --parameters inputSubscription="$SUBSCRIPTION" \
>> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>> --region "$REGION"
>>
>> will not resume on last read on rerun, because the name obviously changes
>> here?
>>
>> best,
>>
>> michel
>>
>>
>>
>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>>
>>> I would assume the main issue is resuming reading from the Kinesis
>>> stream from the last read? In the case for Pubsub (just as another example
>>> of the idea) this is part of the internal state of a pre-created
>>> subscription.
>>>
>>> Kenn
>>>
>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>>> wrote:
>>>
 Hi list,

 with our current project we are implementing our streaming pipeline
 based on Google Dataflow.

 Essentially we receive input via Kinesis, doing some filtering,
 enrichment and sessionizing and output to PubSub and/or google storage.

 After short investigations it is not clear to us, how checkpointing
 will work running on Dataflow in connection with KinesisIO. Is there any
 documentation/discussions to get a better understanding on how that will be
 working? Especially if we are forced to restart our pipelines, how could we
 ensure not to loose any events?

 As far as I understand currently, it should work 'auto-magically' but
 it is not yet clear to us, how it will actually behave. Before we try to
 start testing our expectations or even try to implement some
 watermark-tracking by ourself we hoped to get some insights from other
 users here.

 Any help appreciated.

 Best,

 michel

>>>

~Vincent


Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Alexey Romanenko
I can’t say exactly how it will work with Dataflow but for Spark Runner I 
answered it here [1]:

“Since KinesisIO is based on UnboundedSource.CheckpointMark, it uses the 
standard checkpoint mechanism, provided by Beam UnboundedSource.UnboundedReader.

Once a KinesisRecord has been read (actually, pulled from a records queue that
is fed separately by fetching the records from the Kinesis shard), the shard
checkpoint will be updated [2] using the record SequenceNumber and then,
depending on the runner's implementation of UnboundedSource and checkpoint
processing, will be saved.

Afaik, the Beam Spark Runner uses the Spark State mechanism for this purpose. [3]”
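
To make the moving parts concrete, this is roughly the read/checkpoint cycle a
runner drives for any UnboundedSource, KinesisIO included (a simplified,
hand-written sketch - not the actual Spark runner code):

import java.io.IOException;

import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

class RunnerLoopSketch {

  static <T, C extends UnboundedSource.CheckpointMark> void readOneBundle(
      UnboundedSource<T, C> source, C restoredCheckpoint, PipelineOptions options)
      throws IOException {

    // Resume from the previously saved checkpoint mark (e.g. the Kinesis shard sequence number).
    UnboundedSource.UnboundedReader<T> reader = source.createReader(options, restoredCheckpoint);

    boolean available = reader.start();
    for (int i = 0; i < 1000 && available; i++) { // a bounded "bundle" for the sketch
      T element = reader.getCurrent();            // ...hand the element to the pipeline
      available = reader.advance();
    }

    // Snapshot the reader's position; the runner persists this (Spark state, Dataflow's own
    // store, ...) so a later bundle or a restarted reader can resume from it.
    @SuppressWarnings("unchecked")
    C checkpoint = (C) reader.getCheckpointMark();

    // Once the bundle's outputs are durably committed, the runner finalizes the checkpoint,
    // which is where IOs ack/commit back to the external system.
    checkpoint.finalizeCheckpoint();

    reader.close();
  }
}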

Alexey


[1] 
https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
[2] 
https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java#L115
[3] 
https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java#L200

> On 7 Apr 2021, at 00:12, Kenneth Knowles  wrote:
> 
> This sounds similar to the "Kafka Commit" in 
> https://github.com/apache/beam/pull/12572 
>  by +Boyuan Zhang 
>  and also to how PubsubIO ACKs messages in the 
> finalizer. I don't know much about KinesisIO or how Kinesis works. I was just 
> asking to clarify, in case other folks know more, like +Alexey Romanenko 
>  and +Ismaël Mejía 
>  have modified KinesisIO. If the feature does not 
> exist today, perhaps we can identify the best practices around this pattern.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  > wrote:
> Hi Kenn,
> 
> yes, resuming reading at the proper timestamp is exactly the issue we are 
> currently struggling with. E.g. with Kinesis Client Lib we could store the 
> last read within some dynamo table. This mechanism is not used with beam, as 
> we understand, the runner is responsible to track that checkpoint mark.
> 
> Now, obviously on restarting the pipeline, e.g. on non compatible upgrade, 
> that is, an pipeline update is just not feasible, there must be some 
> mechanism in place on how Dataflow will know where to continue. Is that 
> simply the pipeline name? Or is there more involved? So how does 
> checkpointing actually work here?
> 
> Based on 'name', wouldn't that imply that something like (example taken from 
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates 
> )
> 
>   export REGION="us-central1"
> 
>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
> +%Y%m%d-%H%M%S`" \
> --template-file-gcs-location "$TEMPLATE_PATH" \
> --parameters inputSubscription="$SUBSCRIPTION" \
> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
> --region "$REGION"
> will not resume on last read on rerun, because the name obviously changes 
> here?
> 
> best,
> 
> michel
> 
> 
> 
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  > wrote:
> I would assume the main issue is resuming reading from the Kinesis stream 
> from the last read? In the case for Pubsub (just as another example of the 
> idea) this is part of the internal state of a pre-created subscription.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  > wrote:
> Hi list,
> 
> with our current project we are implementing our streaming pipeline based on 
> Google Dataflow.
> 
> Essentially we receive input via Kinesis, doing some filtering, enrichment 
> and sessionizing and output to PubSub and/or google storage.
> 
> After short investigations it is not clear to us, how checkpointing will work 
> running on Dataflow in connection with KinesisIO. Is there any 
> documentation/discussions to get a better understanding on how that will be 
> working? Especially if we are forced to restart our pipelines, how could we 
> ensure not to loose any events?
> 
> As far as I understand currently, it should work 'auto-magically' but it is 
> not yet clear to us, how it will actually behave. Before we try to start 
> testing our expectations or even try to implement some watermark-tracking by 
> ourself we hoped to get some insights from other users here.
> 
> Any help appreciated.
> 
> Best,
> 
> michel



Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
This sounds similar to the "Kafka Commit" in
https://github.com/apache/beam/pull/12572 by +Boyuan Zhang, and also to how
PubsubIO ACKs messages in the finalizer. I don't know much about KinesisIO or
how Kinesis works. I was just asking to clarify, in case other folks know
more, like +Alexey Romanenko and +Ismaël Mejía, who have modified KinesisIO.
If the feature does not exist today, perhaps we can identify the best
practices around this pattern.

Kenn

On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  wrote:

> Hi Kenn,
>
> yes, resuming reading at the proper timestamp is exactly the issue we are
> currently struggling with. E.g. with Kinesis Client Lib we could store the
> last read within some dynamo table. This mechanism is not used with beam,
> as we understand, the runner is responsible to track that checkpoint mark.
>
> Now, obviously on restarting the pipeline, e.g. on non compatible upgrade,
> that is, an pipeline update is just not feasible, there must be some
> mechanism in place on how Dataflow will know where to continue. Is that
> simply the pipeline name? Or is there more involved? So how does
> checkpointing actually work here?
>
> Based on 'name', wouldn't that imply that something like (example taken
> from
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
> )
>
>   export REGION="us-central1"
>
>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
> +%Y%m%d-%H%M%S`" \
> --template-file-gcs-location "$TEMPLATE_PATH" \
> --parameters inputSubscription="$SUBSCRIPTION" \
> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
> --region "$REGION"
>
> will not resume on last read on rerun, because the name obviously changes
> here?
>
> best,
>
> michel
>
>
>
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>
>> I would assume the main issue is resuming reading from the Kinesis stream
>> from the last read? In the case for Pubsub (just as another example of the
>> idea) this is part of the internal state of a pre-created subscription.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>> wrote:
>>
>>> Hi list,
>>>
>>> with our current project we are implementing our streaming pipeline
>>> based on Google Dataflow.
>>>
>>> Essentially we receive input via Kinesis, doing some filtering,
>>> enrichment and sessionizing and output to PubSub and/or google storage.
>>>
>>> After short investigations it is not clear to us, how checkpointing will
>>> work running on Dataflow in connection with KinesisIO. Is there any
>>> documentation/discussions to get a better understanding on how that will be
>>> working? Especially if we are forced to restart our pipelines, how could we
>>> ensure not to loose any events?
>>>
>>> As far as I understand currently, it should work 'auto-magically' but it
>>> is not yet clear to us, how it will actually behave. Before we try to start
>>> testing our expectations or even try to implement some watermark-tracking
>>> by ourself we hoped to get some insights from other users here.
>>>
>>> Any help appreciated.
>>>
>>> Best,
>>>
>>> michel
>>>
>>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi Kenn,

yes, resuming reading at the proper timestamp is exactly the issue we are
currently struggling with. E.g. with the Kinesis Client Lib we could store the
last read position within some Dynamo table. This mechanism is not used with
Beam; as we understand it, the runner is responsible for tracking that
checkpoint mark.

Now, obviously on restarting the pipeline - e.g. after an incompatible
upgrade, that is, when a pipeline update is just not feasible - there must be
some mechanism in place for how Dataflow knows where to continue. Is that
simply the pipeline name? Or is there more involved? So how does checkpointing
actually work here?

Based on 'name', wouldn't that imply that something like (example taken
from
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
)

  export REGION="us-central1"

  gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters inputSubscription="$SUBSCRIPTION" \
    --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
    --region "$REGION"

will not resume from the last read position on rerun, because the name
obviously changes here?

best,

michel



On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:

> I would assume the main issue is resuming reading from the Kinesis stream
> from the last read? In the case for Pubsub (just as another example of the
> idea) this is part of the internal state of a pre-created subscription.
>
> Kenn
>
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:
>
>> Hi list,
>>
>> with our current project we are implementing our streaming pipeline based
>> on Google Dataflow.
>>
>> Essentially we receive input via Kinesis, doing some filtering,
>> enrichment and sessionizing and output to PubSub and/or google storage.
>>
>> After short investigations it is not clear to us, how checkpointing will
>> work running on Dataflow in connection with KinesisIO. Is there any
>> documentation/discussions to get a better understanding on how that will be
>> working? Especially if we are forced to restart our pipelines, how could we
>> ensure not to loose any events?
>>
>> As far as I understand currently, it should work 'auto-magically' but it
>> is not yet clear to us, how it will actually behave. Before we try to start
>> testing our expectations or even try to implement some watermark-tracking
>> by ourself we hoped to get some insights from other users here.
>>
>> Any help appreciated.
>>
>> Best,
>>
>> michel
>>
>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
I would assume the main issue is resuming reading from the Kinesis stream
from the last read? In the case for Pubsub (just as another example of the
idea) this is part of the internal state of a pre-created subscription.
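
For illustration, the Pubsub case looks like this: the subscription is created
ahead of time and it, not the pipeline, remembers which messages are still
unacked, so a restarted pipeline simply resumes from the subscription's
backlog (a sketch; resource names are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.values.PCollection;

class PubsubReadSketch {
  // Reads resume from wherever the pre-created subscription left off.
  static PCollection<PubsubMessage> readFromSubscription(Pipeline pipeline) {
    return pipeline.apply(
        PubsubIO.readMessages()
            .fromSubscription("projects/my-project/subscriptions/my-subscription"));
  }
}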

Kenn

On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:

> Hi list,
>
> with our current project we are implementing our streaming pipeline based
> on Google Dataflow.
>
> Essentially we receive input via Kinesis, doing some filtering, enrichment
> and sessionizing and output to PubSub and/or google storage.
>
> After short investigations it is not clear to us, how checkpointing will
> work running on Dataflow in connection with KinesisIO. Is there any
> documentation/discussions to get a better understanding on how that will be
> working? Especially if we are forced to restart our pipelines, how could we
> ensure not to loose any events?
>
> As far as I understand currently, it should work 'auto-magically' but it
> is not yet clear to us, how it will actually behave. Before we try to start
> testing our expectations or even try to implement some watermark-tracking
> by ourself we hoped to get some insights from other users here.
>
> Any help appreciated.
>
> Best,
>
> michel
>


Checkpointing Dataflow Pipeline

2021-04-06 Thread Michael Luckey
Hi list,

With our current project we are implementing our streaming pipeline based on
Google Dataflow.

Essentially we receive input via Kinesis, do some filtering, enrichment and
sessionizing, and output to PubSub and/or Google Cloud Storage.
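
To give a better picture, the pipeline is roughly shaped like the following
sketch (stream/topic names, credentials and the session gap are placeholders;
filtering is reduced to a trivial stand-in, enrichment is omitted, and the
session windows only take effect at a later per-key grouping):

import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class SessionizingPipelineSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadKinesis",
            KinesisIO.read()
                .withStreamName("my-input-stream")
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1))
        .apply("ToPayload",
            MapElements.into(TypeDescriptors.strings())
                .via((KinesisRecord r) ->
                    new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
        .apply("Filter", Filter.by((String payload) -> !payload.isEmpty()))
        // enrichment would go here (e.g. a ParDo doing lookups)
        .apply("Sessionize",
            Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(30))))
        .apply("WritePubsub",
            PubsubIO.writeStrings().to("projects/my-project/topics/my-output-topic"));

    p.run();
  }
}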

After a short investigation it is not clear to us how checkpointing will work
when running on Dataflow in connection with KinesisIO. Is there any
documentation/discussion that would give us a better understanding of how it
works? Especially if we are forced to restart our pipelines, how could we
ensure we do not lose any events?

As far as I currently understand, it should work 'auto-magically', but it is
not yet clear to us how it will actually behave. Before we start testing our
expectations, or even try to implement some watermark tracking ourselves, we
hoped to get some insights from other users here.

Any help appreciated.

Best,

michel