If you want to ensure at-least-once processing, I think the
*maximum* amount of parallelization you can have would be the number of
partitions you have, so you'd want to group by partition, process a bundle
of that partition, then commit the last offset for that partition.
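A rough sketch of that idea, assuming a DoFn sitting downstream of a
per-partition grouping; the broker address, group id, and class name below
are mine, not anything from KafkaIO itself:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Tracks the highest offset seen per partition within a bundle and
// commits it when the bundle finishes. Assumes bundles for a given
// partition are processed in order (hence the grouping by partition).
class CommitPerPartitionFn extends DoFn<KafkaRecord<String, String>, String> {

  private transient Map<TopicPartition, OffsetAndMetadata> maxOffsets;

  @StartBundle
  public void startBundle() {
    maxOffsets = new HashMap<>();
  }

  @ProcessElement
  public void processElement(@Element KafkaRecord<String, String> record,
                             OutputReceiver<String> out) {
    // ... do the actual processing of the record here ...
    out.output(record.getKV().getValue());

    TopicPartition tp = new TopicPartition(record.getTopic(), record.getPartition());
    // Kafka commits point at the *next* offset to read, hence the +1.
    OffsetAndMetadata next = new OffsetAndMetadata(record.getOffset() + 1);
    maxOffsets.merge(tp, next, (a, b) -> a.offset() >= b.offset() ? a : b);
  }

  @FinishBundle
  public void finishBundle() {
    if (maxOffsets.isEmpty()) {
      return;
    }
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("group.id", "my-consumer-group");        // placeholder
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // Creating a consumer per bundle keeps the sketch self-contained;
    // a real pipeline would reuse one consumer per worker.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.commitSync(maxOffsets);
    }
  }
}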
*~Vincent*
Thanks Alexey! I understand. Continuing to think about possible solutions
for committing records, I was wondering what happens in this scenario:
when processing windows of data, do they get processed in sequential order,
or is it possible for them to be processed out of order? For example, Window
1
I answered a similar question on SO a while ago [1], and I hope it will help.
“By default, pipeline.apply(KafkaIO.read()...) will return a
PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an
offset from KafkaRecord metadata and commit it manually in a way that you need
(just don't
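For reference, a minimal wiring of what that answer describes (the topic
name and bootstrap servers are placeholders, and the explicit
enable.auto.commit=false is the part the quote is getting at):

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadWithOffsets {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Map<String, Object> consumerUpdates = new HashMap<>();
    consumerUpdates.put("enable.auto.commit", false);  // keep offset management manual

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")    // placeholder
            .withTopic("my-topic")                     // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(consumerUpdates))
        .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Void>() {
          @ProcessElement
          public void processElement(@Element KafkaRecord<String, String> record) {
            // Offset metadata travels with every element.
            String topic = record.getTopic();
            int partition = record.getPartition();
            long offset = record.getOffset();
            // ... process the value, then commit (topic, partition, offset + 1) as needed ...
          }
        }));

    p.run().waitUntilFinish();
  }
}

Note that it is the absence of .withoutMetadata() that keeps the elements as
full KafkaRecord objects, which is what makes the manual commit possible.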
Also, I was wondering if we could end up with some kind of race condition:
bundle 1 contains:
Messages: [1,2,3,4,5]
bundle 2 contains:
Messages: [6,7]
If bundle 2 completes before bundle 1, then it will commit all
messages up to offset 7. If bundle 1 fails for whatever reason, we
potentially lose messages 1 through 5.
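One way to rule that out, per partition, is to only ever commit the highest
contiguous completed offset, so a fast bundle just parks its offsets until
the slower one catches up. A sketch (this tracker is illustrative, not an
existing Beam utility):

import java.util.TreeSet;

// Tracks completed offsets for one partition and only exposes a
// "safe to commit" position once every earlier offset has also
// completed, so bundle 2 ([6,7]) cannot commit past bundle 1 ([1..5]).
class ContiguousOffsetTracker {
  private long nextExpected;                                    // first offset not yet completed
  private final TreeSet<Long> completedAhead = new TreeSet<>(); // out-of-order completions

  ContiguousOffsetTracker(long startOffset) {
    this.nextExpected = startOffset;
  }

  // Mark one offset as fully processed.
  synchronized void complete(long offset) {
    if (offset == nextExpected) {
      nextExpected++;
      // Absorb any offsets that completed earlier, ahead of the gap.
      while (completedAhead.remove(nextExpected)) {
        nextExpected++;
      }
    } else if (offset > nextExpected) {
      completedAhead.add(offset);
    }
  }

  // Position that is safe to commit (exclusive, Kafka-style).
  synchronized long safeToCommit() {
    return nextExpected;
  }
}

With that guard, if bundle 2 finishes first, offsets 6 and 7 simply wait in
the set; the commit position only moves past 5 once bundle 1 completes, and
a failed bundle 1 gets re-read after restart instead of being skipped.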
Thanks Luke for your quick response. I see, that makes sense. Now I have
two new questions, if I may:
a) How can I get the offsets I want to commit? My investigation is now
going through getCheckpointMark(), is this correct?
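As far as I know, getCheckpointMark() is on the runner-facing
UnboundedSource.UnboundedReader, so user code doesn't normally call it; the
KafkaRecord elements themselves already carry the offsets, as in the SO
answer quoted above. A tiny helper along those lines (names are mine):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class Offsets {
  private Offsets() {}

  // Partition the record came from.
  static TopicPartition partitionOf(KafkaRecord<?, ?> record) {
    return new TopicPartition(record.getTopic(), record.getPartition());
  }

  // Offset to commit for this record: the next offset to read, hence +1.
  static OffsetAndMetadata commitOffsetOf(KafkaRecord<?, ?> record) {
    return new OffsetAndMetadata(record.getOffset() + 1);
  }
}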