This is due to "enable.auto.commit=true". Auto commit is an option of the Kafka client, and how and when it commits is entirely outside the control of Beam and KafkaIO. Could you try setting commitOffsetsInFinalize() [1] in KafkaIO rather than 'enable.auto.commit'? That would ensure exactly-once processing.
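For reference, this is roughly how it would look with the pipeline from your message below (same 'server', 'topic', 'properties' and StepTest; the only changes from your version are the two commented lines, and I believe committing offsets this way also requires a 'group.id' in the consumer properties):

    properties.put("enable.auto.commit", "false");  // let KafkaIO handle commits instead of the client

    p.apply("Read from kafka", KafkaIO.<String, String>read()
            .withBootstrapServers(server)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(properties)
            .commitOffsetsInFinalize()  // commit offsets only when Dataflow finalizes a checkpoint
            .withoutMetadata())
        .apply(Values.create())
        .apply(ParDo.of(new StepTest()));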
That said, you might be interested in understanding why your example failed. enable.auto.commit is not such a bad option by itself, but there is a corner case where it can cause issues like this. When a reader is initialized, its start offset is determined as follows (specifically in Dataflow, but roughly accurate for other runners too):

- (a) If there is a checkpoint for the reader split (true for all reads except the very first bundle read by the split from Kafka), the offset comes from the checkpoint. This is how exactly-once is ensured. Here the offset committed by the Kafka client with auto commit does not matter.
- (b) If there is no checkpoint (i.e. for the first bundle of records), KafkaIO does not set any offset explicitly and lets the Kafka client decide. That means it depends on your ConsumerConfig. So the ConsumerConfig decides the offset when a pipeline first starts (there is a small sketch of this in the P.S. below).

In your example, when there was an exception for message 25, the pipeline was still processing the first bundle of records and there was no Dataflow checkpoint, so it kept hitting (b). Kafka's auto commit runs out of band, and it might have committed offset 60 during one of the retries; the next retry then incorrectly read from 60. I hope this helps.

Enabling auto commit is only useful when you want to restart your pipeline from scratch (rather than 'updating' your Dataflow pipeline) and still want to *roughly* resume from where the previous pipeline left off. Even there, commitOffsetsInFinalize() is better. In either case, exactly-once processing is not guaranteed when a pipeline restarts; currently the only way to achieve that is to 'update' the pipeline.

[1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623

Raghu.
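P.S. To make (b) a bit more concrete, here is a rough sketch of the consumer-properties side (just an illustration: 'properties' is assumed to be the same map you pass to updateConsumerProperties(), and "my-group" is only a placeholder):

    Map<String, Object> properties = new HashMap<>();
    properties.put("group.id", "my-group");
    // Case (b): with no Dataflow checkpoint, the Kafka client picks the start offset.
    // If the group already has a committed offset (e.g. 60 committed by auto commit
    // during a retry), the client resumes from there; otherwise 'auto.offset.reset'
    // applies: "earliest" = beginning of the partition, "latest" = new records only.
    properties.put("auto.offset.reset", "earliest");
    // Keeping client-side auto commit off prevents the committed offset from jumping
    // ahead between retries.
    properties.put("enable.auto.commit", "false");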
On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <[email protected]> wrote:

> Hello,
>
> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
> where intermittent errors may happen (e.g. DB connection failure) with the
> Dataflow runner.
> So I produced a sequence of 200 messages containing sequential numbers
> (1-200) to a topic, and then executed the following pipeline:
>
> p.apply("Read from kafka", KafkaIO.<String, String>read()
>         .withBootstrapServers(server)
>         .withTopic(topic)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .updateConsumerProperties(properties)
>         .withoutMetadata())
>     .apply(Values.create())
>     .apply(ParDo.of(new StepTest()));
>
> Where StepTest is defined as follows:
>
> public class StepTest extends DoFn<String, String> {
>     @ProcessElement
>     public void processElement(ProcessContext pc) {
>         String element = pc.element();
>
>         if (randomErrorOccurs()) {
>             throw new RuntimeException("Failed ... " + element);
>         } else {
>             LOG.info(element);
>         }
>     }
> }
>
> The consumer configuration has "enable.auto.commit=true".
> I would expect all the numbers to get printed, and if an exception is
> thrown, Dataflow's runner would retry processing the failed message until
> it eventually works.
> However, what happened in my pipeline was different: when errors started
> happening due to my code, some messages were never processed, and some
> were actually lost forever.
>
> I would expect something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
> {Dataflow retries 25}
> Failed ... 25
> {...}
> and so on... (the exception would never cease to happen in this case and
> Dataflow would retry forever)
>
> My output was something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
>
> Message #25 never gets reprocessed, and all the messages up to 60 are
> lost, probably the ones in the same processing bundle as 25. Even more
> curious is that this behaviour doesn't happen when using the PubSubIO
> source, which produces the first output mentioned above.
>
> My questions are:
> What is a good way of handling errors with the Kafka source if I want all
> messages to be processed exactly once?
> Is there any Kafka or Dataflow configuration that I may be missing?
> Please let me know your thoughts.
>
> Andre (cc) is part of our team and will be joining this discussion.
>
> --
> []s
>
> Leonardo Alves Miguel
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
