The only way Reshuffle could help is if you are able to separate the
processing that is prone to errors from the processing that produces side
effects, i.e.:
IO --> DoFn_A_prone_to_exceptions --> Reshuffle --> (B) DoFn_B_producing_side_effects

This way, it might look like (B) processes each record exactly once, but it
is not guaranteed (e.g. a worker could crash suddenly).
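
A rough sketch of that shape, assuming the usual Beam imports (the DoFn
bodies and the writeToDatabase() helper are hypothetical; only the
placement of the Reshuffle matters):

input  // the PCollection<String> read from Kafka, as in the pipeline below
    .apply("ProneToExceptions", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Work that may throw and be retried; no external side effects here.
            c.output(c.element().trim());
        }
    }))
    .apply(Reshuffle.viaRandomKey())  // materializes results before the next stage
    .apply("SideEffects", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Usually runs once per record after the reshuffle, but a sudden
            // worker crash can still cause a replay.
            writeToDatabase(c.element());  // hypothetical external write
        }
    }));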

Raghu.

On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:

> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
> [email protected]> wrote:
>
>> Hello everyone.
>>
>> Thanks for your answers. We've managed to solve our problem using the
>> solutions proposed here.
>>
>> First, there was no need for using autocommit. But switching to
>> "commitOffsetsInFinalize()" also didn't help. In our example, if a failure
>> occurred when processing message #25, the runner would retry the whole batch
>> again, moving from "at-most-once processing" to "at-least-once processing".
>>
>
> It has always been at-least-once for DoFn side effects and external
> interactions. That is not specific to KafkaIO. Using Reshuffle does not
> change that.
>
> Raghu.
>
>
>>
>> Of course, if the transformation is idempotent, it doesn't really matter.
>> But it is not always the case for us.
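>>
>> (As an aside: at-least-once is usually tolerable when the side effect is
>> idempotent, e.g. an upsert keyed by a stable record id, so a retry
>> overwrites the same row instead of duplicating it. A rough sketch, with a
>> hypothetical upsert() helper:)
>>
>> public class IdempotentWriteFn extends DoFn<KV<String, String>, Void> {
>>     @ProcessElement
>>     public void processElement(ProcessContext c) {
>>         // Key is a stable record id; a retried bundle rewrites the same row.
>>         upsert(c.element().getKey(), c.element().getValue());
>>     }
>> }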
>>
>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>> messages was lost, all messages were successfully processed exactly once.
>>
>> But one thing that still bugs me is that Reshuffle is deprecated[1]. I've
>> found some discussion[2] on the internet about this, but I'm still not sure
>> if it is safe to use that transformation in this use case. Any thoughts on
>> this?
>>
>> [1]
>> https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>
>> Em qui, 16 de ago de 2018 às 13:36, Raghu Angadi <[email protected]>
>> escreveu:
>>
>>> I am not sure if it helps much. External checkpoints like Kafka
>>> 'autocommit', which live outside Beam's own checkpoint domain, will
>>> always have quirks like this. I wanted to ask what their use case for
>>> autocommit was (over commitOffsetsInFinalize()).
>>>
>>> Even if we wanted to, it is not clear how an unbounded source could do
>>> that. A reader (by design, I think) does not have visibility into when a
>>> checkpoint happens, bundle boundaries, etc. E.g. it could try returning
>>> 'false' from start() and advance() initially, but it does not know when
>>> to actually start returning records (in Dataflow, it could wait a few
>>> seconds).
>>>
>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Raghu, based upon your description, do you think it would be good for
>>>> KafkaIO to checkpoint on the first read without producing any actual
>>>> records?
>>>>
>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> It is due to "enable.auto.commit=true". Auto commit is an option of the
>>>>> Kafka client, and how and when it commits is totally outside the control
>>>>> of Beam & KafkaIO.
>>>>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather
>>>>> than 'enable.auto.commit'? That would ensure exactly-once processing.
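>>>>>
>>>>> For example, roughly (the same read as in your pipeline, with the
>>>>> finalize-time commit enabled instead of autocommit):
>>>>>
>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>     .withBootstrapServers(server)
>>>>>     .withTopic(topic)
>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>     // Commit offsets back to Kafka when Beam finalizes its own checkpoint:
>>>>>     .commitOffsetsInFinalize()
>>>>>     .withoutMetadata())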
>>>>>
>>>>> That said, you might be interested in understanding why your example
>>>>> failed:
>>>>> enable.auto.commit is not such a bad option by itself, but there is a
>>>>> corner case where it can cause issues like this.
>>>>> When a reader is initialized, its start offset is determined as follows
>>>>> (specifically in Dataflow, but roughly accurate on other runners too):
>>>>>
>>>>>    - (a) If there is a checkpoint for the reader split (true for all
>>>>>    reads except the very first bundle read by the split from Kafka), the
>>>>>    offset comes from the checkpoint. This is how exactly-once is ensured.
>>>>>    Here the offset committed by the Kafka client with 'autocommit' does
>>>>>    not matter.
>>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>    records), KafkaIO does not set any offset explicitly and lets the
>>>>>    Kafka client decide. That implies the offset depends on your
>>>>>    ConsumerConfig, so the ConsumerConfig decides where a pipeline starts
>>>>>    reading when it first comes up (see the sketch below).
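>>>>>
>>>>> For example, the standard Kafka consumer setting that kicks in for (b)
>>>>> when no offset has been committed for the group is 'auto.offset.reset'
>>>>> (a sketch; the group id is hypothetical):
>>>>>
>>>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>>>> ...
>>>>> Map<String, Object> properties = new HashMap<>();
>>>>> properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
>>>>> // Applies only when there is no Beam checkpoint and no committed offset:
>>>>> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"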
>>>>>
>>>>> In your example, when there was an exception for message #25, the
>>>>> pipeline was still processing the first bundle of records and there was
>>>>> no Dataflow checkpoint, so it kept hitting (b). Kafka's 'autocommit' is
>>>>> outside Beam's control, and it might have committed offset 60 in one of
>>>>> the retries. The next retry then incorrectly read from 60.
>>>>>
>>>>> I hope this helps. Enabling autocommit is only useful when you want to
>>>>> restart your pipeline from scratch (rather than 'updating' your Dataflow
>>>>> pipeline) and still want to *roughly* resume from where the previous
>>>>> pipeline left off. Even there, commitOffsetsInFinalize() is better. In
>>>>> either case, exactly-once processing is not guaranteed across a pipeline
>>>>> restart; currently the only way to achieve that is to 'update' the
>>>>> pipeline.
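>>>>>
>>>>> ('Updating' here means re-launching the same pipeline with Dataflow's
>>>>> --update option and the same job name; a sketch of the launch:)
>>>>>
>>>>> // Re-launch with e.g.: --runner=DataflowRunner --jobName=<running-job> --update
>>>>> Pipeline p = Pipeline.create(
>>>>>     PipelineOptionsFactory.fromArgs(args).withValidation().create());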
>>>>>
>>>>> [1]:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>
>>>>> Raghu.
>>>>>
>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>> failure) with the Dataflow runner.
>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>
>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>     .withBootstrapServers(server)
>>>>>>     .withTopic(topic)
>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>>     .updateConsumerProperties(properties)
>>>>>>     .withoutMetadata())
>>>>>> .apply(Values.create())
>>>>>> .apply(ParDo.of(new StepTest()));
>>>>>>
>>>>>> Where StepTest is defined as follows:
>>>>>>
>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>     @ProcessElement
>>>>>>     public void processElement(ProcessContext pc) {
>>>>>>         String element = pc.element();
>>>>>>
>>>>>>         if (randomErrorOccurs()) {
>>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>>         } else {
>>>>>>             LOG.info(element);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>> I would expect all the numbers to get printed, and that if an exception
>>>>>> is thrown, Dataflow's runner would retry processing the failed message
>>>>>> until it eventually works.
>>>>>> However, what happened in my pipeline was different: when errors
>>>>>> started happening in my code, some messages were never processed, and
>>>>>> some were actually lost forever.
>>>>>>
>>>>>> I would expect something like:
>>>>>>
>>>>>> {...}
>>>>>> 22
>>>>>> 23
>>>>>> 24
>>>>>> Failed ... 25
>>>>>> {A new reader starts}
>>>>>> Reader-0: first record offset 60
>>>>>> 61
>>>>>> 62
>>>>>> {...}
>>>>>> {Dataflow retries 25}
>>>>>> Failed ... 25
>>>>>> {...}
>>>>>> and so on... (the exception would never cease to happen in this case,
>>>>>> and Dataflow would retry forever)
>>>>>>
>>>>>> My output was something like:
>>>>>>
>>>>>> {...}
>>>>>> 22
>>>>>> 23
>>>>>> 24
>>>>>> Failed ... 25
>>>>>> {A new reader starts}
>>>>>> Reader-0: first record offset 60
>>>>>> 61
>>>>>> 62
>>>>>> {...}
>>>>>>
>>>>>> Message #25 never gets reprocessed, and all the messages up to 60 are
>>>>>> lost, probably the ones in the same processing bundle as #25. Even more
>>>>>> curious is that this behaviour doesn't happen with the PubSubIO source,
>>>>>> which produces the first output shown above.
>>>>>>
>>>>>> My questions are:
>>>>>> What is a good way of handling errors with the Kafka source if I want
>>>>>> all messages to be processed exactly once?
>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>> Please let me know of your thoughts.
>>>>>>
>>>>>> Andre (cc) is part of our team and will be together in this
>>>>>> discussion.
>>>>>>
>>>>>> --
>>>>>> []s
>>>>>>
>>>>>> Leonardo Alves Miguel
>>>>>> Data Engineer
>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>
>>>>>
>>
>> --
>> André Badawi Missaglia
>> Data Engineer
>> (16) 3509-5555 | www.arquivei.com.br
>>
>
