Super! Thanks for all this info. I'm now testing windowing on a per-key
basis, so that there is effectively one consumer per topic/partition/schema.
This way, the bundle of data from a specific window appears to wait until the
previous one has been processed, independently of the number of records.
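
Roughly, what I'm trying looks like the sketch below (simplified and
untested; ProcessWindowThenCommitFn and the window size are just
placeholders, and imports are omitted):

  // PCollection<KafkaRecord<String, String>> read by KafkaIO (auto-commit disabled)
  PCollection<KV<String, KafkaRecord<String, String>>> keyed = records.apply(
      "KeyByTopicPartition",
      ParDo.of(new DoFn<KafkaRecord<String, String>,
                        KV<String, KafkaRecord<String, String>>>() {
        @ProcessElement
        public void process(@Element KafkaRecord<String, String> rec,
            OutputReceiver<KV<String, KafkaRecord<String, String>>> out) {
          // one logical "consumer" per topic/partition
          out.output(KV.of(rec.getTopic() + "-" + rec.getPartition(), rec));
        }
      }));

  keyed
      .apply(Window.<KV<String, KafkaRecord<String, String>>>into(
          FixedWindows.of(Duration.standardMinutes(1))))
      .apply(GroupByKey.create())
      // hypothetical DoFn that processes a whole window for a key, then commits
      .apply(ParDo.of(new ProcessWindowThenCommitFn()));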


*Juan*


On Mon, 13 Dec 2021 at 20:33, Vincent Marquez <[email protected]>
wrote:

> What I mean is: if you want to only commit offsets *after* a
> KafkaRecord<K,V> is processed, then you need to keep parallelism to the
> number of partitions, as offsets are monotonically increasing *per
> partition*.  So if you have one partition and split processing into two
> 'threads', and T1 handling offsets A-C fails while T2 handling D-G
> succeeds, committing the offsets from T2 would indicate that everything
> handled by T1 also succeeded.
>
>
> *~Vincent*
>
>
> On Mon, Dec 13, 2021 at 11:12 AM Luke Cwik <[email protected]> wrote:
>
>> I believe you would be able to have parallelism greater than the number
>> of partitions for most of the pipeline. The checkpoint advancement code is
>> likely limited to the number of partitions but can be a very small portion
>> of the pipeline.
>>
>> On Fri, Dec 10, 2021 at 10:20 AM Vincent Marquez <
>> [email protected]> wrote:
>>
>>> If you want to ensure you have at-least-once processing, I think the
>>> *maximum* amount of parallelization you can have is the number of
>>> partitions, so you'd want to group by partition, process a bundle of
>>> that partition, then commit the last offset for that partition.
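>>>
>>> E.g., something along these lines (a rough, untested sketch; the
>>> processRecord/commitOffset helpers are hypothetical):
>>>
>>>   keyedByPartition  // PCollection<KV<Integer, KafkaRecord<String, String>>>
>>>       .apply(GroupIntoBatches.<Integer, KafkaRecord<String, String>>ofSize(500))
>>>       .apply(ParDo.of(
>>>           new DoFn<KV<Integer, Iterable<KafkaRecord<String, String>>>, Void>() {
>>>         @ProcessElement
>>>         public void process(
>>>             @Element KV<Integer, Iterable<KafkaRecord<String, String>>> batch) {
>>>           long last = -1L;
>>>           for (KafkaRecord<String, String> rec : batch.getValue()) {
>>>             processRecord(rec);                      // your external call
>>>             last = Math.max(last, rec.getOffset());
>>>           }
>>>           if (last >= 0) {
>>>             // commit the *next* offset to read for this partition
>>>             commitOffset(batch.getKey(), last + 1);  // e.g. commitSync with your own consumer
>>>           }
>>>         }
>>>       }));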
>>>
>>> *~Vincent*
>>>
>>>
>>> On Fri, Dec 10, 2021 at 9:28 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Yes, you will need to deal with records being out of order because the
>>>> system will process many things in parallel.
>>>>
>>>> You can read the last committed offset from Kafka and compare it
>>>> against the offset you have right now. If the offset you have is not
>>>> the next expected offset, you store it in state; if it is, you find the
>>>> contiguous range of offsets that you have stored in state starting from
>>>> this offset, remove them from state, and commit the last one in that
>>>> contiguous range.
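>>>>
>>>> A minimal sketch of that bookkeeping, keyed per partition (untested;
>>>> readCommittedOffset and commitOffset are hypothetical helpers wrapping
>>>> your own Kafka consumer):
>>>>
>>>>   // Input: KV of partition -> offset of a record whose processing finished.
>>>>   class CommitContiguousFn extends DoFn<KV<Integer, Long>, Void> {
>>>>
>>>>     @StateId("pending")
>>>>     private final StateSpec<BagState<Long>> pendingSpec =
>>>>         StateSpecs.bag(VarLongCoder.of());
>>>>
>>>>     @ProcessElement
>>>>     public void process(@Element KV<Integer, Long> done,
>>>>                         @StateId("pending") BagState<Long> pending) {
>>>>       TreeSet<Long> offsets = new TreeSet<>();
>>>>       for (Long o : pending.read()) { offsets.add(o); }
>>>>       offsets.add(done.getValue());
>>>>
>>>>       long committed = readCommittedOffset(done.getKey()); // next offset to consume
>>>>       long next = committed;
>>>>       while (offsets.remove(next)) {  // walk the contiguous range we now have
>>>>         next++;
>>>>       }
>>>>       if (next > committed) {
>>>>         commitOffset(done.getKey(), next);  // commit end of the contiguous range
>>>>       }
>>>>       pending.clear();
>>>>       for (Long o : offsets) { pending.add(o); }  // keep the gaps in state
>>>>     }
>>>>   }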
>>>>
>>>> On Fri, Dec 10, 2021 at 8:18 AM Juan Calvo Ferrándiz <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> Thanks Alexey! I understand. Continuing to think about possible ways
>>>>> of committing records, I was wondering what happens in this scenario:
>>>>>
>>>>> When processing windows of data, do they get processed in sequential
>>>>> order, or is it possible for them to be processed out of order? For
>>>>> example, Window 1 contains 10,000 elements of data whereas Window 2
>>>>> contains 10. Assuming Window 1 takes a while to process all of that
>>>>> data, is it possible Window 2 will finish before Window 1?
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> I answered a similar question on SO a while ago [1], and I hope it
>>>>>> will help.
>>>>>>
>>>>>> “By default, pipeline.apply(KafkaIO.read()...) will return
>>>>>> a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can
>>>>>> get an offset from KafkaRecord metadata and commit it manually in a way
>>>>>> that you need (just don't forget to disable AUTO_COMMIT in 
>>>>>> KafkaIO.read()).
>>>>>>
>>>>>> By manual way, I mean that you should instantiate your own Kafka
>>>>>> client in your DoFn, process the input element (as KafkaRecord<K, V>)
>>>>>> that was read before, fetch the offset from the KafkaRecord and commit
>>>>>> it with your own client.
>>>>>>
>>>>>> Though, you need to make sure that a call to external API and offset
>>>>>> commit will be atomic to prevent potential data loss (if it's critical)."
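>>>>>>
>>>>>> In code, a minimal sketch of such a DoFn could look like this
>>>>>> (untested; the broker, group id and callExternalApi are placeholders,
>>>>>> and this is not the exact code from [1]):
>>>>>>
>>>>>>   class ProcessAndCommitFn extends DoFn<KafkaRecord<String, String>, Void> {
>>>>>>     private transient KafkaConsumer<String, String> consumer;
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>       Properties props = new Properties();
>>>>>>       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>>>>>>       props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // same group.id as KafkaIO.read()
>>>>>>       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>>>>>           StringDeserializer.class.getName());
>>>>>>       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>>>>>           StringDeserializer.class.getName());
>>>>>>       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>>>>       consumer = new KafkaConsumer<>(props);
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element KafkaRecord<String, String> rec) {
>>>>>>       callExternalApi(rec.getKV());  // hypothetical: your side effect
>>>>>>       // commit the *next* offset to read for this topic/partition
>>>>>>       consumer.commitSync(Collections.singletonMap(
>>>>>>           new TopicPartition(rec.getTopic(), rec.getPartition()),
>>>>>>           new OffsetAndMetadata(rec.getOffset() + 1)));
>>>>>>     }
>>>>>>
>>>>>>     @Teardown
>>>>>>     public void teardown() {
>>>>>>       if (consumer != null) { consumer.close(); }
>>>>>>     }
>>>>>>   }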
>>>>>>
>>>>>> [1]
>>>>>> https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
>>>>>>
>>>>>> —
>>>>>> Alexey
>>>>>>
>>>>>> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>> Thanks Luke for your quick response. I see, that makes sense. Now I
>>>>>> have two new questions, if I may:
>>>>>> a) How can I get the offsets I want to commit? My investigation is
>>>>>> currently going through getCheckpointMark(); is this correct?
>>>>>> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>>>>>>
>>>>>> b) With these offsets, I will create a client at the end of the
>>>>>> pipeline, using the Kafka library and methods such as commitSync() and
>>>>>> commitAsync(). Is this correct?
>>>>>> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>>>>>>
>>>>>> Thanks!!!
>>>>>>
>>>>>> *Juan *
>>>>>>
>>>>>>
>>>>>> On Fri, 10 Dec 2021 at 01:07, Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> commitOffsetsInFinalize is about committing the offset after the
>>>>>>> output has been durably persisted for the bundle containing the Kafka
>>>>>>> Read. The bundle represents a unit of work over a subgraph of the
>>>>>>> pipeline. You will want to ensure that commitOffsetsInFinalize is
>>>>>>> disabled and that the Kafka consumer config doesn't auto-commit. This
>>>>>>> will ensure that KafkaIO.Read doesn't commit the offsets. Then it is
>>>>>>> up to your PTransform to perform the committing.
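>>>>>>>
>>>>>>> For example (untested sketch; the broker and topic are placeholders):
>>>>>>>
>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>       .withBootstrapServers("broker:9092")
>>>>>>>       .withTopic("my-topic")
>>>>>>>       .withKeyDeserializer(StringDeserializer.class)
>>>>>>>       .withValueDeserializer(StringDeserializer.class)
>>>>>>>       // no .commitOffsetsInFinalize() here, and no auto-commit either:
>>>>>>>       .withConsumerConfigUpdates(
>>>>>>>           ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false));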
>>>>>>>
>>>>>>> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Morning!
>>>>>>>>
>>>>>>>> First of all, thanks for all the incredible work you do, it is
>>>>>>>> amazing. Secondly, I'm reaching out for some help or guidance on
>>>>>>>> manually committing records. I want to do this so I can commit the
>>>>>>>> record at the end of the pipeline, and not in the read() of the
>>>>>>>> KafkaIO.
>>>>>>>>
>>>>>>>> Bearing in mind what I have read in this post:
>>>>>>>> https://lists.apache.org/[email protected]:2021-9:[email protected]%20kafka%20commit
>>>>>>>> , and thinking of a pipeline similar to the one described, I 
>>>>>>>> understand we
>>>>>>>> can use commitOffsetsInFinalize() to commit offsets in the read().
>>>>>>>> What I don't understand is how this helps to commit the offset if we 
>>>>>>>> want
>>>>>>>> to do this at the end, not in the reading.    Thanks. All comments and
>>>>>>>> suggestions are more than welcome. :)
>>>>>>>>
>>>>>>>>
>>>>>>>> *Juan *
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
