On Thu, Aug 16, 2018 at 4:54 PM Lukasz Cwik <[email protected]> wrote:
> Raghu, yes, I was thinking that advance would continuously return false.
> This would force a runner to checkpoint and then resume, at which point we
> would have a stable reading point.

When the actual checkpoint occurs is still transparent to the reader. The only
way it knows for sure that there was one is when a new reader is created with a
non-null CheckpointMark. In the case of Dataflow, readers are cached and might
be created only once during the life of a job.

> On Thu, Aug 16, 2018 at 3:17 PM Raghu Angadi <[email protected]> wrote:
>
>> The only way reshuffle could help is if you are able to separate
>> processing that is prone to errors from processing that produces side
>> effects, i.e.
>>
>>   IO --> DoFn_A_prone_to_exceptions --> Reshuffle --> (B) DoFn_B_producing_side_effects
>>
>> This way, it might look like (B) processes each record exactly once, but
>> it is not guaranteed (e.g. a worker could crash suddenly).
>>
>> Raghu.
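For concreteness, a minimal sketch of that shape (imports omitted, as in the
snippets quoted below; ParseAndValidateFn and WriteToDbFn are hypothetical
DoFns standing in for the error-prone and side-effecting steps, and the
KafkaIO read mirrors the pipeline quoted later in the thread):

    p.apply("Read from kafka", KafkaIO.<String, String>read()
            .withBootstrapServers(server)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
     .apply(Values.create())
     // (A) Processing that may throw and be retried by the runner.
     .apply("Prone to exceptions", ParDo.of(new ParseAndValidateFn()))
     // Materialize the surviving elements before any side effects run.
     .apply(Reshuffle.viaRandomKey())
     // (B) Processing that produces the side effects.
     .apply("Produces side effects", ParDo.of(new WriteToDbFn()));

As the quoted message notes, this only makes (B) look exactly-once in
practice; it is not a hard guarantee (e.g. a worker could still crash
suddenly).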
>> On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:
>>
>>> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
>>> [email protected]> wrote:
>>>
>>>> Hello everyone.
>>>>
>>>> Thanks for your answers. We've managed to solve our problem using the
>>>> solutions proposed here.
>>>>
>>>> First, there was no need for using autocommit. But switching to
>>>> "commitOffsetsInFinalize()" also didn't help. In our example, if a
>>>> failure occurred when processing message #25, the runner would retry the
>>>> whole batch again, moving from "at-most-once processing" to
>>>> "at-least-once processing".
>>>
>>> It has always been at-least-once for DoFn side effects and external
>>> interactions. That is not specific to KafkaIO. Using reshuffle does not
>>> change that.
>>>
>>> Raghu.
>>>
>>>> Of course, if the transformation is idempotent, it doesn't really
>>>> matter. But that is not always the case for us.
>>>>
>>>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>>>> messages was lost, all messages were successfully processed exactly once.
>>>>
>>>> But one thing that still bugs me is that Reshuffle is deprecated[1].
>>>> I've found some discussion[2] on the internet about this, but I'm still
>>>> not sure if it is safe to use that transformation in this use case. Any
>>>> thoughts on this?
>>>>
>>>> [1] https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>>>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>>>
>>>> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> I am not sure if it helps much. External checkpoints like Kafka
>>>>> 'autocommit' outside Beam's own checkpoint domain will always have
>>>>> quirks like this. I wanted to ask what their use case for using
>>>>> autocommit was (over commitOffsetsInFinalize()).
>>>>>
>>>>> If we wanted to, it is not clear how an unbounded source can do that.
>>>>> A reader (by design, I think) does not have visibility into when a
>>>>> checkpoint happens, bundle boundaries, etc. E.g. it can try returning
>>>>> 'false' for start() and advance() initially, but does not know when to
>>>>> actually return records (in Dataflow, it could wait a few seconds).
>>>>>
>>>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Raghu, based upon your description, do you think it would be a good
>>>>>> idea for KafkaIO to checkpoint on the first read without producing any
>>>>>> actual records?
>>>>>>
>>>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>>> It is due to "enable.autocommit=true". Auto commit is an option of
>>>>>>> the Kafka client, and how and when it commits is entirely outside the
>>>>>>> control of Beam & KafkaIO.
>>>>>>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather
>>>>>>> than 'enable.autocommit'? That would ensure exactly-once processing.
>>>>>>>
>>>>>>> That said, you might be interested in understanding why your example
>>>>>>> failed: enable.autocommit is not such a bad option by itself, but
>>>>>>> there is a corner case where it can cause issues like this.
>>>>>>> When a reader is initialized, its start offset is determined as
>>>>>>> follows (specifically in Dataflow, but roughly accurate on other
>>>>>>> runners too):
>>>>>>>
>>>>>>> - (a) If there is a checkpoint for the reader split (true for all
>>>>>>>   reads except the very first bundle read by the split from Kafka),
>>>>>>>   the offset comes from the checkpoint. This is how exactly-once is
>>>>>>>   ensured. Here the offset committed by the Kafka client with
>>>>>>>   'autocommit' does not matter.
>>>>>>> - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>>>   records), KafkaIO does not set any offset explicitly and lets the
>>>>>>>   Kafka client decide. That implies it depends on your ConsumerConfig,
>>>>>>>   so ConsumerConfig decides the offset when a pipeline first starts.
>>>>>>>
>>>>>>> In your example, when there was an exception for Message 25, it was
>>>>>>> still processing the first bundle of records and there was no Dataflow
>>>>>>> checkpoint, so it kept hitting (b). Kafka's 'autocommit' is outside
>>>>>>> Beam's control, and it might have committed offset 60 in one of the
>>>>>>> retries. The next retry then incorrectly reads from 60.
>>>>>>>
>>>>>>> I hope this helps. Enabling autocommit is only useful when you want
>>>>>>> to restart your pipeline from scratch (rather than 'updating' your
>>>>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>>>>> better. In either case, exactly-once processing is not guaranteed when
>>>>>>> a pipeline restarts; currently the only way to achieve that is to
>>>>>>> 'update' the pipeline.
>>>>>>>
>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>>>
>>>>>>> Raghu.
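For reference, a minimal sketch of the commitOffsetsInFinalize() setup
suggested above, against the Beam 2.6 KafkaIO.Read builder (imports omitted;
the group id value is an assumption, though some group.id is needed for
committing offsets back to Kafka):

    Map<String, Object> consumerProps = new HashMap<>();
    // Let Beam commit offsets when it finalizes its own checkpoint,
    // instead of the Kafka client committing on its own schedule.
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");  // hypothetical group id

    p.apply("Read from kafka", KafkaIO.<String, String>read()
            .withBootstrapServers(server)
            .withTopic(topic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(consumerProps)
            .commitOffsetsInFinalize()
            .withoutMetadata())
     .apply(Values.create());

Within a single run this does not change the delivery guarantees described
above (DoFn side effects remain at-least-once); it mainly keeps the committed
offsets in step with Beam's checkpoints rather than with the Kafka client's
own commit timing.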
>>>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>>>> failure) with the Dataflow runner.
>>>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>>>
>>>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>>>         .withBootstrapServers(server)
>>>>>>>>         .withTopic(topic)
>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>>>>         .updateConsumerProperties(properties)
>>>>>>>>         .withoutMetadata())
>>>>>>>>     .apply(Values.create())
>>>>>>>>     .apply(ParDo.of(new StepTest()));
>>>>>>>>
>>>>>>>> Where StepTest is defined as follows:
>>>>>>>>
>>>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void processElement(ProcessContext pc) {
>>>>>>>>         String element = pc.element();
>>>>>>>>
>>>>>>>>         if (randomErrorOccurs()) {
>>>>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>>>>         } else {
>>>>>>>>             LOG.info(element);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>>>> I would expect that all the numbers get printed, and that if an
>>>>>>>> exception is thrown, Dataflow's runner would retry processing the
>>>>>>>> failed message until it eventually works.
>>>>>>>> However, what happened in my pipeline was different: when errors
>>>>>>>> started happening due to my code, some messages were never processed,
>>>>>>>> and some were actually lost forever.
>>>>>>>>
>>>>>>>> I would expect something like:
>>>>>>>>
>>>>>>>> {...}
>>>>>>>> 22
>>>>>>>> 23
>>>>>>>> 24
>>>>>>>> Failed ... 25
>>>>>>>> {A new reader starts}
>>>>>>>> Reader-0: first record offset 60
>>>>>>>> 61
>>>>>>>> 62
>>>>>>>> {...}
>>>>>>>> {Dataflow retries 25}
>>>>>>>> Failed ... 25
>>>>>>>> {...}
>>>>>>>> and so on... (the exception would never cease to happen in this case
>>>>>>>> and Dataflow would retry forever)
>>>>>>>>
>>>>>>>> My output was something like:
>>>>>>>>
>>>>>>>> {...}
>>>>>>>> 22
>>>>>>>> 23
>>>>>>>> 24
>>>>>>>> Failed ... 25
>>>>>>>> {A new reader starts}
>>>>>>>> Reader-0: first record offset 60
>>>>>>>> 61
>>>>>>>> 62
>>>>>>>> {...}
>>>>>>>>
>>>>>>>> Message #25 never gets reprocessed, and all the messages up to 60
>>>>>>>> are lost, probably the ones in the same processing bundle as 25. Even
>>>>>>>> more curious is that this behaviour doesn't happen when using the
>>>>>>>> PubSubIO source, which produces the first mentioned output.
>>>>>>>>
>>>>>>>> My questions are:
>>>>>>>> What is a good way of handling errors with the Kafka source if I
>>>>>>>> want all messages to be processed exactly once?
>>>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>>>> Please let me know of your thoughts.
>>>>>>>>
>>>>>>>> Andre (cc) is part of our team and will be joining this discussion.
>>>>>>>>
>>>>>>>> --
>>>>>>>> []s
>>>>>>>>
>>>>>>>> Leonardo Alves Miguel
>>>>>>>> Data Engineer
>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>
>>>> --
>>>> André Badawi Missaglia
>>>> Data Engineer
>>>> (16) 3509-5555 | www.arquivei.com.br
