Hi all,

Apologies for not following up sooner. Thank you, Austin, for creating FLINK-22698. It seems that the issue is well understood and a fix is currently under development/review. Please let me know if there is anything additional that I can do. I look forward to testing out a new version of Flink that includes this fix.
Thanks again,
Jose

On Tue, May 18, 2021 at 4:38 PM Austin Cawley-Edwards <[email protected]> wrote:
> Hey all,
>
> Thanks for the details, John! Hmm, that doesn't look too good either 😬 but probably a different issue with the RMQ source/sink. Hopefully, the new FLIP-27 sources will help you guys out there! The upcoming HybridSource in FLIP-150 [1] might also be interesting to you in finely controlling sources.
>
> @Jose Vargas <[email protected]> I've created FLINK-22698 [2] to track your issue. Do you have a small reproducible case/GitHub repo? Also, would you be able to provide a little bit more about the Flink job that you see this issue in? e.g., overall parallelism, the parallelism of the sources/sinks, checkpointing mode.
>
> Best,
> Austin
>
> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> [2]: https://issues.apache.org/jira/browse/FLINK-22698
>
> On Thu, May 13, 2021 at 9:25 PM John Morrow <[email protected]> wrote:
>
>> Hi Jose, hey Austin!!
>>
>> I know we were just recently looking at trying to consume a fixed number of messages from an RMQ source, process them and output them to an RMQ sink. As a naive first attempt at stopping the job when the target number of messages had been processed, we put a counter state in the process function and tried throwing an exception when the counter >= the target message count.
>>
>> The job had:
>>
>> - parallelism: 1
>> - checkpointing: 1000 (1 sec)
>> - restartStrategy: noRestart
>> - prefetchCount: 100
>>
>> Running it with 150 messages in the input queue and 150 also as the target number, at the end the queues had:
>>
>> - output queue - 150
>> - input queue - 50
>>
>> So it looks like it did transfer all the messages, but some unack'd ones also got requeued back at the source and so ended up as duplicates.
>> I know throwing an exception in the Flink job is not the same as triggering a stateful shutdown, but it might be hitting similar unack issues.
>>
>> John
>>
>> ------------------------------
>> *From:* Austin Cawley-Edwards <[email protected]>
>> *Sent:* Thursday 13 May 2021 16:49
>> *To:* Jose Vargas <[email protected]>; John Morrow <[email protected]>
>> *Cc:* user <[email protected]>
>> *Subject:* Re: RabbitMQ source does not stop unless message arrives in queue
>>
>> Hey Jose,
>>
>> Thanks for bringing this up – it indeed sounds like a bug. There is ongoing work to update the RMQ source to the new interface, which might address some of these issues (or should, if it is not already), tracked in FLINK-20628 [1]. Would you be able to create a JIRA issue for this, or would you like me to?
>>
>> At my previous company, we only consumed one Rabbit queue per application, so we didn't run into this exactly but did see other weird behavior in the RMQ source that could be related. I'm going to cc @John Morrow <[email protected]> who might be able to contribute to what he's seen working with the source, if he's around. I remember some messages not properly being ack'ed during a stateful shutdown via the Ververica Platform's stop-with-savepoint functionality that you mention, though that might be more related to FLINK-20244 [2], perhaps.
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-20628
>> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>>
>> On Thu, May 13, 2021 at 10:23 AM Jose Vargas <[email protected]> wrote:
>>
>> Hi,
>>
>> I am using Flink 1.12 to read from and write to a RabbitMQ cluster. Flink's RabbitMQ source has some surprising behavior when a stop-with-savepoint request is made.
>>
>> *Expected Behavior:*
>> The stop-with-savepoint request stops the job with a FINISHED state.
>>
>> *Actual Behavior:*
>> The stop-with-savepoint request either times out or hangs indefinitely unless a message arrives in all the queues that the job consumes from after the stop-with-savepoint request is made.
>>
>> I know that one possible workaround is to send a sentinel value, which the deserialization schema checks for in its isEndOfStream method, to each of the queues consumed by the job. However, this is somewhat cumbersome and complicates the continuous delivery of a Flink job. For example, Ververica Platform will trigger a stop-with-savepoint for the user if one of many possible Flink configurations for a job is changed. The stop-with-savepoint can then hang indefinitely because only some of the RabbitMQ sources will have reached a FINISHED state.
>>
>> I have attached the TaskManager thread dump taken after the stop-with-savepoint request was made. Almost every thread is either sleeping or waiting for locks to be released, and a handful of threads are trying to read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom method.
>>
>> Ideally, once a stop-with-savepoint request is made, the threads trying to read data from RabbitMQ would be interrupted so that all RabbitMQ sources would reach a FINISHED state.
>>
>> Regular checkpoints and savepoints complete successfully; it is only the stop-with-savepoint request where I see this behavior.
>>
>> Respectfully,
>>
>> Jose Vargas
>> Software Engineer, Data Engineering
>> E: [email protected]
>> fiscalnote.com <https://www.fiscalnote.com> | info.cq.com <http://www.info.cq.com> | rollcall.com <https://www.rollcall.com>
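For illustration, here is a minimal sketch of the sentinel workaround Jose mentions, assuming UTF-8 string messages and a hypothetical sentinel payload "__END_OF_STREAM__" (the thread does not name one). So that it compiles without Flink on the classpath, it uses plain Java methods that mirror the deserialize and isEndOfStream methods of Flink's DeserializationSchema; in an actual job this check would live in the schema handed to the RabbitMQ source (for example, a subclass of SimpleStringSchema that overrides isEndOfStream). The drain loop only simulates a source stopping once it sees the sentinel; it is not the connector's real implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Stdlib-only sketch of the sentinel workaround. The sentinel value and the
 * drain() simulation are hypothetical; in Flink the check would go in a
 * DeserializationSchema's isEndOfStream method.
 */
public class SentinelSketch {

    // Hypothetical sentinel payload published to every queue the job consumes.
    static final String SENTINEL = "__END_OF_STREAM__";

    // Mirrors DeserializationSchema#deserialize for UTF-8 string messages.
    static String deserialize(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Mirrors DeserializationSchema#isEndOfStream: true only for the sentinel.
    static boolean isEndOfStream(String next) {
        return SENTINEL.equals(next);
    }

    // Simulates one source consuming its queue: records are emitted until the
    // sentinel arrives; the sentinel itself and anything after it are dropped.
    // A real source whose queue never receives a sentinel would keep blocking
    // for more messages instead of returning when the list runs out.
    static List<String> drain(List<byte[]> queue) {
        List<String> emitted = new ArrayList<>();
        for (byte[] body : queue) {
            String record = deserialize(body);
            if (isEndOfStream(record)) {
                break; // the source is expected to stop here and finish
            }
            emitted.add(record);
        }
        return emitted;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> queue = Arrays.asList(utf8("a"), utf8("b"), utf8(SENTINEL), utf8("c"));
        System.out.println(drain(queue)); // prints [a, b]
    }
}
```

As Jose points out, the sentinel has to be published to every queue the job reads from: a source whose queue never receives one keeps waiting, which is exactly the partial-FINISHED hang described in his original message.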
