It is great to see that, moving forward, we will have a way to tell if finalization is required.

> In my opinion, for such crucial behavior I would expect the pipeline to
> fail with a clear message stating the reason, like in the same way when
> you implement a new Codec and forget to override the verifyDeterministic
> method (don't recall the right name of it).

Many unbounded sources, e.g. Kafka, run fine without checkpointing because
their CheckpointMarks do not have to be finalized. However, seeing this come
up multiple times now, I do think that we want to make this a better user
experience by either throwing an error or having a thread periodically
acknowledge CheckpointMarks.

-Max

On 14.06.19 18:52, Lukasz Cwik wrote:
> In the future, you will be able to check and give a hard error if
> checkpointing is disabled yet finalization is requested for portable
> pipelines:
> https://github.com/apache/beam/blob/2be7457a4c0b311c3bd784b3f00b425596adeb06/model/pipeline/src/main/proto/beam_runner_api.proto#L382
>
> On Fri, Jun 14, 2019 at 8:17 AM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>
>> In my opinion, for such crucial behavior I would expect the pipeline
>> to fail with a clear message stating the reason, like in the same
>> way when you implement a new Codec and forget to override the
>> verifyDeterministic method (don't recall the right name of it).
>>
>> Just my 2 cents.
>>
>> Maximilian Michels <m...@apache.org> wrote on Fri, 14 June 2019, 16:48:
>>
>>> This has come up before:
>>> https://issues.apache.org/jira/browse/BEAM-4520
>>>
>>> The issue is that checkpoints won't be acknowledged if
>>> checkpointing is disabled in Flink. We log a WARN when
>>> unbounded sources are used without checkpointing. Not all
>>> unbounded sources actually need to finalize checkpoint marks.
>>>
>>> Seeing that this is still an issue, we might want to at least
>>> periodically acknowledge checkpoint marks when checkpointing is
>>> disabled. The alternative would be to throw an exception,
>>> perhaps with the option to override this in case the user knows
>>> what they are doing.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 14.06.19 10:52, Ismaël Mejía wrote:
>>>> Is there a JIRA for this? If this solves an issue for multiple
>>>> users, maybe it is worth integrating the patch.
>>>> Would you be up for doing this, Augustin?
>>>>
>>>> On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
>>>> <augustin.lafanech...@kapten.com> wrote:
>>>>>
>>>>> Hello Nicolas,
>>>>> I also encountered the same problem.
>>>>> RabbitMqIO indeed acknowledges messages on finalizeCheckpoint
>>>>> calls, but it was not clear to me when this method is called,
>>>>> because no messages were acked at pipeline runtime.
>>>>> I finally decided to patch RabbitMqIO to auto-ack received
>>>>> messages. This is fine for my current use case but is not the
>>>>> safest way of consuming messages.
>>>>>
>>>>> If someone has a cleaner solution I'll be happy to hear it.
>>>>>
>>>>> Augustin
>>>>>
>>>>> On 13 June 2019 at 15:47, Nicolas Delsaux <nicolas.dels...@gmx.fr> wrote:
>>>>>>
>>>>>> I'm having big trouble reading data from RabbitMQ.
>>>>>>
>>>>>> To understand my problem, I've simplified my previous code
>>>>>> to the extreme:
>>>>>>
>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>>     PCollection<Object> wat = (PCollection<Object>)
>>>>>>         pipeline.apply("read_from_rabbit",
>>>>>>             RabbitMqIO.read()
>>>>>>                 .withUri(options.getRabbitMQUri())
>>>>>>                 .withQueue(options.getRabbitMQQueue())
>>>>>>         )
>>>>>>         .apply("why not",
>>>>>>             RabbitMqIO.<RabbitMqMessage>write()
>>>>>>                 .withQueue("written_in_rabbit")
>>>>>>                 .withQueueDeclare(true)
>>>>>>                 .withUri(options.getRabbitMQUri())
>>>>>>         )
>>>>>>
>>>>>> So if I put a simple message in my input queue, it should
>>>>>> be "moved" (in quotes, since the new message is not the
>>>>>> original one but has the same content) into my
>>>>>> "written_in_rabbit" queue.
>>>>>>
>>>>>> Unfortunately, for reasons I don't understand, the original
>>>>>> message stays in the input queue.
>>>>>>
>>>>>> It seems to be due to the fact that the
>>>>>> RabbitMQCheckpointMark#finalizeCheckpoint() method is never
>>>>>> called. So where is the finalizeCheckpoint method called?
>>>>>>
>>>>>> And how can I understand why this method is never called in
>>>>>> my case?
>>>>>>
>>>>>> Thanks
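
For readers who find this thread later, the behavior discussed above can be sketched with a small, self-contained toy model. This is only an illustration under stated assumptions: ToyQueue, ToyCheckpointMark and AckOnFinalizeSketch are hypothetical names invented here, not Beam, Flink or RabbitMQ APIs. The sketch shows why a message stays in the input queue when finalizeCheckpoint() is never called, and how a periodically acknowledging thread (one of the two options mentioned above) would drain it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a broker queue; NOT the RabbitMQ client API. */
class ToyQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    synchronized void publish(String msg) { ready.add(msg); }

    /** Delivers the next message but keeps it on the broker until it is acked. */
    synchronized String deliver() {
        String msg = ready.poll();
        if (msg != null) unacked.add(msg);
        return msg;
    }

    synchronized void ackAll() { unacked.clear(); }

    /** Messages the broker still holds (not yet delivered, or delivered but unacked). */
    synchronized int remaining() { return ready.size() + unacked.size(); }
}

/** Hypothetical checkpoint mark: acks happen only inside finalizeCheckpoint(). */
class ToyCheckpointMark {
    private final ToyQueue queue;
    ToyCheckpointMark(ToyQueue queue) { this.queue = queue; }
    void finalizeCheckpoint() { queue.ackAll(); }
}

class AckOnFinalizeSketch {
    /** Reads one message; the mark is finalized only if the "runner" checkpoints. */
    static int runOnce(boolean checkpointingEnabled) {
        ToyQueue queue = new ToyQueue();
        queue.publish("hello");
        ToyCheckpointMark mark = new ToyCheckpointMark(queue);
        queue.deliver();
        if (checkpointingEnabled) mark.finalizeCheckpoint();
        return queue.remaining();
    }

    /** Same read, but a background thread finalizes the mark periodically. */
    static int runWithPeriodicAck() {
        ToyQueue queue = new ToyQueue();
        queue.publish("hello");
        ToyCheckpointMark mark = new ToyCheckpointMark(queue);
        queue.deliver();
        CountDownLatch acked = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            mark.finalizeCheckpoint();
            acked.countDown();
        }, 10, 10, TimeUnit.MILLISECONDS);
        try {
            // Wait until the periodic task has acknowledged at least once.
            acked.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return queue.remaining();
    }

    public static void main(String[] args) {
        System.out.println(runOnce(false));       // prints 1: message stays in the queue
        System.out.println(runOnce(true));        // prints 0: finalize acked the message
        System.out.println(runWithPeriodicAck()); // prints 0: background thread acked it
    }
}
```

In a real pipeline on the Flink runner, the equivalent of passing `true` above is enabling checkpointing (for example through the runner's checkpointingInterval pipeline option), so that the runner itself calls finalizeCheckpoint() after each completed checkpoint.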