Hi

You may be on to something. It does look as though, when temporary
queues are used for request/response, the acknowledge mode should
probably always be auto, so the "other side" can see
the message on the temporary queue, process it, and send back a reply
message for Camel to pick up.
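
As a rough illustration of that idea (a sketch only, not the actual
camel-rabbitmq code): the reply consumer would pass true for autoAck when
it subscribes to the temporary queue, instead of endpoint.isAutoAck().
StubChannel below is a made-up stand-in for the RabbitMQ Channel so that
the snippet compiles on its own; its basicConsume only mimics the queue
and autoAck arguments of the real Channel.basicConsume(String, boolean,
Consumer). The class name and the queue name are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;

// Made-up stand-in for the RabbitMQ Channel; it only records what was requested.
class StubChannel {
    final List<String> consumed = new ArrayList<>();

    // Mimics the queue and autoAck arguments of the real basicConsume.
    String basicConsume(String queue, boolean autoAck) {
        consumed.add(queue + ":autoAck=" + autoAck);
        return "ctag-" + consumed.size();
    }
}

// Hypothetical reply consumer for the temporary reply queue.
class ReplyConsumerSketch {
    private final StubChannel channel;
    private final String replyTo;
    String tag;

    ReplyConsumerSketch(StubChannel channel, String replyTo) {
        this.channel = channel;
        this.replyTo = replyTo;
    }

    // Bind consumer to channel. The endpoint's autoAck setting is deliberately
    // ignored here: replies on the temporary queue are always auto-acked.
    void start(boolean endpointAutoAck) {
        tag = channel.basicConsume(replyTo, true);
    }

    public static void main(String[] args) {
        StubChannel channel = new StubChannel();
        ReplyConsumerSketch consumer = new ReplyConsumerSketch(channel, "temp-reply-queue");
        consumer.start(false); // main endpoint configured with autoAck=false
        System.out.println(channel.consumed);
    }
}
```

The main queue would keep whatever autoAck the endpoint is configured
with; only the temporary reply queue changes.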

Could you try modifying the source code to "fix this" yourself and
give it some testing? If that works, you are welcome to
log a JIRA and provide a patch / GitHub PR with the fix.
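
If you would rather experiment with keeping manual acks on the reply
queue, the other direction would be for the reply handling to ack each
delivery itself. Below is a minimal sketch of that, again not the real
camel-rabbitmq code. AckRecordingChannel is a made-up stand-in whose
basicAck has the same parameters as the real Channel.basicAck(long,
boolean); ManualAckReplySketch, expectReply and the callback type are
hypothetical names. With the real client the delivery tag would come
from Envelope.getDeliveryTag() in handleDelivery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Made-up stand-in for the RabbitMQ Channel; it only records acked delivery tags.
class AckRecordingChannel {
    final List<Long> acked = new ArrayList<>();

    // Same parameters as the real Channel.basicAck(long deliveryTag, boolean multiple).
    void basicAck(long deliveryTag, boolean multiple) {
        acked.add(deliveryTag);
    }
}

// Hypothetical reply manager that acks replies by hand when autoAck is off.
class ManualAckReplySketch {
    private final AckRecordingChannel channel;
    private final boolean autoAck;
    private final Map<String, Consumer<byte[]>> correlation = new HashMap<>();

    ManualAckReplySketch(AckRecordingChannel channel, boolean autoAck) {
        this.channel = channel;
        this.autoAck = autoAck;
    }

    // Register the callback waiting for the reply with this correlation id.
    void expectReply(String correlationId, Consumer<byte[]> handler) {
        correlation.put(correlationId, handler);
    }

    // Called once per delivery from the temporary reply queue.
    void handleDelivery(long deliveryTag, String correlationId, byte[] body) {
        Consumer<byte[]> handler = correlation.remove(correlationId);
        if (handler != null) {
            handler.accept(body);
        }
        if (!autoAck) {
            // Assumption: ack even when nobody is waiting for this reply,
            // so that it does not stay un-acked on the temporary queue.
            channel.basicAck(deliveryTag, false);
        }
    }
}
```

Acking after the handler has run means a reply whose handler throws is
left un-acked; whether that is the right behaviour is something the
testing would need to show.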

Does anyone else have thoughts or comments?

On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons
<valdis.anders...@vhi.ie> wrote:
> Hi All,
>
> I have been digging through some of the Camel-RabbitMQ code today and can
> now confirm that the temp queue is created with the main endpoint's relevant
> properties (prefetch, autoAck and some others). The ack mode is set when the
> consumer gets bound to the channel - TemporaryQueueReplyManager line 139:
>
> /**
>  * Bind consumer to channel
>  */
> private void start() throws IOException {
>     tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
> }
>
> What I haven't been able to find is where the TemporaryQueueReplyManager,
> the TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other
> temp-queue-related logic sends the manual ack to RabbitMQ
> (channel.basicAck) when the original endpoint, from which the settings are
> taken, has autoAck set to false.
>
> My suspicion is that either the manager class or the handler class should be
> responsible for sending the manual ack: either in TemporaryQueueReplyManager,
> lines 66-67:
>
> if (handler != null) {
>     correlation.remove(correlationID);
>     handler.onReply(correlationID, properties, message);
> }
>
> Or in TemporaryQueueReplyHandler, lines 56-59:
>
> public void onReply(String correlationId, AMQP.BasicProperties properties,
>                     byte[] reply) {
>     // create holder object with the reply
>     log.info("in onReply with correlationId= {}", correlationId);
>     ReplyHolder holder = new ReplyHolder(exchange, callback,
>             originalCorrelationId, correlationId, properties, reply);
>     // process the reply
>     replyManager.processReply(holder);
> }
>
> The processReply method in ReplyManagerSupport doesn't seem to do it either.
> I'm really lost on this one.
>
> Could someone please point me to the place in the code where it's done? It
> might give me an idea or two about what I'm missing in my routing setup and
> how to get this working, or at least how to work around the issue.
>
>
> Thanks,
> Valdis
>
> From: Valdis Andersons
> Sent: 16 August 2018 16:10
> To: users@camel.apache.org
> Subject: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> Hi All,
>
> Hopefully someone here can educate me on the below matter.
>
> Here is a rather long-winded description of the issue: 
> https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au
>
> The essence of it is that in an InOut route with autoAck=false on the main
> queue, the messages on the temporary reply queue are not being acked, even
> though they are being fetched, as indicated by the prefetch setting and the
> number of messages in an un-acked state on the temp queues.
>
> If I set autoAck=true it works fine, but then I risk losing messages, which
> is not what I want (we switched from SEDA to RabbitMQ for this reason, among
> others).
> If I set the exchange pattern to InOnly it works fine as well, as the
> route then doesn't need the temp queue for the reply. Unfortunately, that
> causes a race condition in this scenario (outputEmailEndpoint and
> outputArchiveEndpoint get the message at the same time, but the email route
> needs to finish first before the message can be archived):
>
> rest(restEndpoint).post(postEndpoint)
>     .type(MyRequest.class)
>     .route()
>     .startupOrder(Integer.MAX_VALUE - 2)
>     .process(this::process)
>     .choice()
>         .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED))
>             .to(outputEmailEndpoint, outputArchiveEndpoint)
>
> I'd also like to send the correct response status to the REST client
> calling this service, and at the moment that doesn't work well, as the
> un-acked messages start blocking the route or the response is sent
> before the email route has finished.
>
> I'm new to Camel, so I might be missing something very obvious here, but
> the last few days of searching around and trying various config options
> haven't got me any further.
>
>
> Thanks and Best Regards,
> Valdis



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
