The only way reshuffle could help is if you are able to separate processing
that is prone to errors from processing that produces side effects, i.e.

    IO --> (A) DoFn_A_prone_to_exceptions --> Reshuffle --> (B) DoFn_B_producing_side_effects

This way, it might look like (B) processes each record exactly once, but it
is not guaranteed (e.g. a worker could crash suddenly).
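For illustration, a minimal sketch of that split. The class and method names
below (ProneToExceptionsFn, writeToExternalSystem, etc.) are placeholders,
not from this thread:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class ReshuffleBarrier {

      // (A) Error-prone processing: parsing/transforming only, no side effects.
      static class ProneToExceptionsFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(parse(c.element())); // may throw; the runner retries the bundle
        }
      }

      // (B) Side-effecting processing: external writes, RPCs, etc.
      static class ProducesSideEffectsFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          writeToExternalSystem(c.element()); // usually once per record, not guaranteed
        }
      }

      static void addSteps(PCollection<String> input) {
        input
            .apply("A: prone to exceptions", ParDo.of(new ProneToExceptionsFn()))
            // Checkpoint barrier: retries inside (A) no longer re-run (B),
            // but a sudden worker crash can still replay (B).
            .apply(Reshuffle.viaRandomKey())
            .apply("B: side effects", ParDo.of(new ProducesSideEffectsFn()));
      }

      private static String parse(String s) { return s; }     // placeholder
      private static void writeToExternalSystem(String s) {}  // placeholder
    }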
Raghu.

On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:

> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
> [email protected]> wrote:
>
>> Hello everyone.
>>
>> Thanks for your answers. We've managed to solve our problem using the
>> solutions proposed here.
>>
>> First, there was no need for using autocommit. But switching to
>> "commitOffsetsInFinalize()" also didn't help. In our example, if a
>> failure occurred when processing message #25, the runner would retry the
>> whole batch again, moving from "at-most-once processing" to
>> "at-least-once processing".
>>
>
> It has always been at-least-once for DoFn side effects and external
> interactions. That is not specific to KafkaIO. Using reshuffle does not
> change that.
>
> Raghu.
>
>> Of course, if the transformation is idempotent, it doesn't really
>> matter. But that is not always the case for us.
>>
>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>> messages was lost, all messages were successfully processed exactly once.
>>
>> But one thing that still bugs me is that Reshuffle is deprecated [1].
>> I've found some discussion [2] on the internet about this, but I'm still
>> not sure if it is safe to use that transformation in this use case. Any
>> thoughts on this?
>>
>> [1] https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>
>> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]>
>> wrote:
>>
>>> I am not sure if it helps much. External checkpoints like Kafka
>>> 'autocommit', outside Beam's own checkpoint domain, will always have
>>> quirks like this. I wanted to ask what their use case for using
>>> autocommit was (over commitOffsetsInFinalize()).
>>>
>>> If we wanted to, it is not clear how an unbounded source could do that.
>>> A reader (by design, I think) does not have visibility into when a
>>> checkpoint happens, bundle boundaries, etc. E.g. it could try returning
>>> 'false' from start() and advance() initially, but it does not know when
>>> to actually start returning records (in Dataflow, it could wait a few
>>> seconds).
>>>
>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Raghu, based upon your description, do you think it would be good for
>>>> KafkaIO to checkpoint on the first read without producing any actual
>>>> records?
>>>>
>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> It is due to "enable.auto.commit=true". Auto commit is an option of
>>>>> the Kafka client, and how and when it commits is entirely outside the
>>>>> control of Beam & KafkaIO.
>>>>> Could you try setting commitOffsetsInFinalize() [1] in KafkaIO rather
>>>>> than 'enable.auto.commit'? That would ensure exactly-once processing.
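>>>>>
>>>>> For example, a minimal sketch based on your pipeline (quoted further
>>>>> below). Note that commitOffsetsInFinalize() requires a group.id in
>>>>> the consumer config, so 'properties' is assumed to set one:
>>>>>
>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>     .withBootstrapServers(server)
>>>>>     .withTopic(topic)
>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>     // 'properties' sets group.id; drop enable.auto.commit entirely
>>>>>     .updateConsumerProperties(properties)
>>>>>     // Offsets are committed when Beam finalizes its own checkpoint,
>>>>>     // instead of the Kafka client committing in the background.
>>>>>     .commitOffsetsInFinalize()
>>>>>     .withoutMetadata())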
>>>>>
>>>>> That said, you might be interested in understanding why your example
>>>>> failed: enable.auto.commit is not such a bad option by itself, but
>>>>> there is a corner case where it can cause issues like this.
>>>>> When a reader is initialized, its start offset is determined as
>>>>> follows (specifically in Dataflow, but roughly accurate on other
>>>>> runners too):
>>>>>
>>>>>    - (a) If there is a checkpoint for the reader split (true for all
>>>>>    reads except the very first bundle read by the split from Kafka),
>>>>>    the offset comes from the checkpoint. This is how exactly-once is
>>>>>    ensured. Here the offset committed by the Kafka client with
>>>>>    'autocommit' does not matter.
>>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>    records), KafkaIO does not set any offset explicitly and lets the
>>>>>    Kafka client decide. That implies it depends on your
>>>>>    ConsumerConfig. So the ConsumerConfig decides the offset when a
>>>>>    pipeline first starts (see the sketch just after this list).
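>>>>>
>>>>> To make (b) concrete, here is a sketch of the consumer properties
>>>>> involved (the group name is hypothetical):
>>>>>
>>>>> Map<String, Object> properties = new HashMap<>();
>>>>> properties.put("group.id", "my-group");
>>>>> // The client commits offsets in the background on its own schedule;
>>>>> // with no Beam checkpoint, a restarted reader resumes from the last
>>>>> // committed offset (e.g. 60 in your run).
>>>>> properties.put("enable.auto.commit", "true");
>>>>> // If the group has no committed offset at all, auto.offset.reset
>>>>> // decides where to start (the Kafka client's default is 'latest').
>>>>> properties.put("auto.offset.reset", "earliest");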
>>>>>
>>>>> In your example, when there was an exception for message #25, it was
>>>>> still processing the first bundle of records and there was no Dataflow
>>>>> checkpoint, so it kept hitting (b). Kafka's 'autocommit' operates out
>>>>> of band, and it might have committed offset 60 in one of the retries.
>>>>> The next retry then incorrectly reads from 60.
>>>>>
>>>>> I hope this helps. Enabling autocommit is only useful when you want to
>>>>> restart your pipeline from scratch (rather than 'updating' your
>>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>>> better. In either case, exactly-once processing is not guaranteed when
>>>>> a pipeline restarts; currently the only way to achieve that is to
>>>>> 'update' the pipeline.
>>>>>
>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>
>>>>> Raghu.
>>>>>
>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>> failure) with the Dataflow runner.
>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>
>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>     .withBootstrapServers(server)
>>>>>>     .withTopic(topic)
>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>>     .updateConsumerProperties(properties)
>>>>>>     .withoutMetadata())
>>>>>>  .apply(Values.create())
>>>>>>  .apply(ParDo.of(new StepTest()));
>>>>>>
>>>>>> Where StepTest is defined as follows:
>>>>>>
>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>   @ProcessElement
>>>>>>   public void processElement(ProcessContext pc) {
>>>>>>     String element = pc.element();
>>>>>>
>>>>>>     if (randomErrorOccurs()) {
>>>>>>       throw new RuntimeException("Failed ... " + element);
>>>>>>     } else {
>>>>>>       LOG.info(element);
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>> I would expect that all the numbers get printed, and that if an
>>>>>> exception is thrown, Dataflow's runner would retry processing the
>>>>>> failed message until it eventually works.
>>>>>> However, what happened in my pipeline was different: when errors
>>>>>> started happening in my code, some messages were never processed,
>>>>>> and some were actually lost forever.
>>>>>>
>>>>>> I would expect something like:
>>>>>>
>>>>>> {...}
>>>>>> 22
>>>>>> 23
>>>>>> 24
>>>>>> Failed ... 25
>>>>>> {A new reader starts}
>>>>>> Reader-0: first record offset 60
>>>>>> 61
>>>>>> 62
>>>>>> {...}
>>>>>> {Dataflow retries 25}
>>>>>> Failed ... 25
>>>>>> {...}
>>>>>> and so on (the exception would never cease to happen in this case,
>>>>>> and Dataflow would retry forever).
>>>>>>
>>>>>> My output was something like:
>>>>>>
>>>>>> {...}
>>>>>> 22
>>>>>> 23
>>>>>> 24
>>>>>> Failed ... 25
>>>>>> {A new reader starts}
>>>>>> Reader-0: first record offset 60
>>>>>> 61
>>>>>> 62
>>>>>> {...}
>>>>>>
>>>>>> Message #25 never gets reprocessed, and all the messages up to 60 are
>>>>>> lost, probably the ones in the same processing bundle as 25. Even
>>>>>> more curious is that this behaviour doesn't happen when using the
>>>>>> PubsubIO source, which produces the first mentioned output.
>>>>>>
>>>>>> My questions are:
>>>>>> What is a good way of handling errors with the Kafka source if I want
>>>>>> all messages to be processed exactly once?
>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>> Please let me know your thoughts.
>>>>>>
>>>>>> Andre (cc) is part of our team and will join this discussion.
>>>>>>
>>>>>> --
>>>>>> []s
>>>>>>
>>>>>> Leonardo Alves Miguel
>>>>>> Data Engineer
>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>
>>
>> --
>> André Badawi Missaglia
>> Data Engineer
>> (16) 3509-5555 | www.arquivei.com.br
