I can’t say exactly how it will work with Dataflow, but for the Spark Runner I 
answered it here [1]:

“Since KinesisIO is based on UnboundedSource.CheckpointMark, it uses the 
standard checkpoint mechanism provided by Beam's UnboundedSource.UnboundedReader.

Once a KinesisRecord has been read (actually, pulled from a records queue that 
is fed separately by fetching the records from the Kinesis shard), the shard 
checkpoint will be updated [2] using the record's SequenceNumber, and then, 
depending on the runner's implementation of UnboundedSource and checkpoint 
processing, it will be saved.

AFAIK, the Beam Spark Runner uses the Spark States mechanism for this 
purpose. [3]”
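
To make that concrete, here is a minimal sketch (my own illustration with 
invented names, not actual runner code) of the cycle a runner drives against 
any UnboundedSource, KinesisIO included:

    import java.io.IOException;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.io.kinesis.KinesisRecord;
    import org.apache.beam.sdk.options.PipelineOptions;

    class CheckpointCycleSketch {
      // "source" and "options" stand in for objects the runner provides.
      static void readAndCheckpoint(
          UnboundedSource<KinesisRecord, ?> source, PipelineOptions options)
          throws IOException {
        // Passing null starts from the initial position; a runner resuming
        // from saved state would pass the restored CheckpointMark instead.
        UnboundedSource.UnboundedReader<KinesisRecord> reader =
            source.createReader(options, null);
        // Read until no record is currently available (roughly one "bundle").
        for (boolean more = reader.start(); more; more = reader.advance()) {
          KinesisRecord record = reader.getCurrent();
          // ... emit the record into the pipeline ...
        }
        // Snapshot the position; for KinesisIO this captures the per-shard
        // sequence numbers updated in [2].
        UnboundedSource.CheckpointMark mark = reader.getCheckpointMark();
        // The runner durably persists the mark (e.g. Spark state in [3])
        // and only then finalizes it.
        mark.finalizeCheckpoint();
        reader.close();
      }
    }

How and when the persisted mark is restored on restart is exactly the 
runner-specific part, which is why the behaviour can differ between the Spark 
Runner and Dataflow.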

Alexey


[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
[2] https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java#L115
[3] https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java#L200

> On 7 Apr 2021, at 00:12, Kenneth Knowles <[email protected]> wrote:
> 
> This sounds similar to the "Kafka Commit" in 
> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang, and also to how 
> PubsubIO ACKs messages in the finalizer. I don't know much about KinesisIO 
> or how Kinesis works. I was just asking to clarify, in case other folks know 
> more, like +Alexey Romanenko and +Ismaël Mejía, who have modified KinesisIO. 
> If the feature does not exist today, perhaps we can identify the best 
> practices around this pattern.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]> wrote:
> Hi Kenn,
> 
> yes, resuming reading at the proper timestamp is exactly the issue we are 
> currently struggling with. E.g., with the Kinesis Client Library we could 
> store the last read position in a DynamoDB table. As we understand it, this 
> mechanism is not used with Beam; the runner is responsible for tracking that 
> checkpoint mark.
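> 
> For contrast, a minimal sketch of that KCL (1.x) path (placeholder code, 
> not our real processor); checkpoint() persists the position to the 
> KCL-managed DynamoDB lease table:
> 
>     import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
>     import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
>     import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
>     import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
>     import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
>     import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
>     import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
>     import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
>     import com.amazonaws.services.kinesis.model.Record;
> 
>     public class SketchProcessor implements IRecordProcessor {
>       @Override
>       public void initialize(InitializationInput input) {}
> 
>       @Override
>       public void processRecords(ProcessRecordsInput input) {
>         for (Record record : input.getRecords()) {
>           // ... handle the record ...
>         }
>         try {
>           // Persist progress to the DynamoDB lease table.
>           input.getCheckpointer().checkpoint();
>         } catch (KinesisClientLibDependencyException | InvalidStateException
>             | ThrottlingException | ShutdownException e) {
>           // Lease lost, throttled, or shutting down; skip this checkpoint.
>         }
>       }
> 
>       @Override
>       public void shutdown(ShutdownInput input) {}
>     }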
> 
> Now, when restarting the pipeline (e.g. after an incompatible upgrade, where 
> a pipeline update is simply not feasible), there must be some mechanism by 
> which Dataflow knows where to continue. Is that simply the pipeline name? Or 
> is there more involved? So how does checkpointing actually work here?
> 
> If it is based on 'name', wouldn't that imply that something like the 
> following (example taken from 
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
> 
>   export REGION="us-central1"
> 
>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>     --template-file-gcs-location "$TEMPLATE_PATH" \
>     --parameters inputSubscription="$SUBSCRIPTION" \
>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>     --region "$REGION"
> will not resume from the last read position on rerun, because the name 
> obviously changes here?
> 
> best,
> 
> michel
> 
> 
> 
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]> wrote:
> I would assume the main issue is resuming reading from the Kinesis stream at 
> the last read position? In the case of Pubsub (just as another example of 
> the idea), this is part of the internal state of a pre-created subscription.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]> wrote:
> Hi list,
> 
> with our current project we are implementing our streaming pipeline based on 
> Google Dataflow.
> 
> Essentially we receive input via Kinesis, do some filtering, enrichment and 
> sessionizing, and output to PubSub and/or Google Cloud Storage.
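> 
> For context, a rough sketch of that topology (stream/topic names, 
> credentials, gap duration and payload decoding below are placeholders, not 
> our real code):
> 
>     import java.nio.charset.StandardCharsets;
>     import com.amazonaws.regions.Regions;
>     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
>     import org.apache.beam.sdk.io.kinesis.KinesisIO;
>     import org.apache.beam.sdk.io.kinesis.KinesisRecord;
>     import org.apache.beam.sdk.options.PipelineOptions;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>     import org.apache.beam.sdk.transforms.MapElements;
>     import org.apache.beam.sdk.transforms.windowing.Sessions;
>     import org.apache.beam.sdk.transforms.windowing.Window;
>     import org.apache.beam.sdk.values.TypeDescriptors;
>     import org.joda.time.Duration;
> 
>     public class SketchPipeline {
>       public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>         Pipeline p = Pipeline.create(options);
>         p.apply("ReadKinesis", KinesisIO.read()
>                 .withStreamName("input-stream")
>                 .withAWSClientsProvider("KEY", "SECRET", Regions.EU_WEST_1)
>                 .withInitialPositionInStream(InitialPositionInStream.LATEST))
>          .apply("Decode", MapElements.into(TypeDescriptors.strings())
>                 .via((KinesisRecord r) ->
>                     new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
>          // Sessionizing; the downstream aggregation is omitted here.
>          .apply("Sessionize", Window.into(
>                 Sessions.withGapDuration(Duration.standardMinutes(10))))
>          .apply("WritePubsub", PubsubIO.writeStrings()
>                 .to("projects/my-project/topics/output"));
>         p.run();
>       }
>     }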
> 
> After a short investigation it is not clear to us how checkpointing works 
> when running on Dataflow in connection with KinesisIO. Is there any 
> documentation or discussion that would give a better understanding of how it 
> behaves? Especially if we are forced to restart our pipelines, how can we 
> ensure that we do not lose any events?
> 
> As far as I currently understand, it should work 'auto-magically', but it is 
> not yet clear to us how it will actually behave. Before we start testing our 
> expectations, or even try to implement some watermark tracking ourselves, we 
> hoped to get some insights from other users here.
> 
> Any help appreciated.
> 
> Best,
> 
> michel
