This has come up before: https://issues.apache.org/jira/browse/BEAM-4520

The issue is that checkpoint marks are never finalized, and so messages are 
never acknowledged, if checkpointing is disabled in Flink. We only log a WARN 
when unbounded sources are used without checkpointing, since not all 
unbounded sources actually need to finalize checkpoint marks.

Seeing that this is still an issue, we might want to at least periodically 
acknowledge checkpoint marks when checkpointing is disabled. The alternative 
would be to throw an exception, perhaps with an option to override this in 
case the user knows what they are doing.
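
To make the first option more concrete, here is a minimal sketch in plain 
Java. It deliberately does not use the Beam or Flink APIs: 
SketchCheckpointMark and PeriodicFinalizer are hypothetical names, and the 
"ack" is only modelled by moving delivery tags between two lists. It shows 
the idea only, assuming the runner would call maybeFinalize() regularly 
while checkpointing is disabled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a checkpoint mark that acks messages when finalized. */
class SketchCheckpointMark {
    private final List<Long> pendingTags = new ArrayList<>();
    private final List<Long> ackedTags = new ArrayList<>();

    /** Remember a delivery tag that has been read but not yet acknowledged. */
    void record(long deliveryTag) {
        pendingTags.add(deliveryTag);
    }

    /** A real IO would ack on the broker channel here; this sketch only moves the tags. */
    void finalizeCheckpoint() {
        ackedTags.addAll(pendingTags);
        pendingTags.clear();
    }

    List<Long> acked() {
        return ackedTags;
    }

    List<Long> pending() {
        return pendingTags;
    }
}

/** Hypothetical fallback: finalize at a fixed interval when no checkpoint ever completes. */
class PeriodicFinalizer {
    private final long intervalMillis;
    private long lastFinalizedAt;

    PeriodicFinalizer(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastFinalizedAt = startMillis;
    }

    /** Finalizes the mark if the interval has elapsed; returns true if it did. */
    boolean maybeFinalize(SketchCheckpointMark mark, long nowMillis) {
        if (nowMillis - lastFinalizedAt < intervalMillis) {
            return false;
        }
        mark.finalizeCheckpoint();
        lastFinalizedAt = nowMillis;
        return true;
    }
}

/** Small driver that walks through one interval with explicit timestamps. */
class PeriodicAckDemo {
    public static void main(String[] args) {
        SketchCheckpointMark mark = new SketchCheckpointMark();
        PeriodicFinalizer finalizer = new PeriodicFinalizer(1000L, 0L);
        mark.record(1L);
        mark.record(2L);
        finalizer.maybeFinalize(mark, 500L);   // interval not yet elapsed
        finalizer.maybeFinalize(mark, 1000L);  // interval elapsed, tags are acked
        System.out.println("acked: " + mark.acked() + ", pending: " + mark.pending());
    }
}
```

Passing the timestamp in explicitly, rather than starting a timer thread, 
keeps the sketch deterministic. Note that acking without a completed 
checkpoint weakens the delivery guarantees in the same way auto-ack does, 
just with a bounded delay.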

Thanks,
Max

On 14.06.19 10:52, Ismaël Mejía wrote:
> Is there a JIRA for this? If this solves an issue for multiple users,
> it may be worth integrating the patch.
> Would you be up for doing this, Augustin?
>
> On Fri, Jun 14, 2019 at 10:35 AM Augustin Lafanechere
> <augustin.lafanech...@kapten.com> wrote:
> >
> > Hello Nicolas,
> > I also encountered the same problem.
> > RabbitMqIO does acknowledge messages in finalizeCheckpoint calls, but it 
> > was not clear to me when this method is called, because no messages 
> > were acked while the pipeline was running.
> > I finally decided to patch RabbitMqIO to auto-ack received messages. 
> > This is fine for my current use case, but it is not the safest way of 
> > consuming messages.
> >
> > If someone has a cleaner solution I’ll be happy to hear it.
> >
> > Augustin
> >
> >
> >
> >
> >> On 13 June 2019 at 15:47, Nicolas Delsaux <nicolas.dels...@gmx.fr> wrote:
> >>
> >> I'm having a lot of trouble reading data from RabbitMQ.
> >>
> >> To narrow down the problem, I've simplified my previous code as far as 
> >> possible:
> >>
> >>
> >>         Pipeline pipeline = Pipeline.create(options);
> >>
> >>         PCollection<Object> wat = (PCollection<Object>) pipeline
> >>                 .apply("read_from_rabbit", RabbitMqIO.read()
> >>                         .withUri(options.getRabbitMQUri())
> >>                         .withQueue(options.getRabbitMQQueue()))
> >>                 .apply("why not", RabbitMqIO.<RabbitMqMessage>write()
> >>                         .withQueue("written_in_rabbit")
> >>                         .withQueueDeclare(true)
> >>                         .withUri(options.getRabbitMQUri()));
> >>
> >>
> >> So if I put a simple message in my input queue, it should be "moved" 
> >> (in quotes, since the new message is not the original one but has the 
> >> same content) into my "written_in_rabbit" queue.
> >>
> >> Unfortunately, for reasons I don't understand, the original message stays 
> >> in the input queue.
> >>
> >> This seems to be because the 
> >> RabbitMQCheckpointMark#finalizeCheckpoint() method is never called. So 
> >> where is the finalizeCheckpoint method called?
> >>
> >> And how can I find out why this method is never called in my case?
> >>
> >> Thanks
> >>
> >>
> >
