On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
[email protected]> wrote:

> Hello everyone.
>
> Thanks for your answers. We've managed to solve our problem using the
> solutions proposed here.
>
> First, there was no need to use autocommit. But switching to
> "commitOffsetsInFinalize()" didn't fully solve it either. In our example, if
> a failure occurred when processing message #25, the runner would retry the
> whole batch, moving us from "at-most-once" to "at-least-once" processing.
>

It has always been at-least-once for DoFn side effects and external
interactions. That is not specific to KafkaIO, and using Reshuffle does not
change it.

Raghu.


>
> Of course, if the transformation is idempotent, it doesn't really matter.
> But that is not always the case for us.
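>
> For illustration, one pattern that makes a side effect safe under retries is
> keying each write by a stable message ID, so reprocessing overwrites instead
> of duplicating. A minimal sketch (KeyedStore is a hypothetical stand-in for
> our store, not a real API):
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
>
> /** Hypothetical stand-in for any external store that supports upserts. */
> interface KeyedStore extends java.io.Serializable {
>     void upsert(String key, String value);
> }
>
> public class IdempotentWrite extends DoFn<KV<String, String>, Void> {
>     private final KeyedStore store;
>
>     public IdempotentWrite(KeyedStore store) {
>         this.store = store;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext pc) {
>         // Keying by a stable, deterministic ID (e.g. the Kafka message key)
>         // means a retried bundle rewrites the same rows instead of
>         // appending duplicates.
>         store.upsert(pc.element().getKey(), pc.element().getValue());
>     }
> }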
>
> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
> messages was lost, all messages were successfully processed exactly once.
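>
> For reference, this is roughly where we inserted it, using the pipeline from
> Leonardo's original message (a sketch with the same names):
>
> p.apply("Read from kafka", KafkaIO.<String, String>read()
>     .withBootstrapServers(server)
>     .withTopic(topic)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .updateConsumerProperties(properties)
>     .withoutMetadata())
> .apply(Values.create())
> // Materializes the elements, so a retry of the downstream ParDo replays
> // the same elements instead of re-reading (and re-bundling) from Kafka:
> .apply(Reshuffle.viaRandomKey())
> .apply(ParDo.of(new StepTest()));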
>
> But one thing that still bugs me is that Reshuffle is deprecated[1]. I've
> found some discussion[2] on the internet about this, but I'm still not sure
> if it is safe to use that transformation in this use case. Any thoughts on
> this?
>
> [1]
> https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>
> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]>
> wrote:
>
>> I am not sure if it helps much. External checkpoints like Kafka
>> 'autocommit' outside Beam's own checkpoint domain will always have quirks
>> like this. I wanted to ask what their use case was for using autocommit
>> (over commitOffsetsInFinalize()).
>>
>> Even if we wanted to, it is not clear how an unbounded source could do
>> that. A reader (by design, I think) does not have visibility into when a
>> checkpoint happens, bundle boundaries, etc. E.g. it could try returning
>> 'false' from start() and advance() initially, but it does not know when to
>> actually start returning records (in Dataflow, it could wait a few seconds).
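>>
>> To make that concrete, a rough sketch of the idea (purely hypothetical, not
>> something KafkaIO or Beam implements; it also glosses over calls like
>> getCheckpointMark() arriving before the delegate has started):
>>
>> import java.io.IOException;
>> import java.util.NoSuchElementException;
>> import org.apache.beam.sdk.io.UnboundedSource;
>> import org.joda.time.Duration;
>> import org.joda.time.Instant;
>>
>> // Reports "no data" until a grace period elapses, giving the runner a
>> // chance to finalize an empty first bundle (and persist a checkpoint)
>> // before any record is emitted. The reader still cannot see bundle
>> // boundaries, so a time-based guess is the best it can do.
>> class DelayedStartReader<T> extends UnboundedSource.UnboundedReader<T> {
>>   private final UnboundedSource.UnboundedReader<T> delegate;
>>   private final Instant holdUntil;
>>   private boolean delegateStarted = false;
>>
>>   DelayedStartReader(UnboundedSource.UnboundedReader<T> delegate, Duration grace) {
>>     this.delegate = delegate;
>>     this.holdUntil = Instant.now().plus(grace);
>>   }
>>
>>   @Override
>>   public boolean start() throws IOException {
>>     return advance();
>>   }
>>
>>   @Override
>>   public boolean advance() throws IOException {
>>     if (Instant.now().isBefore(holdUntil)) {
>>       return false; // pretend there is no data yet
>>     }
>>     if (!delegateStarted) {
>>       delegateStarted = true;
>>       return delegate.start(); // first real read, after the grace period
>>     }
>>     return delegate.advance();
>>   }
>>
>>   @Override public T getCurrent() throws NoSuchElementException { return delegate.getCurrent(); }
>>   @Override public Instant getCurrentTimestamp() throws NoSuchElementException { return delegate.getCurrentTimestamp(); }
>>   @Override public Instant getWatermark() { return delegate.getWatermark(); }
>>   @Override public UnboundedSource.CheckpointMark getCheckpointMark() { return delegate.getCheckpointMark(); }
>>   @Override public UnboundedSource<T, ?> getCurrentSource() { return delegate.getCurrentSource(); }
>>   @Override public void close() throws IOException { delegate.close(); }
>> }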
>>
>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Raghu, based upon your description, do you think it would be a good idea for
>>> KafkaIO to checkpoint on the first read without producing any actual
>>> records?
>>>
>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>> wrote:
>>>
>>>>
>>>> It is due to "enable.auto.commit=true". Auto-commit is an option of the
>>>> Kafka client, and how and when it commits is entirely out of the control
>>>> of Beam and KafkaIO.
>>>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather
>>>> than 'enable.auto.commit'? That would ensure exactly-once processing.
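>>>>
>>>> Using your pipeline below as a base, the change would look roughly like
>>>> this (a sketch with the same names as your snippet; note that committing
>>>> offsets back to Kafka requires a group.id in the consumer properties):
>>>>
>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>     .withBootstrapServers(server)
>>>>     .withTopic(topic)
>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>     .updateConsumerProperties(properties)  // without enable.auto.commit
>>>>     // Commit offsets to Kafka only when Beam finalizes a checkpoint,
>>>>     // instead of letting the Kafka client commit on its own schedule:
>>>>     .commitOffsetsInFinalize()
>>>>     .withoutMetadata())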
>>>>
>>>> That said, you might be interested in understanding why your example
>>>> failed:
>>>> enable.auto.commit is not such a bad option by itself, but there is a
>>>> corner case where it can cause issues like this.
>>>> When a reader is initialized, its start offset is determined as follows
>>>> (specifically in Dataflow, but roughly accurate on other runners too):
>>>>
>>>>    - (a) If there is a checkpoint for the reader split (true for all
>>>>    reads except the very first bundle read by the split from Kafka), the
>>>>    offset comes from the checkpoint. This is how exactly-once is ensured.
>>>>    Here, the offset committed by the Kafka client with 'autocommit' does
>>>>    not matter.
>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>    records), KafkaIO does not set any offset explicitly and lets the Kafka
>>>>    client decide. That means it depends on your ConsumerConfig: the
>>>>    ConsumerConfig decides the offset when a pipeline first starts (see the
>>>>    sketch after this list).
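>>>>
>>>> To illustrate (b): with no Beam checkpoint, these standard consumer
>>>> settings decide where the first bundle starts (a sketch; the group id is
>>>> a placeholder):
>>>>
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>>>
>>>> Map<String, Object> properties = new HashMap<>();
>>>> properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
>>>> // With no offset committed for the group, this decides where the first
>>>> // bundle starts reading ("earliest" or "latest"):
>>>> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> // With autocommit on, the client commits offsets on its own schedule,
>>>> // outside Beam's checkpointing; when a committed offset exists, the
>>>> // consumer resumes from it in case (b):
>>>> properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);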
>>>>
>>>> In your example, when there was an exception for message 25, the pipeline
>>>> was still processing the first bundle of records and there was no Dataflow
>>>> checkpoint, so it kept hitting (b). Kafka's 'autocommit' runs outside
>>>> Beam's control, and it might have committed offset 60 during one of the
>>>> retries. The next retry then incorrectly reads from 60.
>>>>
>>>> I hope this helps. Enabling autocommit is only useful when you want to
>>>> restart your pipeline from scratch (rather than 'updating' your Dataflow
>>>> pipeline) and still want to *roughly* resume from where the previous
>>>> pipeline left off. Even there, commitOffsetsInFinalize() is better. In
>>>> either case, exactly-once processing is not guaranteed across a pipeline
>>>> restart; currently the only way to achieve that is to 'update' the pipeline.
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>
>>>> Raghu.
>>>>
>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
>>>>> where intermittent errors may happen (e.g. DB connection failure) with the
>>>>> Dataflow runner.
>>>>> So I produced a sequence of 200 messages containing sequential numbers
>>>>> (1-200) to a topic, and then executed the following pipeline:
>>>>>
>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>     .withBootstrapServers(server)
>>>>>     .withTopic(topic)
>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>     .updateConsumerProperties(properties)
>>>>>     .withoutMetadata())
>>>>> .apply(Values.create())
>>>>> .apply(ParDo.of(new StepTest()));
>>>>>
>>>>> Where StepTest is defined as follows:
>>>>>
>>>>> public class StepTest extends DoFn<String, String> {
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext pc) {
>>>>>         String element = pc.element();
>>>>>
>>>>>         // randomErrorOccurs() stands in for an intermittent failure,
>>>>>         // e.g. a dropped DB connection.
>>>>>         if (randomErrorOccurs()) {
>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>         } else {
>>>>>             LOG.info(element);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>> I would expect all the numbers to get printed and, if an exception
>>>>> is thrown, Dataflow's runner to retry the failed message
>>>>> until it eventually works.
>>>>> However, what happened in my pipeline was different: when errors started
>>>>> happening in my code, some messages were never processed,
>>>>> and some were actually lost forever.
>>>>>
>>>>> I would expect something like:
>>>>>
>>>>> {...}
>>>>> 22
>>>>> 23
>>>>> 24
>>>>> Failed ... 25
>>>>> {A new reader starts}
>>>>> Reader-0: first record offset 60
>>>>> 61
>>>>> 62
>>>>> {...}
>>>>> {Dataflow retries 25}
>>>>> Failed ... 25
>>>>> {...}
>>>>> and so on... (in this case the exception would never cease, and
>>>>> Dataflow would retry forever)
>>>>>
>>>>> My output was something like:
>>>>>
>>>>> {...}
>>>>> 22
>>>>> 23
>>>>> 24
>>>>> Failed ... 25
>>>>> {A new reader starts}
>>>>> Reader-0: first record offset 60
>>>>> 61
>>>>> 62
>>>>> {...}
>>>>>
>>>>> Message #25 never gets reprocessed, and all the messages up to 60 are
>>>>> lost, probably the ones in the same processing bundle as 25. Even more
>>>>> curious is that this behaviour doesn't happen with the PubsubIO
>>>>> source, which produces the first output shown above.
>>>>>
>>>>> My questions are:
>>>>> What is a good way of handling errors with Kafka source if I want all
>>>>> messages to be processed exactly once?
>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>> Please let me know your thoughts.
>>>>>
>>>>> Andre (cc) is part of our team and will be joining this discussion.
>>>>>
>>>>> --
>>>>> []s
>>>>>
>>>>> Leonardo Alves Miguel
>>>>> Data Engineer
>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>
>>>>
>
> --
> André Badawi Missaglia
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
>
