Hello,
I'm using the KafkaIO source in my Beam pipeline to test the scenario
where intermittent errors may happen (e.g. a DB connection failure) with
the Dataflow runner.
So I produced a sequence of 200 messages containing the sequential numbers
1-200 to a topic, and then executed the following pipeline:
p.apply("Read from kafka", KafkaIO.<String, String>read()
.withBootstrapServers(server)
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(properties)
.withoutMetadata())
.apply(Values.create())
.apply(ParDo.of(new StepTest()));
Where StepTest is defined as follows:
public class StepTest extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(StepTest.class);

  @ProcessElement
  public void processElement(ProcessContext pc) {
    String element = pc.element();
    // randomErrorOccurs() simulates an intermittent failure
    if (randomErrorOccurs()) {
      throw new RuntimeException("Failed ... " + element);
    } else {
      LOG.info(element);
    }
  }
}
The consumer configuration has "enable.auto.commit=true".
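For completeness, the properties map passed to updateConsumerProperties is
built roughly as follows (a sketch; the group.id value is illustrative):

Map<String, Object> properties = new HashMap<>();
properties.put("group.id", "steptest-group"); // illustrative value
properties.put("enable.auto.commit", "true");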
I would expect all the numbers to get printed, and, if an exception is
thrown, Dataflow to retry processing the failed message until it
eventually succeeds.
However, what happened in my pipeline was different: once my code started
throwing errors, some messages were never processed and some were lost
forever.
I would expect something like:
{...}
22
23
24
Failed ... 25
{A new reader starts}
Reader-0: first record offset 60
61
62
{...}
{Dataflow retries 25}
Failed ... 25
{...}
and so on... (in this case the exception would never cease, and Dataflow
would retry forever)
My output was something like:
{...}
22
23
24
Failed ... 25
{A new reader starts}
Reader-0: first record offset 60
61
62
{...}
Message #25 never gets reprocessed, and all the messages up to 60 are
lost, probably the ones in the same processing bundle as 25. Even more
curious, this behaviour does not happen with the PubSubIO source, which
produces the first output shown above.
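One way to confirm which offsets were actually committed is to inspect
the consumer group with the Kafka CLI (the group name below is
illustrative):

kafka-consumer-groups.sh --bootstrap-server <server> --describe \
  --group steptest-group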
My questions are:
What is a good way of handling errors with Kafka source if I want all
messages to be processed exactly once?
Is there any Kafka or Dataflow configuration that I may be missing?
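For context, one workaround we are evaluating is the side-output
("dead letter") pattern sketched below; the tag names are ours, and we
have not confirmed whether it avoids the offset loss described above:

final TupleTag<String> okTag = new TupleTag<String>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple results = input // the Values.create() output from above
    .apply("Process with dead letter",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext pc) {
            String element = pc.element();
            try {
              if (randomErrorOccurs()) {
                throw new RuntimeException("Failed ... " + element);
              }
              LOG.info(element);
              pc.output(element);
            } catch (RuntimeException e) {
              // Route the failure to a side output instead of failing
              // the whole bundle, so neighbouring messages survive.
              pc.output(deadLetterTag, element);
            }
          }
        }).withOutputTags(okTag, TupleTagList.of(deadLetterTag)));

results.get(deadLetterTag) could then be written back to a retry topic.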
Please let me know your thoughts.
Andre (cc'd) is part of our team and will join this discussion.
--
[]s
Leonardo Alves Miguel
Data Engineer
(16) 3509-5555 | www.arquivei.com.br