On Thu, Aug 16, 2018 at 4:54 PM Lukasz Cwik <[email protected]> wrote:

> Raghu, yes, I was thinking that advance would continuously return false.
> This would force a runner to checkpoint and then resume at which point we
> would have a stable reading point.
>

When the actual checkpoint occurs is still transparent to the reader. The
only way it knows for sure that one happened is when a new reader is created
with a non-null CheckpointMark. In the case of Dataflow, readers are cached
and might be created only once during the life of a job.


> On Thu, Aug 16, 2018 at 3:17 PM Raghu Angadi <[email protected]> wrote:
>
>> The only way reshuffle could help is if you are able to separate
>> processing that is prone to errors from processing that produces side
>> effects, i.e.
>> IO --> DoFn_B_prone_to_exceptions --> Reshuffle -->
>> (B) DoFn_B_producing_side_effects
>>
>> This way, it might look like (B) processes each record exactly once, but
>> it is not guaranteed (e.g. a worker could crash suddenly).
>>
>> Raghu.
>>
>> On Thu, Aug 16, 2018 at 11:12 AM Raghu Angadi <[email protected]> wrote:
>>
>>> On Thu, Aug 16, 2018 at 10:39 AM André Missaglia <
>>> [email protected]> wrote:
>>>
>>>> Hello everyone.
>>>>
>>>> Thanks for your answers. We've managed to solve our problem using the
>>>> solutions proposed here.
>>>>
>>>> First, there was no need for using autocommit. But switching to
>>>> "commitOffsetsInFinalize()" also didn't help. In our example, if a failure
>>>> occurred when processing message #25, the runner would retry the whole batch
>>>> again, moving from "at-most-once processing" to "at-least-once processing".
>>>>
>>>
>>> It has always been at-least-once for DoFn side effects and external
>>> interactions. That is not specific to KafkaIO. Using reshuffle does not
>>> change that.
>>>
>>> Raghu.
>>>
>>>
>>>>
>>>> Of course, if the transformation is idempotent, it doesn't really
>>>> matter. But it is not always the case for us.
>>>>
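One way to picture the idempotency point above is a sink that remembers which message ids it has already applied, so a retried bundle does not repeat the side effect. This is only a minimal sketch: the class name `IdempotentSinkDemo`, the message ids, and the in-memory set are assumptions made for illustration. A real pipeline would need the dedup state stored durably and updated atomically with the side effect itself.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy sink illustrating idempotent side effects; names are hypothetical. */
class IdempotentSinkDemo {
    // Ids of messages whose side effect has already been applied.
    // Assumption: in-memory only, for illustration.
    private final Set<String> applied = new HashSet<>();
    private long total = 0;

    /** Applies the side effect only the first time a given message id is seen. */
    boolean apply(String messageId, long amount) {
        if (!applied.add(messageId)) {
            return false; // duplicate delivery from a retry: skip
        }
        total += amount;
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentSinkDemo sink = new IdempotentSinkDemo();
        sink.apply("msg-25", 10);
        sink.apply("msg-25", 10); // same message delivered again after a retry
        sink.apply("msg-26", 5);
        System.out.println("total = " + sink.total());
    }
}
```

With a sink like this, at-least-once delivery from the runner does not change the final state, because the second delivery of "msg-25" is ignored.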
>>>> Then, we tried using Reshuffle.viaRandomKey(). While the order of the
>>>> messages was lost, all messages were successfully processed exactly once.
>>>>
>>>> But one thing that still bugs me is that Reshuffle is deprecated[1].
>>>> I've found some discussion[2] on the internet about this, but I'm still not
>>>> sure if it is safe to use that transformation in this use case. Any
>>>> thoughts on this?
>>>>
>>>> [1]
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/transforms/Reshuffle.html#viaRandomKey--
>>>> [2] https://www.mail-archive.com/[email protected]/msg07247.html
>>>>
>>>> On Thu, Aug 16, 2018 at 1:36 PM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> I am not sure if it helps much. External checkpoints like Kafka
>>>>> 'autocommit' outside Beam's own checkpoint domain will always have quirks
>>>>> like this. I wanted to ask what their use case for using autocommit was
>>>>> (over commitOffsetsInFinalize())
>>>>>
>>>>> If we wanted to, it is not clear to me how an unbounded source can do
>>>>> that. A reader (by design I think) does not have visibility into when a
>>>>> checkpoint happens, bundle boundaries etc. E.g. it can try returning
>>>>> 'false' for start() and advance() initially, but does not know when to
>>>>> actually return records (in Dataflow, it could wait a few seconds).
>>>>>
>>>>> On Thu, Aug 16, 2018 at 8:21 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Raghu, based upon your description, do you think it would be a good
>>>>>> idea for KafkaIO to checkpoint on the first read without producing any
>>>>>> actual records?
>>>>>>
>>>>>> On Wed, Aug 15, 2018 at 11:49 AM Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> It is due to "enable.auto.commit=true". Auto commit is an option of the
>>>>>>> Kafka client, and how and when it commits is entirely outside the
>>>>>>> control of Beam and KafkaIO.
>>>>>>> Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather
>>>>>>> than 'enable.auto.commit'? That would ensure exactly once processing.
>>>>>>>
>>>>>>> That said, you might be interested in understanding why your example
>>>>>>> failed:
>>>>>>> enable.auto.commit is not such a bad option by itself, but there is a
>>>>>>> corner case where it can cause issues like this.
>>>>>>> When a reader is initialized, its start offset is determined as
>>>>>>> follows (specifically in Dataflow, but roughly accurate on other
>>>>>>> runners too):
>>>>>>>
>>>>>>>    - (a) If there is a checkpoint for the reader split (true for
>>>>>>>    all reads except the very first bundle read by the split from
>>>>>>>    Kafka), the offset comes from the checkpoint. This is how exactly
>>>>>>>    once is ensured. Here the offset committed by the Kafka client
>>>>>>>    with 'autocommit' does not matter.
>>>>>>>    - (b) If there is no checkpoint (i.e. for the first bundle of
>>>>>>>    records), KafkaIO does not set any offset explicitly and lets the
>>>>>>>    Kafka client decide. That implies it depends on your
>>>>>>>    ConsumerConfig. So ConsumerConfig decides the offset when a
>>>>>>>    pipeline first starts.
>>>>>>>
>>>>>>> In your example, when there was an exception for Message 25, it was
>>>>>>> still processing the first bundle of records and there was no Dataflow
>>>>>>> checkpoint. It kept hitting (b). Kafka's 'autocommit' is out of band,
>>>>>>> and it might have committed offset 60 in one of the retries. The next
>>>>>>> retry incorrectly reads from 60.
>>>>>>>
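The (a)/(b) decision above, and why the retry skipped records, can be sketched as a small stand-alone model. This is not KafkaIO's actual code: the class `StartOffsetDemo`, its method names, and the idea of passing the checkpoint as a nullable `Long` are assumptions made purely for illustration. The numbers 25 and 60 are the ones from this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the start-offset decision; not KafkaIO's actual implementation. */
class StartOffsetDemo {

    /**
     * Case (a): a runner checkpoint exists, so its offset wins.
     * Case (b): no checkpoint yet, so the client's committed offset is used.
     */
    static long startOffset(Long checkpointOffset, long clientCommittedOffset) {
        return checkpointOffset != null ? checkpointOffset : clientCommittedOffset;
    }

    /** Offsets from the failed record up to (not including) the resume point. */
    static List<Long> skippedOnRetry(long failedAt, Long checkpointOffset,
                                     long clientCommittedOffset) {
        long resumeAt = startOffset(checkpointOffset, clientCommittedOffset);
        List<Long> skipped = new ArrayList<>();
        for (long offset = failedAt; offset < resumeAt; offset++) {
            skipped.add(offset);
        }
        return skipped;
    }

    public static void main(String[] args) {
        // First bundle: no checkpoint yet, autocommit has already stored offset 60.
        List<Long> lost = skippedOnRetry(25, null, 60);
        System.out.println("skipped without checkpoint: " + lost.size());

        // Later bundle: a checkpoint at 20 overrides the autocommitted 60.
        System.out.println("skipped with checkpoint: "
                + skippedOnRetry(25, 20L, 60).size());
    }
}
```

In this model, the retry without a checkpoint resumes at 60 and never revisits offsets 25 through 59, while the retry with a checkpoint resumes before the failed record and skips nothing.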
>>>>>>> I hope this helps. Enabling autocommit is only useful when you want
>>>>>>> to restart your pipeline from scratch (rather than 'updating' your
>>>>>>> Dataflow pipeline) and still want to *roughly* resume from where the
>>>>>>> previous pipeline left off. Even there, commitOffsetsInFinalize() is
>>>>>>> better. In either case, exactly once processing is not guaranteed when
>>>>>>> a pipeline restarts; the only way currently to achieve that is to
>>>>>>> 'update' the pipeline.
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623
>>>>>>>
>>>>>>> Raghu.
>>>>>>>
>>>>>>> On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm using the KafkaIO source in my Beam pipeline, testing the
>>>>>>>> scenario where intermittent errors may happen (e.g. DB connection
>>>>>>>> failure) with the Dataflow runner.
>>>>>>>> So I produced a sequence of 200 messages containing sequential
>>>>>>>> numbers (1-200) to a topic, and then executed the following pipeline:
>>>>>>>>
>>>>>>>> p.apply("Read from kafka", KafkaIO.<String, String>read()
>>>>>>>>     .withBootstrapServers(server)
>>>>>>>>     .withTopic(topic)
>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>     .withValueDeserializer(StringDeserializer.class)
>>>>>>>>     .updateConsumerProperties(properties)
>>>>>>>>     .withoutMetadata())
>>>>>>>> .apply(Values.create())
>>>>>>>> .apply(ParDo.of(new StepTest()));
>>>>>>>>
>>>>>>>> Where StepTest is defined as follows:
>>>>>>>>
>>>>>>>> public class StepTest extends DoFn<String, String> {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void processElement(ProcessContext pc) {
>>>>>>>>         String element = pc.element();
>>>>>>>>
>>>>>>>>         if (randomErrorOccurs()) {
>>>>>>>>             throw new RuntimeException("Failed ... " + element);
>>>>>>>>         } else {
>>>>>>>>             LOG.info(element);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The consumer configuration has "enable.auto.commit=true".
>>>>>>>> I would expect that all the numbers get printed, and if an
>>>>>>>> exception is thrown, Dataflow's runner would retry processing that
>>>>>>>> failed message until it eventually works.
>>>>>>>> However, what happened in my pipeline was different: when errors
>>>>>>>> started happening due to my code, some messages were never
>>>>>>>> processed, and some were actually lost forever.
>>>>>>>>
>>>>>>>> I would expect something like:
>>>>>>>>
>>>>>>>> {...}
>>>>>>>> 22
>>>>>>>> 23
>>>>>>>> 24
>>>>>>>> Failed ... 25
>>>>>>>> {A new reader starts}
>>>>>>>> Reader-0: first record offset 60
>>>>>>>> 61
>>>>>>>> 62
>>>>>>>> {...}
>>>>>>>> {Dataflow retries 25}
>>>>>>>> Failed ... 25
>>>>>>>> {...}
>>>>>>>> and so on... (exception would never cease to happen in this case
>>>>>>>> and Dataflow would retry forever)
>>>>>>>>
>>>>>>>> My output was something like:
>>>>>>>>
>>>>>>>> {...}
>>>>>>>> 22
>>>>>>>> 23
>>>>>>>> 24
>>>>>>>> Failed ... 25
>>>>>>>> {A new reader starts}
>>>>>>>> Reader-0: first record offset 60
>>>>>>>> 61
>>>>>>>> 62
>>>>>>>> {...}
>>>>>>>>
>>>>>>>> Message #25 never gets reprocessed, and all the messages up to 60
>>>>>>>> are lost, probably the ones in the same processing bundle as 25. Even
>>>>>>>> more curious is that this behaviour doesn't happen when using the
>>>>>>>> PubSubIO source, which produces the first mentioned output.
>>>>>>>>
>>>>>>>> My questions are:
>>>>>>>> What is a good way of handling errors with Kafka source if I want
>>>>>>>> all messages to be processed exactly once?
>>>>>>>> Is there any Kafka or Dataflow configuration that I may be missing?
>>>>>>>> Please let me know your thoughts.
>>>>>>>>
>>>>>>>> Andre (cc) is part of our team and will be joining this
>>>>>>>> discussion.
>>>>>>>>
>>>>>>>> --
>>>>>>>> []s
>>>>>>>>
>>>>>>>> Leonardo Alves Miguel
>>>>>>>> Data Engineer
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> André Badawi Missaglia
>>>> Data Engineer
>>>>
>>>
