I can’t say exactly how it will work with Dataflow, but for the Spark Runner I answered it here [1]:
“Since KinesisIO is based on UnboundedSource.CheckpointMark, it uses the standard checkpoint mechanism provided by Beam's UnboundedSource.UnboundedReader. Once a KinesisRecord has been read (actually, pulled from a records queue that is fed separately by fetching the records from the Kinesis shard), the shard checkpoint is updated [2] using the record's SequenceNumber and then, depending on the runner's implementation of UnboundedSource and checkpoint processing, it will be saved. AFAIK, the Beam Spark Runner uses Spark's state mechanism for this purpose. [3]”

Alexey

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
[2] https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java#L115
[3] https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java#L200
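For illustration, here is a minimal sketch of what the reading side looks like with the Java SDK. The stream name, credentials and region are placeholders, and the builder methods shown may vary between KinesisIO versions; note that nothing checkpoint-related is configured in user code, since the checkpoint marks are persisted and restored by the runner.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class KinesisReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The runner persists the per-shard checkpoint marks (sequence numbers);
    // the pipeline code does not manage Kinesis checkpoints itself.
    PCollection<KinesisRecord> records =
        p.apply("ReadKinesis",
            KinesisIO.read()
                .withStreamName("my-stream")  // placeholder stream name
                .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.US_EAST_1)  // placeholder credentials
                // Only consulted when there is no checkpoint state to resume from:
                .withInitialPositionInStream(InitialPositionInStream.LATEST));

    // ... filtering / enrichment / sessionizing transforms would follow here ...

    p.run();
  }
}

When the runner restores a reader from a saved CheckpointMark, the initial position above is ignored and reading resumes from the stored shard sequence numbers; when a pipeline is started fresh with no state to resume from, KinesisIO also offers withInitialTimestampInStream() to rewind to a point in time instead of LATEST or TRIM_HORIZON.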
> On 7 Apr 2021, at 00:12, Kenneth Knowles <[email protected]> wrote:
> 
> This sounds similar to the "Kafka Commit" in https://github.com/apache/beam/pull/12572 by +Boyuan Zhang and also to how PubsubIO ACKs messages in the finalizer. I don't know much about KinesisIO or how Kinesis works. I was just asking to clarify, in case other folks know more, like +Alexey Romanenko and +Ismaël Mejía, who have modified KinesisIO. If the feature does not exist today, perhaps we can identify the best practices around this pattern.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey <[email protected]> wrote:
> > Hi Kenn,
> > 
> > yes, resuming reading at the proper timestamp is exactly the issue we are currently struggling with. E.g. with the Kinesis Client Library we could store the last read position in some DynamoDB table. This mechanism is not used with Beam; as we understand it, the runner is responsible for tracking that checkpoint mark.
> > 
> > Now, obviously on restarting the pipeline, e.g. on a non-compatible upgrade, that is, when a pipeline update is just not feasible, there must be some mechanism in place for how Dataflow will know where to continue. Is that simply the pipeline name? Or is there more involved? So how does checkpointing actually work here?
> > 
> > Based on 'name', wouldn't that imply that something like (example taken from https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
> > 
> > export REGION="us-central1"
> > 
> > gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
> >     --template-file-gcs-location "$TEMPLATE_PATH" \
> >     --parameters inputSubscription="$SUBSCRIPTION" \
> >     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
> >     --region "$REGION"
> > 
> > will not resume on last read on rerun, because the name obviously changes here?
> > 
> > best,
> > 
> > michel
> > 
> > On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles <[email protected]> wrote:
> > > I would assume the main issue is resuming reading from the Kinesis stream from the last read? In the case of Pubsub (just as another example of the idea), this is part of the internal state of a pre-created subscription.
> > > 
> > > Kenn
> > > 
> > > On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey <[email protected]> wrote:
> > > > Hi list,
> > > > 
> > > > with our current project we are implementing our streaming pipeline based on Google Dataflow.
> > > > 
> > > > Essentially we receive input via Kinesis, do some filtering, enrichment and sessionizing, and output to PubSub and/or Google Cloud Storage.
> > > > 
> > > > After a short investigation it is not clear to us how checkpointing will work when running on Dataflow in connection with KinesisIO. Is there any documentation/discussion to get a better understanding of how that will work? Especially if we are forced to restart our pipelines, how can we ensure not to lose any events?
> > > > 
> > > > As far as I understand it currently, it should work 'auto-magically', but it is not yet clear to us how it will actually behave. Before we start testing our expectations, or even try to implement some watermark tracking ourselves, we hoped to get some insights from other users here.
> > > > 
> > > > Any help appreciated.
> > > > 
> > > > Best,
> > > > 
> > > > michel
