On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <[email protected]> wrote:
> Hello everyone.
>
> Thanks for your answers. We've managed to solve our problem using the
> solutions proposed here.
>
> First, there was no need to use autocommit. But switching to
> "commitOffsetsInFinalize()" also didn't help. In our example, if a failure
> occurred when processing message #25, the runner would retry the whole
> batch again, moving from "at-most-once processing" to "at-least-once
> processing".

It has always been at-least-once for DoFn side effects and external
interactions. That is not specific to KafkaIO. Using Reshuffle does not
change that.

Raghu.

> Of course, if the transformation is idempotent, it doesn't really matter.
> But that is not always the case for us.
>
> Then we tried using Reshuffle.viaRandomKey(). While the order of the
> messages was lost, all messages were successfully processed exactly once.
>
> But one thing that still bugs me is that Reshuffle is deprecated [1]. I've
> found some discussion [2] on the internet about this, but I'm still not
> sure whether it is safe to use that transform in this use case. Any
> thoughts on this?
>
> [1] https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>
> On Thu, Aug 16, 2018 at 13:36, Raghu Angadi <[email protected]> wrote:
>
>> I am not sure if it helps much. External checkpoints like Kafka
>> 'autocommit', outside Beam's own checkpoint domain, will always have
>> quirks like this. I wanted to ask what their use case for using
>> autocommit was (over commitOffsetsInFinalize()).
>>
>> Even if we wanted to, it is not clear how an unbounded source could do
>> that. A reader (by design, I think) does not have visibility into when a
>> checkpoint happens, where bundle boundaries fall, etc. E.g. it can try
>> returning 'false' from start() and advance() initially, but it does not
>> know when to actually start returning records (in Dataflow, it could
>> wait a few seconds).
>>
>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Raghu, based upon your description, do you think it would be a good
>>> idea for KafkaIO to checkpoint on the first read without producing any
>>> actual records?
>>>
>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>> wrote:
>>>
>>>> It is due to "enable.autocommit=true". Auto commit is an option of the
>>>> Kafka client, and how and when it commits is entirely outside the
>>>> control of Beam & KafkaIO.
>>>> Could you try setting commitOffsetsInFinalize() [1] in KafkaIO rather
>>>> than 'enable.autocommit'? That would ensure exactly-once processing.
>>>>
>>>> That said, you might be interested in understanding why your example
>>>> failed. enable.autocommit is not such a bad option by itself, but
>>>> there is a corner case where it can cause issues like this.
>>>> When a reader is initialized, its start offset is determined as
>>>> follows (specifically in Dataflow, but roughly accurate on other
>>>> runners too):
>>>>
>>>>    - (a) If there is a checkpoint for the reader split (true for all
>>>>      reads except the very first bundle read by the split from Kafka),
>>>>      the offset comes from the checkpoint. This is how exactly-once is
>>>>      ensured. Here the offset committed by the Kafka client with
>>>>      'autocommit' does not matter.
>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>      records), KafkaIO does not set any offset explicitly and lets the
>>>>      Kafka client decide. That implies it depends on your
>>>>      ConsumerConfig: ConsumerConfig decides the offset when a pipeline
>>>>      first starts.
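As a concrete illustration of the commitOffsetsInFinalize() suggestion above,
here is a minimal sketch against the example pipeline quoted further down in
this thread. It reuses that example's p, server, topic, properties, and
StepTest; the group id and the auto.offset.reset choice are placeholders, not
values from the original discussion. (It assumes the same imports as that
example, plus java.util.HashMap, java.util.Map, and
org.apache.kafka.clients.consumer.ConsumerConfig.)

Map<String, Object> consumerProps = new HashMap<>(properties);
// Let Beam drive the offset commits instead of the Kafka client.
consumerProps.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
// A group.id is expected in the consumer config when committing offsets;
// "my-consumer-group" is a placeholder.
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// Case (b) above: with no Beam checkpoint and no committed offset for the
// group, the Kafka client falls back to auto.offset.reset.
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(consumerProps)
        // Offsets are committed to Kafka only when Beam finalizes its own
        // checkpoint, so they stay consistent with what the pipeline has
        // actually processed.
        .commitOffsetsInFinalize()
        .withoutMetadata())
    .apply(Values.create())
    .apply(ParDo.of(new StepTest()));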
>>>> In your example, when there was an exception for message #25, the
>>>> pipeline was still processing the first bundle of records and there was
>>>> no Dataflow checkpoint, so it kept hitting (b). Kafka's 'autocommit' is
>>>> out of band, and it might have committed offset 60 during one of the
>>>> retries. The next retry then incorrectly reads from 60.
>>>>
>>>> I hope this helps. Enabling autocommit is only useful when you want to
>>>> restart your pipeline from scratch (rather than 'updating' your
>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>> better. In either case, exactly-once processing is not guaranteed
>>>> across a pipeline restart; currently the only way to achieve that is to
>>>> 'update' the pipeline.
>>>>
>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>
>>>> Raghu.
>>>>
>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel
>>>> <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm using the KafkaIO source in my Beam pipeline with the Dataflow
>>>>> runner, testing the scenario where intermittent errors may happen
>>>>> (e.g. a DB connection failure).
>>>>> So I produced a sequence of 200 messages containing sequential numbers
>>>>> (1-200) to a topic, and then executed the following pipeline:
>>>>>
>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>         .withBootstrapServers(server)
>>>>>         .withTopic(topic)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>         .updateConsumerProperties(properties)
>>>>>         .withoutMetadata())
>>>>>     .apply(Values.create())
>>>>>     .apply(ParDo.of(new StepTest()));
>>>>>
>>>>> Where StepTest is defined as follows:
>>>>>
>>>>> public class StepTest extends DoFn<String, String> {
>>>>>   @ProcessElement
>>>>>   public void processElement(ProcessContext pc) {
>>>>>     String element = pc.element();
>>>>>
>>>>>     if (randomErrorOccurs()) {
>>>>>       throw new RuntimeException("Failed ... " + element);
>>>>>     } else {
>>>>>       LOG.info(element);
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>> I would expect all the numbers to get printed, and if an exception is
>>>>> thrown, Dataflow's runner to retry processing that failed message
>>>>> until it eventually works.
>>>>> However, what happened in my pipeline was different: when errors
>>>>> started happening in my code, some messages were never processed and
>>>>> some were lost forever.
>>>>>
>>>>> I would expect something like:
>>>>>
>>>>> {...}
>>>>> 22
>>>>> 23
>>>>> 24
>>>>> Failed ... 25
>>>>> {A new reader starts}
>>>>> Reader-0: first record offset 60
>>>>> 61
>>>>> 62
>>>>> {...}
>>>>> {Dataflow retries 25}
>>>>> Failed ... 25
>>>>> {...}
>>>>> and so on... (the exception would never cease to happen in this case
>>>>> and Dataflow would retry forever)
>>>>>
>>>>> My output was something like:
>>>>>
>>>>> {...}
>>>>> 22
>>>>> 23
>>>>> 24
>>>>> Failed ... 25
>>>>> {A new reader starts}
>>>>> Reader-0: first record offset 60
>>>>> 61
>>>>> 62
>>>>> {...}
>>>>>
>>>>> Message #25 never gets reprocessed, and all the messages up to 60 are
>>>>> lost, probably the ones in the same processing bundle as 25. Even more
>>>>> curious is that this behaviour doesn't happen when using the PubSubIO
>>>>> source, which produces the first output shown above.
>>>>>
>>>>> My questions are:
>>>>> What is a good way of handling errors with the Kafka source if I want
>>>>> all messages to be processed exactly once?
>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>> Please let me know your thoughts.
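For reference, the Reshuffle.viaRandomKey() workaround that André describes at
the top of this thread would slot into the pipeline above roughly like this (a
sketch only, assuming the org.apache.beam.sdk.transforms.Reshuffle import in
addition to those already used by the example). On Dataflow the reshuffle acts
as a checkpoint and fusion break, so a failure in StepTest is retried from the
reshuffled data instead of rewinding the Kafka reader; as Raghu notes above,
side effects inside StepTest remain at-least-once.

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(properties)
        .withoutMetadata())
    .apply(Values.create())
    // Stable checkpoint between the read and the side-effecting DoFn.
    .apply(Reshuffle.viaRandomKey())
    .apply(ParDo.of(new StepTest()));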
>>>>> Andre (cc) is part of our team and will be joining this discussion.
>>>>>
>>>>> --
>>>>> []s
>>>>>
>>>>> Leonardo Alves Miguel
>>>>> Data Engineer
>>>>> (16) 3509-5555 | www.arquivei.com.br
>
> --
> André Badawi Missaglia
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
