I am not sure it helps much. External checkpoints like Kafka's 'autocommit',
which live outside Beam's own checkpointing, will always have quirks like
this. I wanted to ask what their use case for autocommit was (over
commitOffsetsInFinalize()).

Even if we wanted to, it is not clear how an unbounded source could do that. A
reader (by design, I think) has no visibility into when a checkpoint happens,
where bundle boundaries fall, etc. It could, for example, return 'false' from
start() and advance() initially, but it has no way of knowing when to actually
start returning records (in Dataflow, it could wait a few seconds).

On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:

> Raghu, based upon your description, do you think it would be good for
> KafkaIO to checkpoint on the first read without producing any actual
> records?
>
> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]> wrote:
>
>>
>> It is due to "enable.auto.commit=true". Auto commit is an option of the
>> Kafka client, and how and when it commits is entirely outside the control of
>> Beam and KafkaIO.
>> Could you try setting commitOffsetsInFinalize() [1] in KafkaIO rather than
>> 'enable.auto.commit'? That would ensure exactly-once processing.
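>>
>> Roughly, the change in your read transform would look like this (the
>> bootstrap server, topic and group id below are placeholders, not taken from
>> your pipeline):
>>
>> p.apply("Read from Kafka",
>>     KafkaIO.<String, String>read()
>>         .withBootstrapServers("broker:9092")            // placeholder
>>         .withTopic("my-topic")                          // placeholder
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         // a group.id is needed so the committed offsets belong to a group
>>         .updateConsumerProperties(
>>             ImmutableMap.<String, Object>of("group.id", "my-group"))
>>         // commit offsets when Beam finalizes a checkpoint, instead of
>>         // letting the Kafka client auto-commit on its own schedule
>>         .commitOffsetsInFinalize()
>>         .withoutMetadata())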
>>
>> That said, you might be interested in understanding why your example
>> failed:
>> enable.auto.commit is not such a bad option by itself, but there is a
>> corner case where it can cause issues like this.
>> When a reader is initialized, its start offset is determined as follows
>> (specifically in Dataflow, but roughly accurate on other runners too):
>>
>>    - (a) If there is a checkpoint for the reader split (true for all reads
>>    except the very first bundle read by the split from Kafka), the offset
>>    comes from the checkpoint. This is how exactly-once is ensured. Here the
>>    offset committed by the Kafka client with 'autocommit' does not matter.
>>    - (b) If there is no checkpoint (i.e. for the first bundle of records),
>>    KafkaIO does not set any offset explicitly and lets the Kafka client
>>    decide. That means it depends on your ConsumerConfig: the consumer
>>    configuration decides the offset when a pipeline first starts (see the
>>    sketch below).
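>>
>> As a rough illustration of (b): with no checkpoint, the start position comes
>> entirely from the consumer properties you pass in, e.g. (the values here are
>> only an example, not taken from your pipeline):
>>
>> Map<String, Object> properties = new HashMap<>();
>> // If the group already has committed offsets, the client resumes from them.
>> properties.put("group.id", "my-group");            // placeholder
>> // Otherwise this decides where to start ("latest" is Kafka's default).
>> properties.put("auto.offset.reset", "latest");
>> // With autocommit on, the client commits offsets on its own schedule,
>> // independently of Beam's checkpointing.
>> properties.put("enable.auto.commit", "true");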
>>
>> In your example, when there was an exception for message 25, the reader was
>> still processing the first bundle of records and there was no Dataflow
>> checkpoint, so it kept hitting (b). Kafka's 'autocommit' runs outside Beam's
>> control, and it might have committed offset 60 during one of the retries.
>> The next retry then incorrectly reads from 60.
>>
>> I hope this helps. Enabling autocommit is only useful when you want to
>> restart your pipeline from scratch (rather than 'updating' your Dataflow
>> pipeline) and still want to *roughly* resume from where the previous
>> pipeline left off. Even there, commitOffsetsInFinalize() is better. In
>> either case, exactly-once processing is not guaranteed when a pipeline
>> restarts; the only way to achieve that currently is to 'update' the pipeline.
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>
>> Raghu.
>>
>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> I'm using the KafkaIO source in my Beam pipeline with the Dataflow runner,
>>> testing the scenario where intermittent errors may happen (e.g. a DB
>>> connection failure).
>>> So I produced a sequence of 200 messages containing sequential numbers
>>> (1-200) to a topic, and then executed the following pipeline:
>>>
>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>     .withBootstrapServers(server)
>>>     .withTopic(topic)
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(StringDeserializer.class)
>>>     .updateConsumerProperties(properties)
>>>     .withoutMetadata())
>>> .apply(Values.create())
>>> .apply(ParDo.of(new StepTest()));
>>>
>>> Where StepTest is defined as follows:
>>>
>>> public class StepTest extends DoFn<String, String> {
>>>     private static final Logger LOG = LoggerFactory.getLogger(StepTest.class);
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext pc) {
>>>         String element = pc.element();
>>>
>>>         // randomErrorOccurs() is our helper that simulates an intermittent
>>>         // failure (e.g. a DB connection error) for some elements.
>>>         if (randomErrorOccurs()) {
>>>             throw new RuntimeException("Failed ... " + element);
>>>         } else {
>>>             LOG.info(element);
>>>         }
>>>     }
>>> }
>>>
>>> The consumer configuration has "enable.auto.commit=true".
>>> I would expect all the numbers to get printed, and if an exception is
>>> thrown, the Dataflow runner to retry processing the failed message until
>>> it eventually works.
>>> However, what happened in my pipeline was different: when errors started
>>> happening due to my code, some messages were never processed, and some
>>> were lost forever.
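>>>
>>> For reference, the 'properties' map passed to updateConsumerProperties
>>> above is roughly the following (the group id is just a placeholder):
>>>
>>> Map<String, Object> properties = new HashMap<>();
>>> properties.put("group.id", "steptest-group");     // placeholder
>>> properties.put("enable.auto.commit", "true");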
>>>
>>> I would expect something like:
>>>
>>> {...}
>>> 22
>>> 23
>>> 24
>>> Failed ... 25
>>> {A new reader starts}
>>> Reader-0: first record offset 60
>>> 61
>>> 62
>>> {...}
>>> {Dataflow retries 25}
>>> Failed ... 25
>>> {...}
>>> and so on... (the exception would never stop happening in this case, and
>>> Dataflow would retry forever)
>>>
>>> My output was something like:
>>>
>>> {...}
>>> 22
>>> 23
>>> 24
>>> Failed ... 25
>>> {A new reader starts}
>>> Reader-0: first record offset 60
>>> 61
>>> 62
>>> {...}
>>>
>>> Message #25 never gets reprocessed, and all the messages up to 60 are
>>> lost, probably the ones in the same processing bundle as 25. Even more
>>> curious is that this behaviour doesn't happen with the PubsubIO source,
>>> which produces the first output shown above.
>>>
>>> My questions are:
>>> What is a good way of handling errors with Kafka source if I want all
>>> messages to be processed exactly once?
>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>> Please let me know your thoughts.
>>>
>>> Andre (cc'd) is part of our team and will join this discussion.
>>>
>>> --
>>> []s
>>>
>>> Leonardo Alves Miguel
>>> Data Engineer
>>> (16) 3509-5555 | www.arquivei.com.br
>>>
>>
