What I mean is: if you want to commit offsets only *after* a KafkaRecord<K,V> is processed, then you need to keep parallelism at the number of partitions, because offsets are monotonically increasing *per partition*. So if you have only one partition and then split the work into two 'threads', and T1 handling offsets A-C fails while T2 handling D-G succeeds, committing T2's offsets would tell Kafka that everything T1 was handling also succeeded.

*~Vincent*
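
To make the constraint concrete, here is a minimal sketch of a per-partition committer built on the plain Kafka consumer API (the class name, group id handling and ByteArray deserializers are illustrative assumptions, not anything prescribed by KafkaIO). The point is that Kafka stores a single committed position per partition, so committing offset G on a partition implicitly declares every earlier offset on that partition as processed:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Commits one offset for one partition with a plain Kafka client. Because Kafka
// keeps a single position per partition, this should only ever be handed offsets
// whose predecessors on that partition are already known to be processed.
class PartitionCommitter implements AutoCloseable {
  private final KafkaConsumer<byte[], byte[]> consumer;

  PartitionCommitter(String bootstrapServers, String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  // `lastProcessedOffset` is the highest fully processed offset; Kafka stores the
  // *next* offset to read, hence the +1.
  void commit(String topic, int partition, long lastProcessedOffset) {
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(topic, partition),
        new OffsetAndMetadata(lastProcessedOffset + 1)));
  }

  @Override
  public void close() {
    consumer.close();
  }
}

A committer like this would typically be driven by a transform that has already grouped elements by TopicPartition, so it is only ever given offsets that are contiguous with everything already processed on that partition.
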
On Mon, Dec 13, 2021 at 11:12 AM Luke Cwik <[email protected]> wrote:

> I believe you would be able to have parallelism greater than the number of partitions for most of the pipeline. The checkpoint advancement code is likely limited to the number of partitions but can be a very small portion of the pipeline.
>
> On Fri, Dec 10, 2021 at 10:20 AM Vincent Marquez <[email protected]> wrote:
>
>> If you want to ensure you have at-least-once processing, I think the *maximum* amount of parallelization you can have would be the number of partitions you have, so you'd want to group by partition, process a bundle of that partition, then commit the last offset for a given partition.
>>
>> *~Vincent*
>>
>> On Fri, Dec 10, 2021 at 9:28 AM Luke Cwik <[email protected]> wrote:
>>
>>> Yes, you will need to deal with records being out of order, because the system will process many things in parallel.
>>>
>>> You can read the last committed offset from Kafka and compare it against the offset you have right now. If the offset you have right now is not the next offset, you store it in state; if it is, then you find the contiguous range of offsets that you have stored in state starting from this offset, remove them from state, and commit the last one in that contiguous range.
>>>
>>> On Fri, Dec 10, 2021 at 8:18 AM Juan Calvo Ferrándiz <[email protected]> wrote:
>>>
>>>> Thanks Alexey! I understand. Continuing to think about possible solutions for committing records, I was wondering what happens in this scenario:
>>>>
>>>> When processing windows of data, do they get processed in sequential order, or is it possible for them to be processed out of order? For example, Window 1 contains 10000 elements of data whereas Window 2 contains 10 elements. Assuming Window 1 takes a while to process all of that data, is it possible Window 2 will finish before Window 1?
>>>>
>>>> Thanks again!
>>>>
>>>> On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko <[email protected]> wrote:
>>>>
>>>>> I answered a similar question on SO a while ago [1], and I hope it will help.
>>>>>
>>>>> "By default, pipeline.apply(KafkaIO.read()...) will return a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an offset from KafkaRecord metadata and commit it manually in the way that you need (just don't forget to disable AUTO_COMMIT in KafkaIO.read()).
>>>>>
>>>>> By manual, I mean that you should instantiate your own Kafka client in your DoFn, process the input element (as the KafkaRecord<K, V> that was read before), fetch the offset from the KafkaRecord and commit it with your own client.
>>>>>
>>>>> Though, you need to make sure that the call to the external API and the offset commit are atomic, to prevent potential data loss (if it's critical)."
>>>>>
>>>>> [1] https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <[email protected]> wrote:
>>>>>
>>>>> Thanks Luke for your quick response. I see, that makes sense. Now I have two new questions, if I may:
>>>>>
>>>>> a) How can I get the offsets I want to commit? My investigation is now going through getCheckpointMark(); is this correct?
>>>>> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>>>>>
>>>>> b) With these offsets, I will create a client at the end of the pipeline, with the Kafka library, and methods such as commitSync() and commitAsync(). Is this correct?
>>>>> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>>>>>
>>>>> Thanks!!!
>>>>>
>>>>> *Juan*
>>>>>
>>>>> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> commitOffsetsInFinalize is about committing the offset after the output has been durably persisted for the bundle containing the Kafka Read. The bundle represents a unit of work over a subgraph of the pipeline. You will want to ensure that commitOffsetsInFinalize is disabled and that the Kafka consumer config doesn't auto-commit. This will ensure that KafkaIO.Read doesn't commit the offsets. Then it is up to your PTransform to perform the committing.
>>>>>>
>>>>>> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <[email protected]> wrote:
>>>>>>
>>>>>>> Morning!
>>>>>>>
>>>>>>> First of all, thanks for all the incredible work you do; it is amazing. Secondly, I am reaching out for some help or guidance on manually committing records. I want to do this so I can commit the record at the end of the pipeline, and not in the read() of the KafkaIO.
>>>>>>>
>>>>>>> Bearing in mind what I have read in this post: https://lists.apache.org/[email protected]:2021-9:[email protected]%20kafka%20commit , and thinking of a pipeline similar to the one described, I understand we can use commitOffsetsInFinalize() to commit offsets in the read(). What I don't understand is how this helps to commit the offset if we want to do it at the end, not in the reading. Thanks. All comments and suggestions are more than welcome. :)
>>>>>>>
>>>>>>> *Juan*
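
Putting together Luke's point about commitOffsetsInFinalize and Alexey's suggestion of a manual client, the reading side could look roughly like the sketch below. The broker address, topic, deserializers and class name are placeholders; the essential details are that commitOffsetsInFinalize() is simply never called and ENABLE_AUTO_COMMIT_CONFIG is set to false, so nothing commits until your own transform does:

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Read without committing: commitOffsetsInFinalize() is not called and the
    // underlying consumer's auto-commit is disabled.
    PCollection<KafkaRecord<String, String>> records = p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")   // placeholder broker
            .withTopic("my-topic")                     // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(Collections.<String, Object>singletonMap(
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)));

    // Each KafkaRecord keeps its metadata, so a downstream DoFn can pull out the
    // coordinates it needs and commit them with its own client once the element
    // (and any external call) has succeeded.
    records.apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Void>() {
      @ProcessElement
      public void process(@Element KafkaRecord<String, String> rec) {
        String topic = rec.getTopic();
        int partition = rec.getPartition();
        long offset = rec.getOffset();
        // ... process the element, then hand (topic, partition, offset) to your
        // own committing client, e.g. the PartitionCommitter sketch above.
      }
    }));

    p.run();
  }
}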

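The contiguous-range bookkeeping Luke describes (store out-of-order offsets in state, commit only when the gap closes) could be sketched like this; one tracker per TopicPartition is assumed, and in a Beam pipeline this state would live in a stateful DoFn keyed by partition rather than in a plain object:

import java.util.TreeSet;

// Tracks completions that arrive out of order and yields the position that is
// safe to commit, following the contiguous-range idea quoted above.
class ContiguousOffsetTracker {
  private long nextExpected;                              // next offset we still need
  private final TreeSet<Long> pending = new TreeSet<>();  // completed but out of order

  // Seed with the last committed position read back from Kafka
  // (Kafka commits store the *next* offset to read).
  ContiguousOffsetTracker(long lastCommittedPosition) {
    this.nextExpected = lastCommittedPosition;
  }

  // Record that `offset` finished processing; returns the new position to commit,
  // or -1 if the contiguous range has not advanced yet.
  long complete(long offset) {
    if (offset != nextExpected) {
      pending.add(offset);                                // hold it in "state" until the gap closes
      return -1;
    }
    nextExpected = offset + 1;
    // Drain any contiguous run already sitting in state.
    while (pending.remove(nextExpected)) {
      nextExpected++;
    }
    return nextExpected;                                  // commit this as the new position
  }
}

For example, seeding the tracker with a committed position of 100 and then completing offsets 102, 103, 100 and 101 returns -1, -1, 101 and 104 respectively, so the commit only advances once the range 100..103 is contiguous.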