Re: Kafka manually commit offsets

2021-12-10 Thread Vincent Marquez
If you want to ensure at-least-once processing, I think the *maximum* amount of parallelization you can have is the number of partitions, so you'd want to group by partition, process a bundle for that partition, then commit the last offset of that partition. *~Vincent*
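A minimal sketch of this per-partition approach in Beam's Java SDK, assuming the records were first grouped by partition (e.g. a GroupByKey keyed on KafkaRecord.getPartition()); the broker address, group id, and class names below are placeholders, not values from the thread:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    /** Processes one partition's records per bundle and commits the highest
     *  offset seen for each partition only after the work is done. */
    class ProcessAndCommitFn
        extends DoFn<KV<Integer, Iterable<KafkaRecord<String, String>>>, String> {

      private transient Map<TopicPartition, OffsetAndMetadata> pending;

      @StartBundle
      public void startBundle() {
        pending = new HashMap<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        for (KafkaRecord<String, String> rec : c.element().getValue()) {
          c.output(rec.getKV().getValue()); // real processing goes here
          TopicPartition tp = new TopicPartition(rec.getTopic(), rec.getPartition());
          // Kafka expects the *next* offset to read, hence offset + 1.
          pending.merge(tp, new OffsetAndMetadata(rec.getOffset() + 1),
              (a, b) -> a.offset() > b.offset() ? a : b);
        }
      }

      @FinishBundle
      public void finishBundle() {
        if (pending == null || pending.isEmpty()) {
          return;
        }
        // Opening a consumer per bundle is wasteful; a shared, cached consumer
        // would be used in practice. All connection settings are placeholders.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.commitSync(pending); // commit only after the bundle completed
        }
        pending.clear();
      }
    }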

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Thanks Alexey! I understand. Continuing to think about possible solutions for committing records, I was wondering what happens in this scenario: when processing windows of data, do they get processed in sequential order, or is it possible for them to be processed out of order? For example, Window 1…

Re: Kafka manually commit offsets

2021-12-10 Thread Alexey Romanenko
I answered a similar question on SO a while ago [1], and I hope it helps. “By default, pipeline.apply(KafkaIO.read()...) will return a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an offset from the KafkaRecord metadata and commit it manually in a way that you need (just don't…
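A minimal sketch of the approach from that answer, with placeholder topic and broker values; it shows the offset metadata that KafkaIO attaches to every KafkaRecord:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadWithOffsetMetadata {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
            // Without .withoutMetadata(), KafkaIO emits full KafkaRecords.
            .apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("my-topic")                  // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
            .apply(ParDo.of(new DoFn<KafkaRecord<Long, String>, Void>() {
              @ProcessElement
              public void processElement(@Element KafkaRecord<Long, String> rec) {
                // Topic/partition/offset ride along with every element, so any
                // downstream stage can decide what to commit and when.
                System.out.printf("processed %s/%d@%d%n",
                    rec.getTopic(), rec.getPartition(), rec.getOffset());
              }
            }));

        pipeline.run().waitUntilFinish();
      }
    }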

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Also, I was wondering whether we could end up with some kind of race condition: bundle 1 contains messages [1,2,3,4,5]; bundle 2 contains messages [6,7]. If bundle 2 completes before bundle 1, then it will commit all messages up to offset 7. If bundle 1 fails for whatever reason, we potentially lose…
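One common way to avoid the loss described here (not something proposed in the thread, purely illustrative) is to commit only the end of the contiguous run of completed offsets, so a fast bundle 2 cannot move the commit point past an unfinished bundle 1. A minimal sketch of such a tracker for a single partition:

    import java.util.TreeSet;

    /** Tracks completed offsets for one partition and exposes the highest
     *  offset that is safe to commit: the end of the gap-free prefix. */
    class ContiguousOffsetTracker {
      private final TreeSet<Long> done = new TreeSet<>();
      private long nextToCommit; // everything below this offset has completed

      ContiguousOffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
      }

      synchronized void markDone(long offset) {
        done.add(offset);
        // Advance only across a contiguous run: bundle 2 finishing [6,7]
        // early cannot move the commit point past an unfinished [1..5].
        while (done.remove(nextToCommit)) {
          nextToCommit++;
        }
      }

      /** Value to pass to commitSync as the "next offset to read". */
      synchronized long safeToCommit() {
        return nextToCommit;
      }
    }

With a tracker started at offset 1, bundle 2 finishing first marks 6 and 7 done but safeToCommit() still returns 1, so messages 1..5 are re-delivered rather than lost if bundle 1 fails.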

Re: Kafka manually commit offsets

2021-12-10 Thread Juan Calvo Ferrándiz
Thanks Luke for your quick response. I see, that makes sense. Now I have two new questions, if I may: a) How can I get the offsets I want to commit? My investigation is now pointing me towards getCheckpointMark(); is this correct?
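For context, besides inspecting checkpoint marks, KafkaIO itself exposes commitOffsetsInFinalize(), which commits the consumed offsets back to Kafka when the runner finalizes a checkpoint. A minimal sketch, with placeholder topic, broker, and group id:

    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class CommitOnFinalize {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("my-topic")                  // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // A consumer group is required for the commit to land anywhere.
                .withConsumerConfigUpdates(
                    Map.<String, Object>of(
                        ConsumerConfig.GROUP_ID_CONFIG, "my-group")) // placeholder
                // Offsets are committed back to Kafka when the runner finalizes
                // the checkpoint, i.e. after the read is durably persisted.
                .commitOffsetsInFinalize());

        pipeline.run().waitUntilFinish();
      }
    }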