Hey Jose,

Thanks for bringing this up – it indeed sounds like a bug. There is ongoing
work to update the RMQ source to the new interface, which might address
some of these issues (or should, if it is not already), tracked in
FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
you like me to?

At my previous company, we only consumed one Rabbit queue per application,
so we didn't run into this exactly but did see other weird behavior in the
RMQ source that could be related. I'm going to cc @John Morrow
<johnniemor...@hotmail.com> who might be able to contribute to what he's
seen working with the source, if he's around. I remember some messages not
properly being ack'ed during a stateful shutdown via the Ververica
Platform's stop-with-savepoint functionality that you mention, though that
might be more related to FLINK-20244[2], perhaps.


Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-20628
[2]: https://issues.apache.org/jira/browse/FLINK-20244

On Thu, May 13, 2021 at 10:23 AM Jose Vargas <jose.var...@fiscalnote.com>
wrote:

> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless a message arrives in all the queues that the job consumes from after
> the stop-with-savepoint request is made.
>
>
> I know that one possible workaround is to send a sentinel value to each of
> the queues consumed by the job that the deserialization schema checks in
> its isEndOfStream method. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example,
> Ververica Platform will trigger a stop-with-savepoint for the user if one
> of many possible Flink configurations for a job are changed. The
> stop-with-savepoint can then hang indefinitely because only some of the
> RabbitMQ sources will have reached a FINISHED state.
>
> I have attached the TaskManager thread dump after the save-with-savepoint
> request was made. Most every thread is either sleeping or waiting around
> for locks to be released, and then there are a handful of threads trying to
> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
> method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully, it is only the
> stop-with-savepoint request where I see this behavior.
>
>
> Respectfully,
>
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com <https://www.fiscalnote.com>  |  info.cq.com
> <http://www.info.cq.com>  | rollcall.com <https://www.rollcall.com>
>
>

Reply via email to