Hey all,

Thanks for the details, John! Hmm, that doesn't look too good either 😬 but
it's probably a different issue with the RMQ source/sink. Hopefully, the new
FLIP-27 sources will help you guys out there! The upcoming HybridSource in
FLIP-150 [1] might also be interesting to you for finer control over sources.

@Jose Vargas <jose.var...@fiscalnote.com> I've created FLINK-22698 [2] to
track your issue. Do you have a small reproducible case/GitHub repo? Also,
would you be able to provide a little more detail about the Flink job you
see this issue in? e.g. the overall parallelism, the parallelism of the
sources/sinks, and the checkpointing mode.

Best,
Austin

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
[2]: https://issues.apache.org/jira/browse/FLINK-22698

On Thu, May 13, 2021 at 9:25 PM John Morrow <johnniemor...@hotmail.com>
wrote:

> Hi Jose, hey Austin!!
>
> I know we were just recently looking at trying to consume a fixed number
> of messages from an RMQ source, process them, and output them to an RMQ
> sink. As a naive first attempt at stopping the job once the target number
> of messages had been processed, we kept a counter in state in the process
> function and threw an exception when the counter reached the target
> message count.
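>
> Roughly, the sketch below shows what that naive approach looked like (the
> class and field names are illustrative rather than our exact code, and it
> uses keyed state just for the sake of the example):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> // Illustrative sketch only: pass messages through and fail the job once a
> // target number of messages has been processed.
> public class StopAfterNMessages extends KeyedProcessFunction<String, String, String> {
>
>     private final long target;
>     private transient ValueState<Long> count;
>
>     public StopAfterNMessages(long target) {
>         this.target = target;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         count = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("count", Long.class));
>     }
>
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out)
>             throws Exception {
>         long c = (count.value() == null ? 0L : count.value()) + 1;
>         count.update(c);
>         out.collect(value);
>         if (c >= target) {
>             // With restartStrategy = noRestart this fails the job rather than
>             // stopping it cleanly, which is likely why unacked messages got
>             // requeued at the source.
>             throw new RuntimeException("Target message count reached");
>         }
>     }
> }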
>
> The job had:
>
>    - parallelism: 1
>    - checkpointing: 1000 (1 sec)
>    - restartStrategy: noRestart
>    - prefetchCount: 100
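>
> In code terms, that configuration corresponds roughly to the following
> setup (host, credentials, and queue name are placeholders; imports omitted):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.enableCheckpointing(1000); // 1 sec
> env.setRestartStrategy(RestartStrategies.noRestart());
>
> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>         .setHost("rabbitmq-host")   // placeholder
>         .setPort(5672)
>         .setUserName("user")        // placeholder
>         .setPassword("password")    // placeholder
>         .setVirtualHost("/")
>         .setPrefetchCount(100)
>         .build();
>
> DataStream<String> input = env.addSource(
>         new RMQSource<>(connectionConfig, "input-queue", true, new SimpleStringSchema()));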
>
> Running it with 150 messages in the input queue and a target count of 150,
> the queues ended up with:
>
>    - output queue - 150
>    - input queue - 50
>
> So it looks like it did transfer all the messages, but some unack'd ones
> also got requeued back at the source, so they end up as duplicates. I know
> throwing an exception in the Flink job is not the same as triggering a
> stateful shutdown, but it might be hitting similar unack issues.
>
> John
>
> ------------------------------
> *From:* Austin Cawley-Edwards <austin.caw...@gmail.com>
> *Sent:* Thursday 13 May 2021 16:49
> *To:* Jose Vargas <jose.var...@fiscalnote.com>; John Morrow <
> johnniemor...@hotmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
> queue
>
> Hey Jose,
>
> Thanks for bringing this up – it indeed sounds like a bug. There is
> ongoing work to update the RMQ source to the new source interface, which
> might address some of these issues (or should, if it doesn't already),
> tracked in FLINK-20628 [1]. Would you be able to create a JIRA issue for
> this, or would you like me to?
>
> At my previous company we only consumed one Rabbit queue per application,
> so we didn't run into this exactly, but we did see other weird behavior in
> the RMQ source that could be related. I'm going to cc @John Morrow
> <johnniemor...@hotmail.com>, who might be able to share what he's seen
> working with the source, if he's around. I remember some messages not
> being properly acked during a stateful shutdown via the Ververica
> Platform's stop-with-savepoint functionality that you mention, though that
> might be more related to FLINK-20244 [2].
>
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-20628
> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>
> On Thu, May 13, 2021 at 10:23 AM Jose Vargas <jose.var...@fiscalnote.com>
> wrote:
>
> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless, after the request is made, a message arrives in every queue that
> the job consumes from.
>
>
> I know that one possible workaround is to send a sentinel value, which the
> deserialization schema checks for in its isEndOfStream method, to each of
> the queues consumed by the job. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example, Ververica
> Platform will trigger a stop-with-savepoint for the user if one of many
> possible Flink configurations for a job is changed. The stop-with-savepoint
> can then hang indefinitely because only some of the RabbitMQ sources will
> have reached a FINISHED state.
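>
> For concreteness, the sentinel workaround looks roughly like the schema
> below (the class name and sentinel value are placeholders, not our actual
> code):
>
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
>
> import java.nio.charset.StandardCharsets;
>
> // Illustrative sketch: treat a well-known message body as the end-of-stream
> // marker so that each RMQ source subtask can reach FINISHED.
> public class SentinelStringSchema implements DeserializationSchema<String> {
>
>     private static final String SENTINEL = "END_OF_STREAM"; // placeholder value
>
>     @Override
>     public String deserialize(byte[] message) {
>         return new String(message, StandardCharsets.UTF_8);
>     }
>
>     @Override
>     public boolean isEndOfStream(String nextElement) {
>         // Returning true here is what lets the source subtask finish.
>         return SENTINEL.equals(nextElement);
>     }
>
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return Types.STRING;
>     }
> }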
>
> I have attached the TaskManager thread dump taken after the
> stop-with-savepoint request was made. Almost every thread is either
> sleeping or waiting for locks to be released, and a handful of threads are
> trying to read data from a socket via the
> com.rabbitmq.client.impl.Frame.readFrom method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully; it is only the
> stop-with-savepoint request where I see this behavior.
>
>
> Respectfully,
>
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com <https://www.fiscalnote.com>  |  info.cq.com
> <http://www.info.cq.com>  | rollcall.com <https://www.rollcall.com>
>
>
