Raghu, yes, I was thinking that advance() would continuously return false.
This would force a runner to checkpoint and then resume, at which point we
would have a stable reading point.
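
A minimal sketch of that idea, as a toy model rather than the real UnboundedReader API. The class and method names (CheckpointFirstReader, checkpointMark) are hypothetical, and it assumes the runner takes a checkpoint even though no records were produced:

```java
import java.util.OptionalLong;

/** Toy model only; nothing here is Beam or KafkaIO code. */
final class CheckpointFirstReader {
    private final boolean resumedFromCheckpoint;
    private long nextOffset;
    private long current = -1;

    /**
     * @param checkpoint     offset restored from a runner checkpoint, if any
     * @param clientPosition offset the Kafka client would pick on its own
     */
    CheckpointFirstReader(OptionalLong checkpoint, long clientPosition) {
        this.resumedFromCheckpoint = checkpoint.isPresent();
        this.nextOffset = checkpoint.orElse(clientPosition);
    }

    /** A fresh reader never yields records; it only pins its start offset. */
    boolean advance() {
        if (!resumedFromCheckpoint) {
            return false;
        }
        current = nextOffset++;
        return true;
    }

    long getCurrent() {
        return current;
    }

    /** Offset the runner would persist when it checkpoints this reader. */
    long checkpointMark() {
        return nextOffset;
    }

    public static void main(String[] args) {
        CheckpointFirstReader fresh = new CheckpointFirstReader(OptionalLong.empty(), 0L);
        System.out.println(fresh.advance()); // false

        // After the checkpoint, the resumed reader ignores the client position (60).
        long mark = fresh.checkpointMark();
        CheckpointFirstReader resumed = new CheckpointFirstReader(OptionalLong.of(mark), 60L);
        System.out.println(resumed.advance() + " " + resumed.getCurrent()); // true 0
    }
}
```

Once the first checkpoint exists, a later commit by the Kafka client can no longer move the start position.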

On Thu, Aug 16, 2018 at 3:17 PM Raghu Angadi <[email protected]> wrote:

> The only way reshuffle could help is if you are able to separate
> processing that is prone to errors from processing that produces side
> effects, i.e.:
> IO --> DoFn_B_prone_to_exceptions --> Reshuffle --> (B) DoFn_B_producing_side_effects
>
> This way, it might look like (B) processes each record exactly once, but
> it is not guaranteed (e.g. a worker could crash suddenly).
>
> Raghu.
>
> On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:
>
>> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
>> [email protected]> wrote:
>>
>>> Hello everyone.
>>>
>>> Thanks for your answers. We've managed to solve our problem using the
>>> solutions proposed here.
>>>
>>> First, there was no need for using autocommit. But switching to
>>> "commitOffsetsInFinalize()" also didn't help. In our example, if a failure
>>> occurred when processing message #25, the runner would retry the whole batch
>>> again, moving from "at-most-once processing" to "at-least-once processing".
>>>
>>
>> It has always been at-least-once for DoFn side effects and external
>> interactions. That is not specific to KafkaIO. Using reshuffle does not
>> change that.
>>
>> Raghu.
>>
>>
>>>
>>> Of course, if the transformation is idempotent, it doesn't really
>>> matter. But it is not always the case for us.
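
One way to make a side effect tolerate retried bundles is to key it on the record offset and skip offsets that were already applied. This is a minimal sketch with hypothetical names and an in-memory set. A real pipeline would need the set to live in the external system itself (e.g. a unique key in the database), which is an assumption beyond what this thread covers:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a bundle retry; nothing here is Beam API. */
final class IdempotentSinkSketch {
    private final Set<Long> seenOffsets = new HashSet<>();
    private final List<String> written = new ArrayList<>();

    /** Applies the side effect only the first time a given offset is seen. */
    boolean writeOnce(long offset, String value) {
        if (!seenOffsets.add(offset)) {
            return false; // duplicate delivery from a retried bundle; skip it
        }
        written.add(value);
        return true;
    }

    List<String> written() {
        return written;
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();

        // First attempt: two records are written, then the bundle fails.
        sink.writeOnce(23, "23");
        sink.writeOnce(24, "24");

        // Retry: the runner replays the whole bundle from its start.
        for (long offset = 23; offset <= 26; offset++) {
            sink.writeOnce(offset, Long.toString(offset));
        }
        System.out.println(sink.written()); // [23, 24, 25, 26]
    }
}
```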
>>>
>>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>>> messages was lost, all messages were successfully processed exactly once.
>>>
>>> But one thing that still bugs me is that Reshuffle is deprecated[1].
>>> I've found some discussion[2] on the internet about this, but I'm still not
>>> sure if it is safe to use that transformation in this use case. Any
>>> thoughts on this?
>>>
>>> [1]
>>> https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>>
>>> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> I am not sure if it helps much. External checkpoints like Kafka
>>>> 'autocommit' outside Beam's own checkpoint domain will always have quirks
>>>> like this. I wanted to ask what their use case for using autocommit was
>>>> (over commitOffsetsInFinalize()).
>>>>
>>>> If we wanted to, it is not clear to me how an unbounded source can do
>>>> that. A reader (by design I think) does not have visibility into when a
>>>> checkpoint happens, bundle boundaries etc. E.g. it can try returning
>>>> 'false' for start() and advance() initially, but does not know when to
>>>> actually return records (in Dataflow, it could wait a few seconds).
>>>>
>>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Raghu, based upon your description, do you think it would be a good
>>>>> idea for KafkaIO to checkpoint on the first read without producing any actual
>>>>> records?
>>>>>
>>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> It is due to "enable.auto.commit=true". Auto commit is an option of the
>>>>>> Kafka client, and how and when it commits is totally outside the control
>>>>>> of Beam & KafkaIO.
>>>>>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather
>>>>>> than 'enable.auto.commit'? That would ensure exactly once processing.
>>>>>>
>>>>>> That said, you might be interested in understanding why your example
>>>>>> failed:
>>>>>> enable.auto.commit is not such a bad option by itself, but there is a
>>>>>> corner case where it can cause issues like this.
>>>>>> When a reader is initialized, its start offset is determined as follows
>>>>>> (specifically in Dataflow, but roughly accurate on other runners too):
>>>>>>
>>>>>>    - (a) If there is a checkpoint for the reader split (true for all
>>>>>>    reads except the very first bundle read by the split from Kafka),
>>>>>>    the offset comes from the checkpoint. This is how exactly-once is
>>>>>>    ensured. Here the offset committed by the Kafka client with
>>>>>>    'autocommit' does not matter.
>>>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>>    records), KafkaIO does not set any offset explicitly and lets the
>>>>>>    Kafka client decide. That implies it depends on your ConsumerConfig.
>>>>>>    So ConsumerConfig decides the offset when a pipeline first starts.
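
The two cases above can be modeled in a few lines. This is a sketch with hypothetical names, not KafkaIO's actual code. `consumerPosition` stands for whatever the Kafka client would choose on its own (a committed group offset, or the `auto.offset.reset` policy):

```java
import java.util.OptionalLong;

/** Toy model of cases (a) and (b); not KafkaIO's implementation. */
final class StartOffsetSketch {

    /**
     * @param checkpointOffset next offset recorded in the runner's checkpoint, if one exists
     * @param consumerPosition offset the Kafka client would start from on its own
     */
    static long resolveStartOffset(OptionalLong checkpointOffset, long consumerPosition) {
        // (a) A runner checkpoint always wins; client-side commits are ignored.
        // (b) No checkpoint yet: fall back to whatever the Kafka client decides.
        return checkpointOffset.orElse(consumerPosition);
    }

    public static void main(String[] args) {
        // Case (a): checkpoint at 25, client has auto-committed 60.
        System.out.println(resolveStartOffset(OptionalLong.of(25L), 60L)); // 25

        // Case (b): first bundle still failing, no checkpoint, client committed 60.
        System.out.println(resolveStartOffset(OptionalLong.empty(), 60L)); // 60
    }
}
```

Case (b) is the one where a commit made by the client during a failed first bundle decides where the retry starts.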
>>>>>>
>>>>>> In your example, when there was an exception for Message 25, it was
>>>>>> still processing the first bundle of records and there was no Dataflow
>>>>>> checkpoint. It kept hitting (b). Kafka's 'autocommit' is outside Beam's
>>>>>> control, and it might have committed offset 60 in one of the retries.
>>>>>> The next retry incorrectly reads from 60.
>>>>>>
>>>>>> I hope this helps. Enabling autocommit is only useful when you want
>>>>>> to restart your pipeline from scratch (rather than 'updating' your
>>>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>>>> better. In either case, exactly once processing is not guaranteed when
>>>>>> a pipeline restarts; the only way currently to achieve that is to
>>>>>> 'update' the pipeline.
>>>>>>
>>>>>> [1]:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>>> failure) with the Dataflow runner.
>>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>>
>>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>>     .withBootstrapServers(server)
>>>>>>>     .withTopic(topic)
>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>>>     .updateConsumerProperties(properties)
>>>>>>>     .withoutMetadata())
>>>>>>> .apply(Values.create())
>>>>>>> .apply(ParDo.of(new StepTest()));
>>>>>>>
>>>>>>> Where StepTest is defined as follows:
>>>>>>>
>>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>>     @ProcessElement
>>>>>>>     public void processElement(ProcessContext pc) {
>>>>>>>         String element = pc.element();
>>>>>>>
>>>>>>>         if (randomErrorOccurs()) {
>>>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>>>         } else {
>>>>>>>             LOG.info(element);
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>>> I would expect that all the numbers get printed, and if an exception
>>>>>>> is thrown, Dataflow's runner would retry processing that failed message
>>>>>>> until it eventually works.
>>>>>>> However, what happened in my pipeline was different: when errors
>>>>>>> started happening due to my code, some messages were never
>>>>>>> processed, and some were actually lost forever.
>>>>>>>
>>>>>>> I would expect something like:
>>>>>>>
>>>>>>> {...}
>>>>>>> 22
>>>>>>> 23
>>>>>>> 24
>>>>>>> Failed ... 25
>>>>>>> {A new reader starts}
>>>>>>> Reader-0: first record offset 60
>>>>>>> 61
>>>>>>> 62
>>>>>>> {...}
>>>>>>> {Dataflow retries 25}
>>>>>>> Failed ... 25
>>>>>>> {...}
>>>>>>> and so on... (exception would never cease to happen in this case and
>>>>>>> Dataflow would retry forever)
>>>>>>>
>>>>>>> My output was something like:
>>>>>>>
>>>>>>> {...}
>>>>>>> 22
>>>>>>> 23
>>>>>>> 24
>>>>>>> Failed ... 25
>>>>>>> {A new reader starts}
>>>>>>> Reader-0: first record offset 60
>>>>>>> 61
>>>>>>> 62
>>>>>>> {...}
>>>>>>>
>>>>>>> Message #25 never gets reprocessed, and all the messages up to 60
>>>>>>> are lost, probably the ones in the same processing bundle as 25. Even
>>>>>>> more curious is that this behaviour doesn't happen when using the
>>>>>>> PubSubIO source, which produces the first mentioned output.
>>>>>>>
>>>>>>> My questions are:
>>>>>>> What is a good way of handling errors with Kafka source if I want
>>>>>>> all messages to be processed exactly once?
>>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>>> Please let me know your thoughts.
>>>>>>>
>>>>>>> Andre (cc) is part of our team and will be joining this
>>>>>>> discussion.
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>>
>>>>>>> Leonardo Alves Miguel
>>>>>>> Data Engineer
>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>
>>>>>>
>>>
>>> --
>>> André Badawi Missaglia
>>> Data Engineer
>>> (16) 3509-5555 | www.arquivei.com.br
>>>
>>
