Raghu, yes, I was thinking that advance would continuously return false. This would force a runner to checkpoint and then resume, at which point we would have a stable reading point.
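The state machine being discussed could be sketched roughly like this (plain Java, not Beam's actual `UnboundedReader` API; the class and method names here are purely illustrative): the reader withholds records, returning false from advance(), until the runner has finalized a first checkpoint.

```java
import java.util.List;

// Illustrative sketch only: a reader that refuses to emit records until the
// runner has finalized a first checkpoint, so even the first bundle is
// covered by a stable reading point.
class DeferUntilCheckpointReader {
    private final List<String> records;
    private int position = -1;
    private boolean checkpointFinalized = false;

    DeferUntilCheckpointReader(List<String> records) {
        this.records = records;
    }

    // Hypothetical hook: the runner calls this when it finalizes a
    // checkpoint covering this split.
    void onCheckpointFinalized() {
        checkpointFinalized = true;
    }

    // Returns false until a checkpoint exists, even if records are
    // available; the runner interprets false as "no data yet", checkpoints,
    // and calls advance() again later.
    boolean advance() {
        if (!checkpointFinalized) {
            return false;
        }
        if (position + 1 >= records.size()) {
            return false;
        }
        position++;
        return true;
    }

    String getCurrent() {
        return records.get(position);
    }
}
```

As Raghu notes below, the real difficulty is that an actual reader has no visibility into checkpoint or bundle boundaries, so there is no obvious place to call something like onCheckpointFinalized() from.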
On Thu, Aug 16, 2018 at 3:17 PM Raghu Angadi <[email protected]> wrote:

> The only way reshuffle could help is if you are able to separate
> processing that is prone to errors from processing that produces side
> effects, i.e.:
>
>     IO --> DoFn_A_prone_to_exceptions --> Reshuffle --> (B) DoFn_B_producing_side_effects
>
> This way, it might look like (B) processes each record exactly once, but
> it is not guaranteed (e.g. a worker could crash suddenly).
>
> Raghu.
>
> On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:
>
>> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <[email protected]> wrote:
>>
>>> Hello everyone.
>>>
>>> Thanks for your answers. We've managed to solve our problem using the
>>> solutions proposed here.
>>>
>>> First, there was no need to use autocommit. But switching to
>>> "commitOffsetsInFinalize()" also didn't help. In our example, if a
>>> failure occurred when processing message #25, the runner would retry the
>>> whole batch again, moving from "at-most-once processing" to
>>> "at-least-once processing".
>>
>> It has always been at-least-once for DoFn side effects and external
>> interactions. That is not specific to KafkaIO. Using Reshuffle does not
>> change that.
>>
>> Raghu.
>>
>>> Of course, if the transformation is idempotent, it doesn't really
>>> matter. But that is not always the case for us.
>>>
>>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>>> messages was lost, all messages were successfully processed exactly once.
>>>
>>> But one thing that still bugs me is that Reshuffle is deprecated [1].
>>> I've found some discussion [2] on the internet about this, but I'm still
>>> not sure if it is safe to use that transformation in this use case. Any
>>> thoughts on this?
>>>
>>> [1] https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>>
>>> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> I am not sure if it helps much. External checkpoints like Kafka
>>>> 'autocommit' outside Beam's own checkpoint domain will always have
>>>> quirks like this. I wanted to ask what their use case for using
>>>> autocommit was (over commitOffsetsInFinalize()).
>>>>
>>>> If we wanted to, it is not clear how an unbounded source could do
>>>> that. A reader (by design, I think) does not have visibility into when
>>>> a checkpoint happens, bundle boundaries, etc. E.g. it can try returning
>>>> 'false' from start() and advance() initially, but it does not know when
>>>> to actually start returning records (in Dataflow, it could wait a few
>>>> seconds).
>>>>
>>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Raghu, based on your description, do you think it would be a good
>>>>> idea for KafkaIO to checkpoint on the first read without producing any
>>>>> actual records?
>>>>>
>>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> It is due to "enable.auto.commit=true". Autocommit is an option of
>>>>>> the Kafka client, and how and when it commits is totally out of the
>>>>>> control of Beam & KafkaIO.
>>>>>> Could you try setting commitOffsetsInFinalize() [1] in KafkaIO
>>>>>> rather than 'enable.auto.commit'? That would ensure exactly-once
>>>>>> processing.
>>>>>>
>>>>>> That said, you might be interested in understanding why your example
>>>>>> failed: enable.auto.commit is not such a bad option by itself, but
>>>>>> there is a corner case where it can cause issues like this.
>>>>>> When a reader is initialized, its start offset is determined as
>>>>>> follows (specifically in Dataflow, but roughly accurate on other
>>>>>> runners too):
>>>>>>
>>>>>> - (a) If there is a checkpoint for the reader split (true for all
>>>>>>   reads except the very first bundle read by the split from Kafka),
>>>>>>   the offset comes from the checkpoint. This is how exactly-once is
>>>>>>   ensured. Here the offset committed by the Kafka client with
>>>>>>   'autocommit' does not matter.
>>>>>> - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>>   records), KafkaIO does not set any offset explicitly and lets the
>>>>>>   Kafka client decide. That implies it depends on your
>>>>>>   ConsumerConfig; the ConsumerConfig decides the offset when a
>>>>>>   pipeline first starts.
>>>>>>
>>>>>> In your example, when there was an exception for message 25, the
>>>>>> pipeline was still processing the first bundle of records and there
>>>>>> was no Dataflow checkpoint. It kept hitting (b). Kafka's 'autocommit'
>>>>>> runs out of band, and it might have committed offset 60 in one of the
>>>>>> retries. The next retry then incorrectly reads from 60.
>>>>>>
>>>>>> I hope this helps. Enabling autocommit is only useful when you want
>>>>>> to restart your pipeline from scratch (rather than 'updating' your
>>>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>>>> better. In either case, exactly-once processing is not guaranteed
>>>>>> when a pipeline restarts; currently the only way to achieve that is
>>>>>> to 'update' the pipeline.
>>>>>>
>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>>
>>>>>> Raghu.
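The (a)/(b) decision described above can be condensed into a tiny function (a sketch with made-up names, not KafkaIO's actual code): a present checkpoint wins outright; only an absent checkpoint defers to the Kafka client's ConsumerConfig, which is where autocommitted offsets can sneak in.

```java
import java.util.OptionalLong;

// Illustrative sketch of the start-offset decision described in the thread.
// OptionalLong stands in for "a Beam checkpoint exists for this split or not".
class StartOffsetResolver {
    // Sentinel meaning "no explicit offset: let the Kafka client's
    // ConsumerConfig decide" (case (b)).
    static final long USE_CONSUMER_CONFIG = -1L;

    static long resolve(OptionalLong checkpointedOffset) {
        if (checkpointedOffset.isPresent()) {
            // Case (a): the checkpointed offset wins; any offset the Kafka
            // client autocommitted is ignored, which is what preserves
            // exactly-once within the pipeline.
            return checkpointedOffset.getAsLong();
        }
        // Case (b): first bundle of the split -- no checkpoint yet, so the
        // client config (including any autocommitted offset) decides.
        return USE_CONSUMER_CONFIG;
    }
}
```

The corner case in the thread is exactly the window where every retry still lands in case (b) because the first bundle has never completed.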
>>>>>>
>>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>>> failure) with the Dataflow runner.
>>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>>
>>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>>         .withBootstrapServers(server)
>>>>>>>         .withTopic(topic)
>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>>>         .updateConsumerProperties(properties)
>>>>>>>         .withoutMetadata())
>>>>>>>     .apply(Values.create())
>>>>>>>     .apply(ParDo.of(new StepTest()));
>>>>>>>
>>>>>>> Where StepTest is defined as follows:
>>>>>>>
>>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>>     @ProcessElement
>>>>>>>     public void processElement(ProcessContext pc) {
>>>>>>>         String element = pc.element();
>>>>>>>
>>>>>>>         if (randomErrorOccurs()) {
>>>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>>>         } else {
>>>>>>>             LOG.info(element);
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>>> I would expect that all the numbers get printed, and that if an
>>>>>>> exception is thrown, Dataflow's runner would retry processing the
>>>>>>> failed message until it eventually works.
>>>>>>> However, what happened in my pipeline was different: when errors
>>>>>>> started happening due to my code, some messages were never
>>>>>>> processed, and some were actually lost forever.
>>>>>>>
>>>>>>> I would expect something like:
>>>>>>>
>>>>>>> {...}
>>>>>>> 22
>>>>>>> 23
>>>>>>> 24
>>>>>>> Failed ... 25
>>>>>>> {Dataflow retries 25}
>>>>>>> Failed ... 25
>>>>>>> {...}
>>>>>>> and so on... (the exception would never cease to happen in this
>>>>>>> case, and Dataflow would retry forever)
>>>>>>>
>>>>>>> My output was something like:
>>>>>>>
>>>>>>> {...}
>>>>>>> 22
>>>>>>> 23
>>>>>>> 24
>>>>>>> Failed ... 25
>>>>>>> {A new reader starts}
>>>>>>> Reader-0: first record offset 60
>>>>>>> 61
>>>>>>> 62
>>>>>>> {...}
>>>>>>>
>>>>>>> Message #25 never gets reprocessed, and all the messages up to 60
>>>>>>> are lost, probably the ones in the same processing bundle as 25.
>>>>>>> Even more curious is that this behaviour doesn't happen when using
>>>>>>> the PubSubIO source, which produces the first mentioned output.
>>>>>>>
>>>>>>> My questions are:
>>>>>>> What is a good way of handling errors with the Kafka source if I
>>>>>>> want all messages to be processed exactly once?
>>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>>> Please let me know your thoughts.
>>>>>>>
>>>>>>> André (cc) is part of our team and will join this discussion.
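The lost-message behaviour reported above can be reproduced with a toy model (plain Java, no Beam; all names invented for illustration): the first attempt dies at the poison record while the Kafka client's autocommit has already raced ahead, and the retry, having no Beam checkpoint to fall back on, resumes from the autocommitted offset instead of the failed record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the failure mode in this thread. Attempt 1 reads from the
// beginning and fails at the poison record; meanwhile the Kafka client has
// autocommitted a later offset out of band. The retry has no Beam
// checkpoint (it is still the first bundle), so it resumes from the
// autocommitted offset -- skipping both the poison record and everything
// between it and that offset.
class AutocommitRetrySimulation {
    static List<Integer> processedAfterRetry(int total, int poison, int autocommittedOffset) {
        List<Integer> processed = new ArrayList<>();
        // Attempt 1: processes messages until it hits the poison record.
        for (int msg = 1; msg < poison; msg++) {
            processed.add(msg);
        }
        // Retry: resumes wherever the autocommitted offset points, not at
        // the failed record; messages poison..autocommittedOffset are lost.
        for (int msg = autocommittedOffset + 1; msg <= total; msg++) {
            processed.add(msg);
        }
        return processed;
    }
}
```

With total=200, poison=25, and an autocommitted offset of 60, this reproduces the reported output: 1-24 are processed, 25-60 are lost forever, and processing silently resumes at 61.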
>>>>>>>
>>>>>>> --
>>>>>>> []s
>>>>>>>
>>>>>>> Leonardo Alves Miguel
>>>>>>> Data Engineer
>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>
>>> --
>>> André Badawi Missaglia
>>> Data Engineer
>>> (16) 3509-5555 | www.arquivei.com.br
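Raghu's point about placing a Reshuffle between the error-prone stage and the side-effecting stage can also be illustrated with a toy model (plain Java, no Beam; names invented): once the error-prone stage's output is materialized at the barrier, retries of the side-effecting stage replay that stable snapshot, so nothing is lost — but side effects within a failed bundle can still re-run, which is why this remains at-least-once rather than guaranteed exactly-once.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the materialization barrier a Reshuffle provides. The
// 'snapshot' list stands in for the materialized output of the error-prone
// stage; retries of the side-effecting stage replay this stable snapshot
// rather than re-reading from a source whose offsets may have moved.
class ReshuffleBarrierSimulation {
    // Returns the full log of side-effect executions, including re-runs.
    static List<Integer> sideEffectLog(List<Integer> snapshot, int transientFailAt) {
        List<Integer> log = new ArrayList<>();
        boolean failedOnce = false;
        int i = 0;
        while (i < snapshot.size()) {
            int e = snapshot.get(i);
            if (e == transientFailAt && !failedOnce) {
                failedOnce = true;
                i = 0; // bundle retry: replay the same materialized snapshot
                continue;
            }
            log.add(e); // side effect executed
            i++;
        }
        return log;
    }
}
```

For a snapshot of [1..5] with a transient failure at 3, the log comes out as [1, 2, 1, 2, 3, 4, 5]: every record's side effect eventually happens (unlike the autocommit case, where records were lost), but 1 and 2 run twice — the "looks like exactly-once, but is really at-least-once" behaviour described in the thread.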
