Raghu, based upon your description, do you think it would be a good idea for KafkaIO to checkpoint on the first read without producing any actual records?
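For reference, below is a minimal, untested sketch of the commitOffsetsInFinalize() configuration Raghu suggests further down, adapted from Leonardo's pipeline; the consumer group id is a placeholder, and "p", "server", "topic" and StepTest come from his example. (A second sketch, illustrating case (b) of Raghu's explanation, follows at the end of the thread.)

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Consumer properties: no enable.auto.commit here. A group.id is still
// needed so the finalized offsets have a consumer group to be committed to.
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // placeholder

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(properties)
        // Commit consumed offsets back to Kafka when the runner finalizes
        // a checkpoint, instead of relying on the client's autocommit.
        .commitOffsetsInFinalize()
        .withoutMetadata())
    .apply(Values.create())
    .apply(ParDo.of(new StepTest()));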
On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]> wrote:

> It is due to "enable.autocommit=true". Auto commit is an option of the Kafka
> client, and how and when it commits is totally out of the control of Beam &
> KafkaIO.
> Could you try setting commitOffsetsInFinalize() [1] in KafkaIO rather than
> 'enable.autocommit'? That would ensure exactly-once processing.
>
> That said, you might be interested in understanding why your example
> failed. enable.autocommit is not such a bad option by itself, but there is
> a corner case where it can cause issues like this.
> When a reader is initialized, its start offset is determined in this order
> (specifically in Dataflow, but roughly accurate on other runners too):
>
>    - (a) If there is a checkpoint for the reader split (true for all
>    reads except the very first bundle read by the split from Kafka), the
>    offset comes from the checkpoint. This is how exactly-once is ensured;
>    here the offset committed by the Kafka client with 'autocommit' does
>    not matter.
>    - (b) If there is no checkpoint (i.e. for the first bundle of records),
>    KafkaIO does not set any offset explicitly and lets the Kafka client
>    decide. That means it depends on your ConsumerConfig, so ConsumerConfig
>    decides the offset when a pipeline first starts.
>
> In your example, when there was an exception for message 25, it was still
> processing the first bundle of records and there was no Dataflow
> checkpoint, so it kept hitting (b). Kafka's 'autocommit' is out of Beam's
> control, and it might have committed offset 60 in one of the retries. The
> next retry then incorrectly reads from 60.
>
> I hope this helps. Enabling autocommit is only useful when you want to
> restart your pipeline from scratch (rather than 'updating' your Dataflow
> pipeline) and still want to *roughly* resume from where the previous
> pipeline left off. Even there, commitOffsetsInFinalize() is better. In
> either case, exactly-once processing is not guaranteed when a pipeline
> restarts; the only way currently to achieve that is to 'update' the
> pipeline.
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>
> Raghu.
>
> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
> [email protected]> wrote:
>
>> Hello,
>>
>> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
>> where intermittent errors may happen (e.g. DB connection failure) with
>> the Dataflow runner.
>> So I produced a sequence of 200 messages containing sequential numbers
>> (1-200) to a topic, and then executed the following pipeline:
>>
>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>         .withBootstrapServers(server)
>>         .withTopic(topic)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(StringDeserializer.class)
>>         .updateConsumerProperties(properties)
>>         .withoutMetadata())
>>     .apply(Values.create())
>>     .apply(ParDo.of(new StepTest()));
>>
>> Where StepTest is defined as follows:
>>
>> public class StepTest extends DoFn<String, String> {
>>     @ProcessElement
>>     public void processElement(ProcessContext pc) {
>>         String element = pc.element();
>>
>>         if (randomErrorOccurs()) {
>>             throw new RuntimeException("Failed ... " + element);
>>         } else {
>>             LOG.info(element);
>>         }
>>     }
>> }
>>
>> The consumer configuration has "enable.auto.commit=true".
>> I would expect that all the numbers get printed, and if an exception is
>> thrown, Dataflow's runner would retry processing that failed message
>> until it eventually works.
>> However, what happened in my pipeline was different: when errors started
>> happening in my code, some messages were never processed, and some were
>> actually lost forever.
>>
>> I would expect something like:
>>
>> {...}
>> 22
>> 23
>> 24
>> Failed ... 25
>> {A new reader starts}
>> Reader-0: first record offset 60
>> 61
>> 62
>> {...}
>> {Dataflow retries 25}
>> Failed ... 25
>> {...}
>> and so on... (the exception would never cease to happen in this case and
>> Dataflow would retry forever)
>>
>> My output was something like:
>>
>> {...}
>> 22
>> 23
>> 24
>> Failed ... 25
>> {A new reader starts}
>> Reader-0: first record offset 60
>> 61
>> 62
>> {...}
>>
>> Message #25 never gets reprocessed, and all the messages up to 60 are
>> lost, probably the ones in the same processing bundle as 25. Even more
>> curious is that this behaviour doesn't happen when using the PubSubIO
>> source, which produces the first output mentioned above.
>>
>> My questions are:
>> What is a good way of handling errors with the Kafka source if I want
>> all messages to be processed exactly once?
>> Is there any Kafka or Dataflow configuration that I may be missing?
>> Please let me know your thoughts.
>>
>> Andre (cc) is part of our team and will be joining this discussion.
>>
>> --
>> []s
>>
>> Leonardo Alves Miguel
>> Data Engineer
>> (16) 3509-5555 | www.arquivei.com.br
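To make case (b) in Raghu's explanation concrete: when there is no runner checkpoint yet, the start position comes entirely from the Kafka consumer configuration. A rough sketch of the relevant properties is below; the group id is a placeholder, and the exact behaviour depends on whether that group already has a committed offset.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

Map<String, Object> properties = new HashMap<>();
// Placeholder group id. If this group already has a committed offset
// (e.g. one written by 'enable.auto.commit'), the first bundle resumes
// from that offset, which is what produced the jump to offset 60 above.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// Otherwise auto.offset.reset decides: "earliest" starts from the
// beginning of the topic, "latest" (the Kafka default) skips to new records.
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");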
