It is great to see that, moving forward, we will have a way to tell if 
finalization is required.

> In my opinion, for such crucial behavior I would expect the pipeline to fail
> with a clear message stating the reason, in the same way as when you
> implement a new Codec and forget to override the verifyDeterministic method
> (I don't recall the exact name).

Many unbounded sources, e.g. Kafka, run fine without checkpointing because 
CheckpointMarks do not have to be finalized. However, seeing this come up 
multiple times now, I do think that we want to make this a better user 
experience by either throwing an error or having a thread periodically 
acknowledge CheckpointMarks.
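The second option could look roughly like the following minimal sketch. It assumes a stand-in `CheckpointMark` interface with the same single method as Beam's `UnboundedSource.CheckpointMark`; `PeriodicFinalizer` and its parameters are hypothetical and not part of Beam or the Flink runner. A scheduler thread takes the current mark at a fixed interval and finalizes it:

```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: periodically finalizes the current checkpoint mark
// when the runner itself never checkpoints.
class PeriodicFinalizer implements AutoCloseable {

  // Stand-in for Beam's UnboundedSource.CheckpointMark (same single method).
  interface CheckpointMark {
    void finalizeCheckpoint() throws IOException;
  }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  // Every intervalMillis, asks for the current mark and finalizes it.
  PeriodicFinalizer(Supplier<CheckpointMark> currentMark, long intervalMillis) {
    scheduler.scheduleAtFixedRate(
        () -> {
          try {
            currentMark.get().finalizeCheckpoint();
          } catch (IOException e) {
            // Only report the failure; the source may redeliver unacked data.
            System.err.println("finalizeCheckpoint failed: " + e);
          }
        },
        intervalMillis,
        intervalMillis,
        TimeUnit.MILLISECONDS);
  }

  // Stops the scheduler thread.
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

Two caveats apply. Beam readers are not thread-safe, so a real implementation would have to obtain the mark on the reader's own thread rather than on the scheduler thread. And acknowledging without a durable checkpoint gives at-most-once delivery: records acknowledged here but not yet fully processed are lost if the job fails, much like the auto-ack patch discussed further down.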

-Max

On 14.06.19 18:52, Lukasz Cwik wrote:
> In the future, you will be able to check and give a hard error if
> checkpointing is disabled yet finalization is requested for portable
> pipelines:
> https://github.com/apache/beam/blob/2be7457a4c0b311c3bd784b3f00b425596adeb06/model/pipeline/src/main/proto/beam_runner_api.proto#L382
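A sketch of such a check, combining the hard error suggested here with the opt-out override Max mentions below. It assumes each transform exposes a flag like the `requests_finalization` field of `ParDoPayload` in the linked proto, and that a non-positive checkpointing interval means checkpointing is disabled (the Flink runner's `checkpointingInterval` option defaults to -1). `FinalizationCheck`, `Transform`, and `allowWithoutFinalization` are hypothetical names:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical translation-time check; none of these names exist in Beam.
final class FinalizationCheck {

  // Minimal stand-in for a transform and its requests_finalization flag.
  static final class Transform {
    final String name;
    final boolean requestsFinalization;

    Transform(String name, boolean requestsFinalization) {
      this.name = name;
      this.requestsFinalization = requestsFinalization;
    }
  }

  private FinalizationCheck() {}

  // Throws if checkpointing is off (interval <= 0) while any transform
  // requests finalization, unless the user explicitly opted out.
  static void validate(
      List<Transform> transforms,
      long checkpointingIntervalMillis,
      boolean allowWithoutFinalization) {
    if (checkpointingIntervalMillis > 0 || allowWithoutFinalization) {
      return;
    }
    List<String> offending =
        transforms.stream()
            .filter(t -> t.requestsFinalization)
            .map(t -> t.name)
            .collect(Collectors.toList());
    if (!offending.isEmpty()) {
      throw new IllegalStateException(
          "Checkpointing is disabled, but these transforms require finalization"
              + " and would never acknowledge their input: "
              + offending
              + ". Enable checkpointing or explicitly allow running without it.");
    }
  }
}
```

A runner could call `validate` once at translation time, before the job is submitted, so the problem surfaces as an immediate failure instead of silently unacknowledged messages.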
>
> On Fri, Jun 14, 2019 at 8:17 AM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>     In my opinion, for such crucial behavior I would expect the pipeline
>     to fail with a clear message stating the reason, in the same way as
>     when you implement a new Codec and forget to override the
>     verifyDeterministic method (I don't recall the exact name).
>
>     Just my 2 cents.
>
>     Maximilian Michels <m...@apache.org> wrote on Fri, 14 Jun 2019,
>     16:48:
>
>         This has come up before:
>         https://issues.apache.org/jira/browse/BEAM-4520
>
>         The issue is that checkpoints won't be acknowledged if
>         checkpointing is disabled in Flink. We log a WARN when
>         unbounded sources are used without checkpointing. Not all
>         unbounded sources actually need to finalize checkpoint marks.
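To make concrete where `finalizeCheckpoint()` gets called: a checkpointing runner keeps the marks it took at each snapshot and finalizes them only after the corresponding checkpoint has completed. The sketch below is modeled loosely on Flink's snapshot and `notifyCheckpointComplete` callbacks; `PendingCheckpoints` and its methods are hypothetical, not the Flink runner's actual code, and `CheckpointMark` is a stand-in for Beam's interface:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical runner-side bookkeeping that drives finalizeCheckpoint().
final class PendingCheckpoints {

  // Stand-in for Beam's UnboundedSource.CheckpointMark.
  interface CheckpointMark {
    void finalizeCheckpoint() throws IOException;
  }

  // Marks taken at each snapshot, keyed by checkpoint id, oldest first.
  private final TreeMap<Long, List<CheckpointMark>> pending = new TreeMap<>();

  // Called when the runner snapshots state: remember the readers' marks.
  void onSnapshot(long checkpointId, List<CheckpointMark> marks) {
    pending.put(checkpointId, new ArrayList<>(marks));
  }

  // Called once the checkpoint is durable: finalize it and all older ones.
  void onCheckpointComplete(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, List<CheckpointMark>>> it =
        pending.headMap(checkpointId, true).entrySet().iterator();
    while (it.hasNext()) {
      for (CheckpointMark mark : it.next().getValue()) {
        mark.finalizeCheckpoint();
      }
      it.remove(); // removes the entry from the backing map as well
    }
  }

  int pendingCount() {
    return pending.size();
  }
}
```

If checkpointing is disabled, neither callback ever fires, so `finalizeCheckpoint()` is never reached, which matches what Nicolas observes further down. With the Flink runner, checkpointing is enabled through the `checkpointingInterval` pipeline option (for example `--checkpointingInterval=10000`, in milliseconds).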
>
>         Seeing that this is still an issue, we might want to at least
>         periodically acknowledge checkpoint marks when checkpointing is
>         disabled. The alternative would be to throw an exception,
>         perhaps with the option to override this in case the user knows
>         what they are doing.
>
>         Thanks,
>         Max
>
>         On 14.06.19 10:52, Ismaël Mejía wrote:
>         > Is there a JIRA for this? If this solves an issue for multiple
>         > users, maybe it is worth integrating the patch.
>         > Would you be up for doing this, Augustin?
>         >
>         > On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
>         > <augustin.lafanech...@kapten.com> wrote:
>         > >
>         > > Hello Nicolas,
>         > > I also encountered the same problem.
>         > > RabbitMqIO does acknowledge messages in finalizeCheckpoint calls,
>         > > but it was not clear to me when this method is called, because
>         > > no messages were acknowledged while the pipeline was running.
>         > > I eventually patched RabbitMqIO to auto-acknowledge received
>         > > messages. This is fine for my current use case, but it is not
>         > > the safest way of consuming messages.
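The behavior described here can be sketched as a checkpoint mark that only records delivery tags while reading and acknowledges them in `finalizeCheckpoint()`. This illustrates the idea only and is not RabbitMqIO's actual `RabbitMQCheckpointMark`; `AckOnFinalizeMark` is hypothetical, and `Acker` stands in for the RabbitMQ Java client's `Channel#basicAck(long, boolean)`:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mark that acks deliveries only when the runner finalizes it.
final class AckOnFinalizeMark {

  // Stand-in for com.rabbitmq.client.Channel#basicAck(long, boolean).
  interface Acker {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
  }

  private final Acker channel;
  private final List<Long> deliveryTags = new ArrayList<>();

  AckOnFinalizeMark(Acker channel) {
    this.channel = channel;
  }

  // The reader records each message it emits; nothing is acked yet.
  void record(long deliveryTag) {
    deliveryTags.add(deliveryTag);
  }

  // Invoked by the runner after a checkpoint completes: ack everything read.
  void finalizeCheckpoint() throws IOException {
    for (long tag : deliveryTags) {
      channel.basicAck(tag, false);
    }
    deliveryTags.clear();
  }
}
```

Until `finalizeCheckpoint()` runs, the broker keeps every delivered message as unacknowledged, so it never leaves the input queue. Consuming with auto-ack instead makes the broker treat each message as acknowledged on delivery, which avoids the symptom but can lose in-flight messages if the pipeline fails.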
>         > >
>         > > If someone has a cleaner solution I’ll be happy to hear it.
>         > >
>         > > Augustin
>         > >
>         > >> On 13 Jun 2019 at 15:47, Nicolas Delsaux
>         > >> <nicolas.dels...@gmx.fr> wrote:
>         > >>
>         > >> I'm having a lot of trouble reading data from RabbitMQ.
>         > >>
>         > >> To understand the problem, I've simplified my previous code
>         > >> as much as possible:
>         > >>
>         > >>
>         > >>         Pipeline pipeline = Pipeline.create(options);
>         > >>
>         > >>         pipeline
>         > >>             .apply("read_from_rabbit",
>         > >>                 RabbitMqIO.read()
>         > >>                     .withUri(options.getRabbitMQUri())
>         > >>                     .withQueue(options.getRabbitMQQueue()))
>         > >>             // Re-publish each message read above to another queue.
>         > >>             .apply("why not",
>         > >>                 RabbitMqIO.write()
>         > >>                     .withQueue("written_in_rabbit")
>         > >>                     .withQueueDeclare(true)
>         > >>                     .withUri(options.getRabbitMQUri()));
>         > >>
>         > >>         pipeline.run().waitUntilFinish();
>         > >>
>         > >>
>         > >> So if I put a simple message in my input queue, it should be
>         > >> "moved" (in quotes because the new message is not the original
>         > >> one, but has the same content) into my "written_in_rabbit" queue.
>         > >>
>         > >> Unfortunately, for reasons I don't understand, the original
>         > >> message stays in the input queue.
>         > >>
>         > >> It seems to be because the
>         > >> RabbitMQCheckpointMark#finalizeCheckpoint() method is never
>         > >> called. So where is the finalizeCheckpoint method called?
>         > >>
>         > >> And how can I find out why this method is never called in my
>         > >> case?
>         > >>
>         > >> Thanks
>         > >>
>         > >>
>         > >
>
