Also, I was wondering if we could end up with some kind of race condition:
bundle 1 contains messages [1, 2, 3, 4, 5]; bundle 2 contains messages [6, 7].
If bundle 2 completes before bundle 1, then it will commit all messages up to
offset 7. If bundle 1 then fails for whatever reason, we potentially lose that
data, right?

Thank you Luke!

*Juan*

On Fri, 10 Dec 2021 at 10:40, Juan Calvo Ferrándiz <
[email protected]> wrote:

> Thanks Luke for your quick response. I see, that makes sense. Now I have
> two new questions, if I may:
>
> a) How can I get the offsets I want to commit? My investigation is
> currently going through getCheckpointMark(); is this correct?
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>
> b) With these offsets, I will create a client at the end of the pipeline,
> with the Kafka library, and methods such as commitSync() and commitAsync().
> Is this correct?
> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>
> Thanks!!!
>
> *Juan*
>
>
> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <[email protected]> wrote:
>
>> commitOffsetsInFinalize is about committing the offset after the output
>> has been durably persisted for the bundle containing the Kafka Read. The
>> bundle represents a unit of work over a subgraph of the pipeline. You will
>> want to ensure that commitOffsetsInFinalize is disabled and that the Kafka
>> consumer config doesn't auto-commit. This will ensure that KafkaIO.Read
>> doesn't commit the offsets. Then it is up to your PTransform to perform
>> the committing.
>>
>> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <
>> [email protected]> wrote:
>>
>>> Morning!
>>>
>>> First of all, thanks for all the incredible work you do; it's amazing.
>>> Then, secondly, I'm reaching out for some help or guidance on manually
>>> committing records. I want to do this so I can commit the record at the
>>> end of the pipeline, and not in the read() of KafkaIO.
>>>
>>> Bearing in mind what I have read in this post:
>>> https://lists.apache.org/[email protected]:2021-9:[email protected]%20kafka%20commit
>>> , and thinking of a pipeline similar to the one described, I understand
>>> we can use commitOffsetsInFinalize() to commit offsets in the read().
>>> What I don't understand is how this helps to commit the offset if we
>>> want to do this at the end, not in the read. Thanks. All comments and
>>> suggestions are more than welcome. :)
>>>
>>>
>>> *Juan Calvo Ferrándiz*
>>> Data Engineer
>>> Go to LINKEDIN <https://www.linkedin.com/in/juan-calvo-ferrandiz/>
>>> Go to GITHUB <https://github.com/juancalvof>
>>> Go to MEDIUM <https://medium.com/@juancalvoferrandiz>
>>>
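[Editor's note] The race described at the top of the thread (bundle 2 committing offset 7 before bundle 1's messages 1..5 are durable) is usually avoided by committing only the contiguous "low-water mark" of finished offsets, rather than the latest offset seen. Below is a minimal, Beam-independent sketch of that idea; `ContiguousOffsetTracker`, `markDone`, and `committableOffset` are illustrative names invented here, not part of any Beam or Kafka API.

```java
import java.util.TreeSet;

// Tracks which offsets have been durably processed, and only ever
// advances the commit position past an unbroken run of finished offsets.
// If bundle 2 ([6, 7]) finishes before bundle 1 ([1..5]), the committable
// offset stays put, so a failure of bundle 1 cannot lose messages 1..5.
class ContiguousOffsetTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    private long committed; // highest offset safely committed so far

    ContiguousOffsetTracker(long lastCommitted) {
        this.committed = lastCommitted;
    }

    /** Mark a single offset as durably processed (end of the pipeline). */
    synchronized void markDone(long offset) {
        done.add(offset);
    }

    /**
     * Advance past every contiguous finished offset and return the new
     * commit position. Offsets finished out of order stay buffered in
     * {@code done} until the gap below them closes.
     */
    synchronized long committableOffset() {
        while (done.contains(committed + 1)) {
            done.remove(committed + 1);
            committed++;
        }
        return committed;
    }
}
```

With this, bundle 2 finishing first leaves `committableOffset()` at the last committed position; only once bundle 1's offsets 1..5 are marked done does it jump to 7. Note that when handing the result to the Kafka consumer's `commitSync(...)` with an explicit offset map, Kafka expects the offset of the *next* record to consume, i.e. `committableOffset() + 1`.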
