This isn't a long-term solution, but inserting a Reshuffle between your IO
and your transforms will ensure that failures in your transforms are
decoupled from the IO.
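
For illustration, here is a minimal sketch of where the Reshuffle would go,
based on the pipeline quoted below. The class name, the ProcessFn stand-in for
StepTest, and the SERVER and TOPIC values are placeholders I made up, and I
have left out the consumer properties. Treat it as a starting point under
those assumptions rather than a verified fix.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReshuffleAfterKafkaRead {

  // Placeholder values (assumptions), not taken from the original pipeline.
  private static final String SERVER = "localhost:9092";
  private static final String TOPIC = "numbers";

  /** Hypothetical stand-in for StepTest: any DoFn that may throw goes after the Reshuffle. */
  static class ProcessFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext pc) {
      // Pass the element through unchanged; real processing would go here.
      pc.output(pc.element());
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply("Read from kafka", KafkaIO.<String, String>read()
            .withBootstrapServers(SERVER)
            .withTopic(TOPIC)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.<String>create())
        // The Reshuffle separates the read from the steps that follow, so an
        // exception thrown in ProcessFn should not be tied to the Kafka reader.
        .apply("Decouple from IO", Reshuffle.<String>viaRandomKey())
        .apply("Process", ParDo.of(new ProcessFn()));

    p.run().waitUntilFinish();
  }
}
```

The only change relative to the quoted pipeline is the extra Reshuffle step
between Values.create() and the ParDo. It adds a shuffle, so expect some
additional cost.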

The IO should be responsible for correctly pushing back any messages that
would be considered uncommitted in case of a failure. I don't have enough
familiarity with the KafkaIO implementation to provide further guidance
though.

On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
[email protected]> wrote:

> Hello,
>
> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
> where intermittent errors may happen (e.g. DB connection failure) with the
> Dataflow runner.
> So I produced a sequence of 200 messages containing sequential numbers
> (1-200) to a topic, and then executed the following pipeline:
>
> p.apply("Read from kafka", KafkaIO.<String, String>read()
>     .withBootstrapServers(server)
>     .withTopic(topic)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .updateConsumerProperties(properties)
>     .withoutMetadata())
> .apply(Values.create())
> .apply(ParDo.of(new StepTest()));
>
> Where StepTest is defined as follows:
>
> public class StepTest extends DoFn<String, String> {
>     @ProcessElement
>     public void processElement(ProcessContext pc) {
>         String element = pc.element();
>
>         if (randomErrorOccurs()) {
>             throw new RuntimeException("Failed ... " + element);
>         } else {
>             LOG.info(element);
>         }
>     }
> }
>
> The consumer configuration has "enable.auto.commit=true".
> I would expect that all the numbers get printed, and if an exception is
> thrown, Dataflow's runner would retry processing that failed message until
> it eventually works.
> However, what happened in my pipeline was different: when my code started
> throwing errors, some messages were never processed, and some were
> actually lost forever.
>
> I would expect something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
> {Dataflow retries 25}
> Failed ... 25
> {...}
> and so on... (exception would never cease to happen in this case and
> Dataflow would retry forever)
>
> My output was something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
>
> Message #25 never gets reprocessed, and all the messages up to 60 are
> lost, probably because they were in the same processing bundle as 25. Even
> more curious, this behaviour doesn't happen with the PubSubIO source,
> which produces the expected output shown above.
>
> My questions are:
> What is a good way of handling errors with Kafka source if I want all
> messages to be processed exactly once?
> Is there any Kafka or Dataflow configuration that I may be missing?
> Please let me know your thoughts.
>
> Andre (cc) is part of our team and will be joining this discussion.
>
> --
> []s
>
> Leonardo Alves Miguel
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
>
