I am not sure if it helps much. External checkpoints like Kafka 'autocommit', which live outside Beam's own checkpoint domain, will always have quirks like this. I wanted to ask what their use case for autocommit was (over commitOffsetsInFinalize()).
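For reference, switching the pipeline quoted below from autocommit to commitOffsetsInFinalize() would look roughly like the sketch that follows. This is only a minimal sketch, not code from the thread: it assumes the same server, topic and StepTest from Leonardo's pipeline further down, plus a made-up consumer group id ("steptest-group"), which commitOffsetsInFinalize() needs so there is a group to commit offsets against.

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Disable the Kafka client's background commits and give the
        // consumer a group id to commit against ("steptest-group" is made up).
        .updateConsumerProperties(ImmutableMap.of(
            ConsumerConfig.GROUP_ID_CONFIG, "steptest-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"))
        // Commit offsets to Kafka only when Beam finalizes a checkpoint,
        // so the committed position tracks Beam's own checkpoint domain.
        .commitOffsetsInFinalize()
        .withoutMetadata())
    .apply(Values.create())
    .apply(ParDo.of(new StepTest()));

Note that this still does not give exactly-once across a full pipeline restart; as discussed below, only 'updating' the pipeline preserves Beam's checkpoints.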
Even if we wanted to, it is not clear how an unbounded source could do that. A reader (by design, I think) does not have visibility into when a checkpoint happens, bundle boundaries, etc. E.g. it could try returning 'false' from start() and advance() initially, but it does not know when to actually start returning records (in Dataflow, it could wait a few seconds).

On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:

> Raghu, based upon your description, do you think it would be a good idea for KafkaIO to checkpoint on the first read without producing any actual records?
>
> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]> wrote:
>
>> It is due to "enable.auto.commit=true". Auto commit is an option of the Kafka client, and how and when it commits is entirely out of the control of Beam and KafkaIO.
>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather than 'enable.auto.commit'? That would ensure exactly-once processing.
>>
>> That said, you might be interested in understanding why your example failed:
>> enable.auto.commit is not such a bad option by itself, but there is a corner case where it can cause issues like this.
>> When a reader is initialized, its start offset is determined as follows (specifically in Dataflow, but roughly accurate on other runners too):
>>
>> - (a) If there is a checkpoint for the reader split (true for all reads except the very first bundle read by the split from Kafka), the offset comes from the checkpoint. This is how exactly-once is ensured. Here the offset committed by the Kafka client with 'autocommit' does not matter.
>> - (b) If there is no checkpoint (i.e. for the first bundle of records), KafkaIO does not set any offset explicitly and lets the Kafka client decide. That implies it depends on your ConsumerConfig. So the ConsumerConfig decides the offset when a pipeline first starts.
>>
>> In your example, when there was an exception for message 25, the pipeline was still processing the first bundle of records and there was no Dataflow checkpoint yet. It kept hitting (b). Kafka's 'autocommit' is outside Beam's control, and it might have committed offset 60 in one of the retries. The next retry then incorrectly read from 60.
>>
>> I hope this helps. Enabling autocommit is only useful when you want to restart your pipeline from scratch (rather than 'updating' your Dataflow pipeline) and still want to *roughly* resume from where the previous pipeline left off. Even there, commitOffsetsInFinalize() is better. In either case, exactly-once processing is not guaranteed when a pipeline restarts; currently the only way to achieve that is to 'update' the pipeline.
>>
>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>
>> Raghu.
>>
>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I'm using the KafkaIO source in my Beam pipeline, testing a scenario where intermittent errors may happen (e.g. DB connection failure) with the Dataflow runner.
>>> So I produced a sequence of 200 messages containing sequential numbers (1-200) to a topic, and then executed the following pipeline:
>>>
>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>         .withBootstrapServers(server)
>>>         .withTopic(topic)
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializer(StringDeserializer.class)
>>>         .updateConsumerProperties(properties)
>>>         .withoutMetadata())
>>>     .apply(Values.create())
>>>     .apply(ParDo.of(new StepTest()));
>>>
>>> Where StepTest is defined as follows:
>>>
>>> public class StepTest extends DoFn<String, String> {
>>>   @ProcessElement
>>>   public void processElement(ProcessContext pc) {
>>>     String element = pc.element();
>>>
>>>     if (randomErrorOccurs()) {
>>>       throw new RuntimeException("Failed ... " + element);
>>>     } else {
>>>       LOG.info(element);
>>>     }
>>>   }
>>> }
>>>
>>> The consumer configuration has "enable.auto.commit=true".
>>> I would expect all the numbers to get printed, and if an exception is thrown, the Dataflow runner to retry the failed message until it eventually succeeds.
>>> However, what happened in my pipeline was different: when errors started happening in my code, some messages were never processed and some were lost forever.
>>>
>>> I would expect something like:
>>>
>>> {...}
>>> 22
>>> 23
>>> 24
>>> Failed ... 25
>>> {A new reader starts}
>>> Reader-0: first record offset 60
>>> 61
>>> 62
>>> {...}
>>> {Dataflow retries 25}
>>> Failed ... 25
>>> {...}
>>> and so on... (the exception would never cease to happen in this case and Dataflow would retry forever)
>>>
>>> My output was something like:
>>>
>>> {...}
>>> 22
>>> 23
>>> 24
>>> Failed ... 25
>>> {A new reader starts}
>>> Reader-0: first record offset 60
>>> 61
>>> 62
>>> {...}
>>>
>>> Message #25 never gets reprocessed, and all the messages up to 60 are lost, probably the ones in the same processing bundle as 25. Even more curious is that this behaviour does not happen with the PubsubIO source, which produces the first output shown above.
>>>
>>> My questions are:
>>> What is a good way of handling errors with the Kafka source if I want all messages to be processed exactly once?
>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>> Please let me know your thoughts.
>>>
>>> Andre (cc) is part of our team and will join this discussion.
>>>
>>> --
>>> []s
>>>
>>> Leonardo Alves Miguel
>>> Data Engineer
>>> (16) 3509-5555 | www.arquivei.com.br
>>
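To make point (b) from Raghu's explanation above concrete: while the first bundle has no Beam checkpoint, the start position and the background commits are driven entirely by the consumer configuration. A hypothetical version of the 'properties' map used in the pipeline above might look like this; the actual values are not shown anywhere in the thread.

// Hypothetical contents of the 'properties' map; none of these values
// appear in the original thread.
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "steptest-group");
// With no Beam checkpoint and no previously committed offset for the
// group, auto.offset.reset decides where the very first bundle starts.
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// The Kafka client commits offsets in the background on a timer,
// independently of whether the bundle that read those records later
// fails and is retried by Dataflow.
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

That timer-driven commit is what lets offset 60 get committed while the bundle containing message 25 is still failing, which is why the restarted reader could jump past the unprocessed records.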
