Thanks Alexey! I understand. Continuing to think about possible solutions for
committing records, I was wondering what happens in this scenario:

When processing windows of data, do they get processed in sequential order,
or is it possible for them to be processed out of order? For example, Window
1 contains 10,000 elements of data whereas Window 2 contains 10. Assuming
Window 1 takes a while to process all of that data, is it possible that
Window 2 will finish before Window 1?

Thanks again!

On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko <[email protected]>
wrote:

> I answered a similar question on SO a while ago [1], and I hope it helps.
>
> “By default, pipeline.apply(KafkaIO.read()...) will return a
> PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can
> get an offset from the KafkaRecord metadata and commit it manually in the
> way that you need (just don't forget to disable AUTO_COMMIT in
> KafkaIO.read()).
>
> By "manually", I mean that you should instantiate your own Kafka client in
> your DoFn, process the input element (as KafkaRecord<K, V>) that was read
> before, fetch the offset from the KafkaRecord, and commit it with your own
> client.
>
> Though, you need to make sure that the call to the external API and the
> offset commit are atomic to prevent potential data loss (if it's critical).”
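>
> In code, a rough (untested) sketch of this idea; the broker address, group
> id, and String types are just placeholders, and the group id should match
> the one that KafkaIO reads with:
>
> import java.util.*;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> class CommitOffsetFn extends DoFn<KafkaRecord<String, String>, Void> {
>   private transient Consumer<String, String> consumer;
>
>   @Setup
>   public void setup() {
>     Map<String, Object> config = new HashMap<>();
>     config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>     config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
>     config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>     config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>     config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>     consumer = new KafkaConsumer<>(config);
>   }
>
>   @ProcessElement
>   public void processElement(@Element KafkaRecord<String, String> record) {
>     // ... call your external API first, then commit ...
>     TopicPartition tp = new TopicPartition(record.getTopic(), record.getPartition());
>     // Kafka expects the offset of the *next* record to read, hence the +1.
>     consumer.commitSync(
>         Collections.singletonMap(tp, new OffsetAndMetadata(record.getOffset() + 1)));
>   }
>
>   @Teardown
>   public void teardown() {
>     if (consumer != null) {
>       consumer.close();
>     }
>   }
> }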
>
> [1]
> https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
>
> —
> Alexey
>
> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <
> [email protected]> wrote:
>
> Thanks, Luke, for your quick response. I see, that makes sense. Now I have
> two new questions, if I may:
> a) How can I get the offsets I want to commit? My investigation is
> currently going through getCheckpointMark(); is this correct?
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>
> b) With these offsets, I will create a client at the end of the pipeline,
> using the Kafka library and methods such as commitSync() and commitAsync().
> Is this correct?
> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
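>
> In code, I imagine something like this at the end of the pipeline (rough
> sketch; "my-topic", the partition, and lastOffset stand for values carried
> through the pipeline, and consumer is a KafkaConsumer with auto commit
> disabled):
>
> Map<TopicPartition, OffsetAndMetadata> toCommit =
>     Collections.singletonMap(
>         new TopicPartition("my-topic", 0),
>         new OffsetAndMetadata(lastOffset + 1)); // the next offset to read
> consumer.commitSync(toCommit); // or consumer.commitAsync(toCommit, null);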
>
> Thanks!!!
>
> Juan
>
>
> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <[email protected]> wrote:
>
>> commitOffsetsInFinalize is about committing the offset after the output
>> has been durably persisted for the bundle containing the Kafka Read. The
>> bundle represents a unit of work over a subgraph of the pipeline. You will
>> want to ensure that commitOffsetsInFinalize is disabled and that the Kafka
>> consumer config doesn't auto-commit. This will ensure that KafkaIO.Read
>> doesn't commit the offsets; it is then up to your PTransform to perform
>> the committing.
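>>
>> For example, reading without any automatic commits might look like this
>> (a sketch, not tested; the broker address and topic are placeholders):
>>
>> // imports: org.apache.beam.sdk.io.kafka.KafkaIO,
>> //          org.apache.kafka.clients.consumer.ConsumerConfig,
>> //          org.apache.kafka.common.serialization.StringDeserializer,
>> //          com.google.common.collect.ImmutableMap
>> PCollection<KafkaRecord<String, String>> records =
>>     pipeline.apply(
>>         KafkaIO.<String, String>read()
>>             .withBootstrapServers("broker:9092")
>>             .withTopic("my-topic")
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class)
>>             // Do not call .commitOffsetsInFinalize(), and disable the
>>             // consumer's own auto commit:
>>             .withConsumerConfigUpdates(
>>                 ImmutableMap.<String, Object>of(
>>                     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)));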
>>
>> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <
>> [email protected]> wrote:
>>
>>> Morning!
>>>
>>> First of all, thanks for all the incredible work you do; it's amazing.
>>> Secondly, I'm reaching out for some help or guidance on manually
>>> committing records. I want to do this so I can commit the record at the
>>> end of the pipeline, and not in the read() of the KafkaIO.
>>>
>>> Bearing in mind what I have read in this post:
>>> https://lists.apache.org/[email protected]:2021-9:[email protected]%20kafka%20commit
>>> , and thinking of a pipeline similar to the one described, I understand we
>>> can use commitOffsetsInFinalize() to commit offsets in the read(). What I
>>> don't understand is how this helps to commit the offset if we want to do
>>> this at the end, not in the read. Thanks. All comments and suggestions
>>> are more than welcome. :)
>>>
>>>
>>> Juan
