Hey all,

Thanks for the details, John! Hmm, that doesn't look too good either 😬 but it's probably a different issue with the RMQ source/sink. Hopefully the new FLIP-27 sources will help you guys out there! The upcoming HybridSource in FLIP-150 [1] might also interest you for finer control over sources.
@Jose Vargas <jose.var...@fiscalnote.com> I've created FLINK-22698 [2] to track your issue. Do you have a small reproducible case/GitHub repo? Also, could you share a bit more about the Flink job in which you see this issue, e.g. overall parallelism, the parallelism of the sources/sinks, and the checkpointing mode?

Best,
Austin

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
[2]: https://issues.apache.org/jira/browse/FLINK-22698

On Thu, May 13, 2021 at 9:25 PM John Morrow <johnniemor...@hotmail.com> wrote:

> Hi Jose, hey Austin!!
>
> I know we were just recently looking at trying to consume a fixed number of messages from an RMQ source, process them and output them to an RMQ sink. As a naive first attempt at stopping the job when the target number of messages had been processed, we put a counter state in the process function and tried throwing an exception when the counter >= the target message count.
>
> The job had:
>
> - parallelism: 1
> - checkpointing: 1000 (1 sec)
> - restartStrategy: noRestart
> - prefetchCount: 100
>
> Running it with 150 messages in the input queue and 150 also as the target number, at the end the queues had:
>
> - output queue - 150
> - input queue - 50
>
> So it looks like it did transfer all the messages, but some unack'd ones also got requeued back at the source and so ended up as duplicates. I know throwing an exception in the Flink job is not the same as triggering a stateful shutdown, but it might be hitting similar unack issues.
>
> John
>
> ------------------------------
> *From:* Austin Cawley-Edwards <austin.caw...@gmail.com>
> *Sent:* Thursday 13 May 2021 16:49
> *To:* Jose Vargas <jose.var...@fiscalnote.com>; John Morrow <johnniemor...@hotmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: RabbitMQ source does not stop unless message arrives in queue
>
> Hey Jose,
>
> Thanks for bringing this up – it indeed sounds like a bug.
> There is ongoing work to update the RMQ source to the new interface, which might address some of these issues (or should, if it does not already), tracked in FLINK-20628 [1]. Would you be able to create a JIRA issue for this, or would you like me to?
>
> At my previous company, we only consumed one Rabbit queue per application, so we didn't run into this exactly but did see other weird behavior in the RMQ source that could be related. I'm going to cc @John Morrow <johnniemor...@hotmail.com>, who might be able to add what he's seen working with the source, if he's around. I remember some messages not being properly ack'ed during a stateful shutdown via the Ververica Platform's stop-with-savepoint functionality that you mention, though that might be more related to FLINK-20244 [2].
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-20628
> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>
> On Thu, May 13, 2021 at 10:23 AM Jose Vargas <jose.var...@fiscalnote.com> wrote:
>
> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's RabbitMQ source has some surprising behavior when a stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely unless, after the request is made, a message arrives in every queue that the job consumes from.
>
> I know that one possible workaround is to send a sentinel value to each of the queues consumed by the job, which the deserialization schema checks for in its isEndOfStream method. However, this is somewhat cumbersome and complicates the continuous delivery of a Flink job. For example, Ververica Platform will trigger a stop-with-savepoint for the user if any one of many possible Flink configuration options for a job is changed.
> The stop-with-savepoint can then hang indefinitely because only some of the RabbitMQ sources will have reached a FINISHED state.
>
> I have attached the TaskManager thread dump taken after the stop-with-savepoint request was made. Almost every thread is either sleeping or waiting for locks to be released, and then there are a handful of threads trying to read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to read data from RabbitMQ would be interrupted so that all RabbitMQ sources would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully; it is only the stop-with-savepoint request where I see this behavior.
>
> Respectfully,
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com <https://www.fiscalnote.com> | info.cq.com <http://www.info.cq.com> | rollcall.com <https://www.rollcall.com>
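The sentinel workaround Jose describes relies on the source calling isEndOfStream on each deserialized record and finishing once it returns true. Below is a minimal sketch of that idea. It is written against a small local interface, EndAwareSchema, that only mirrors the two relevant methods of Flink's DeserializationSchema (deserialize and isEndOfStream) so it compiles without Flink on the classpath. The sentinel string, SentinelStringSchema and SentinelConsumeLoop are all hypothetical names, and the loop is a simplified model rather than the RMQSource code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring two methods of Flink's DeserializationSchema.
// It is NOT the Flink interface; it only lets this sketch compile on its own.
interface EndAwareSchema<T> {
    T deserialize(byte[] message);

    boolean isEndOfStream(T nextElement);
}

// Hypothetical schema: one agreed-upon payload marks the end of the stream.
class SentinelStringSchema implements EndAwareSchema<String> {
    // Hypothetical sentinel; pick a value producers never send as real data.
    static final String SENTINEL = "__END_OF_STREAM__";

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }
}

// Simplified consume loop: emit records until the schema reports
// end-of-stream, which is where a real source would return and finish.
class SentinelConsumeLoop {
    static List<String> consume(List<byte[]> queue, EndAwareSchema<String> schema) {
        List<String> emitted = new ArrayList<>();
        for (byte[] body : queue) {
            String value = schema.deserialize(body);
            if (schema.isEndOfStream(value)) {
                break; // in this model the sentinel itself is not emitted
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

In a real job, the same isEndOfStream override would sit on a Flink schema class (for example, a subclass of SimpleStringSchema), and, as Jose points out, a sentinel would still have to be published to every queue the job reads from before each source can finish.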
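Jose's thread dump shows source threads blocked in a socket read. That is consistent with a consume loop that waits indefinitely for the next delivery, so a stop request is only noticed once another message arrives. A minimal sketch of one common alternative is to wait with a bounded timeout and re-check a running flag on every wake-up. It assumes deliveries are buffered in a java.util.concurrent.BlockingQueue, which stands in for the consumer's delivery buffer and is not the RabbitMQ client API. TimeoutPollingSource and its methods are hypothetical, and the sketch does not reproduce the actual RMQSource code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consume loop. The BlockingQueue stands in for the buffer a
// RabbitMQ consumer callback would push deliveries into; this is not RMQSource.
class TimeoutPollingSource implements Runnable {
    private final BlockingQueue<String> deliveries;
    private final long pollTimeoutMillis;
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean running = true;
    private volatile boolean finished = false;

    TimeoutPollingSource(BlockingQueue<String> deliveries, long pollTimeoutMillis) {
        this.deliveries = deliveries;
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Bounded wait: poll() returns null on timeout, so the loop
                // re-checks `running` even if no message ever arrives.
                String next = deliveries.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (next != null) {
                    processed.incrementAndGet(); // stand-in for emitting the record
                }
            }
        } catch (InterruptedException e) {
            // An interrupt also ends the loop; restore the flag for callers.
            Thread.currentThread().interrupt();
        } finally {
            finished = true;
        }
    }

    // Called from another thread, e.g. when a stop is requested.
    void stop() {
        running = false;
    }

    boolean isFinished() {
        return finished;
    }

    int processedCount() {
        return processed.get();
    }
}
```

The timeout bounds how long a stop can go unnoticed, at the cost of periodic wake-ups while the queue is idle. Interrupting the blocked thread, as Jose suggests, is the other way to get the same effect.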
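John's duplicates fit how, as far as we understand it, Flink's RabbitMQ source acknowledges messages when checkpointing is enabled. Acks are sent when a checkpoint completes, so anything delivered after the last completed checkpoint is still unacknowledged when the job fails, and the broker requeues it even though the sink already wrote it. The prefetchCount caps how many unacknowledged deliveries the broker hands out at once. The toy model below illustrates that mechanism only. The class, its methods, and the timing used in the test (one checkpoint completing between the first 100 and the last 50 messages) are hypothetical and are not a diagnosis of John's run.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not Flink or RabbitMQ code) of a consumer that acknowledges
// messages only when a checkpoint completes, with a broker prefetch limit.
class AckOnCheckpointModel {
    final Deque<Integer> brokerQueue = new ArrayDeque<>(); // ready messages
    final Deque<Integer> unacked = new ArrayDeque<>();     // delivered, not acked
    final int prefetchCount;
    int outputCount = 0;                                   // records written to the sink

    AckOnCheckpointModel(int messages, int prefetchCount) {
        for (int i = 1; i <= messages; i++) {
            brokerQueue.addLast(i);
        }
        this.prefetchCount = prefetchCount;
    }

    // Deliver and process messages while the prefetch window has room.
    void processAvailable() {
        while (unacked.size() < prefetchCount && !brokerQueue.isEmpty()) {
            unacked.addLast(brokerQueue.pollFirst());
            outputCount++; // the sink writes the record immediately
        }
    }

    // A completed checkpoint acknowledges everything delivered so far.
    void completeCheckpoint() {
        unacked.clear();
    }

    // Job fails with no restart: the broker requeues all unacked messages,
    // keeping their original order at the head of the queue.
    void fail() {
        while (!unacked.isEmpty()) {
            brokerQueue.addFirst(unacked.pollLast());
        }
    }
}
```

With that assumed timing, the model ends with 150 records written and 50 back in the input queue; a different checkpoint timing would change how many messages get requeued.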