Hi

You may be on to something. It does smell like, when using temporary queues for request/response, the acknowledge mode should maybe always be auto, so the "other side" can see the message on the temporary queue, process it, and send back a reply message for Camel to pick up.
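A rough, untested sketch of the two options under discussion in this thread: always auto-ack on the temporary reply queue, or keep the endpoint's setting and ack manually once the reply has been handed over. To keep it runnable without a broker, `ReplyChannel` and `RecordingChannel` are hypothetical stand-ins that only loosely mirror the RabbitMQ client's `Channel.basicConsume` and `Channel.basicAck`; the method names `startReplyConsumer` and `onDelivery` and the queue name are likewise made up for illustration and are not camel-rabbitmq code.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplyAckSketch {

    /** Hypothetical stand-in for the two channel operations discussed in this thread. */
    interface ReplyChannel {
        String basicConsume(String queue, boolean autoAck);
        void basicAck(long deliveryTag, boolean multiple);
    }

    /** In-memory fake that records calls, so the sketch runs without a broker. */
    static class RecordingChannel implements ReplyChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public String basicConsume(String queue, boolean autoAck) {
            calls.add("consume:" + queue + ":autoAck=" + autoAck);
            return "tag-1";
        }

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }
    }

    /** Option A: ignore the endpoint's autoAck setting for the temporary reply queue. */
    static String startReplyConsumer(ReplyChannel channel, String replyTo, boolean endpointAutoAck) {
        // endpointAutoAck is deliberately unused here: replies are always auto-acked.
        return channel.basicConsume(replyTo, true);
    }

    /** Option B: keep the endpoint's setting, but ack manually after the reply is handled. */
    static void onDelivery(ReplyChannel channel, long deliveryTag, boolean endpointAutoAck,
                           Runnable handleReply) {
        handleReply.run();
        if (!endpointAutoAck) {
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        startReplyConsumer(channel, "amq.gen-reply", false);
        onDelivery(channel, 7L, false, () -> { });
        System.out.println(channel.calls);
    }
}
```

Only one of the two options would be needed in practice. Option A is the smaller change; option B keeps the manual-ack semantics but needs the delivery tag to be available where the reply is handled.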
I wonder if you could modify the source code and try to "fix this" yourself and give it some testing. If that works, you are welcome to log a JIRA and provide a patch / GitHub PR with the fix.

Anyone else have other thoughts or comments?

On Fri, Aug 17, 2018 at 5:31 PM, Valdis Andersons <valdis.anders...@vhi.ie> wrote:
> Hi All,
>
> Been digging today through some of the Camel-RabbitMQ code and can confirm
> now that the temp queue is created with the main endpoint's relevant
> properties (prefetch, autoAck and some others). The ack mode is set when the
> consumer gets bound to the channel - TemporaryQueueReplyManager line 139:
>
>     /**
>      * Bind consumer to channel
>      */
>     private void start() throws IOException {
>         tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
>     }
>
> What I haven't been able to dig out is where the TemporaryQueueReplyManager
> or TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp
> queue related logic would be sending the manual ack to RabbitMQ
> (channel.basicAck) in case the original endpoint, that the settings are taken
> from, had autoAck set to false.
>
> My suspicion is that either the handler class or the manager class should be
> responsible for sending the manual ack, either TemporaryQueueReplyManager
> line 66-67:
>
>     if (handler != null) {
>         correlation.remove(correlationID);
>         handler.onReply(correlationID, properties, message);
>     }
>
> Or TemporaryQueueReplyHandler line 56 - 59:
>
>     public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
>         // create holder object with the reply
>         log.info("in onReply with correlationId= {}", correlationId);
>         ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
>         // process the reply
>         replyManager.processReply(holder);
>     }
>
> The processReply method in ReplyManagerSupport doesn't seem to do it either,
> really lost on this one.
>
> Could someone please point me to the place in the code where it's done? It
> might just maybe give me an idea or two of what I might be missing in my
> routing setup and how to get this working or at least how to work around this
> issue.
>
>
> Thanks,
> Valdis
>
> From: Valdis Andersons
> Sent: 16 August 2018 16:10
> To: users@camel.apache.org
> Subject: Camel with Rabbitmq: messages in temp reply queue not being acked
>
> Hi All,
>
> Hopefully someone here can educate me on the below matter.
>
> Here is a rather long-winded description of the issue:
> https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au
>
> The essence of it is that in an InOut route with autoAck=false on the main
> queue the messages on the temporary reply queue are not being acked even
> though they are being fetched as indicated by the prefetch setting and the
> amount of messages in an un-acked state on the temp queues.
>
> If I set the autoAck=true then it works fine but in that case I'm risking
> losing messages and that's not what I want (we switched from SEDA to RabbitMQ
> for this reason among others).
> If I set the exchange pattern to InOnly then it works fine as well as the
> route then doesn't need the temp queue for the reply.
> That unfortunately causes a race condition in this scenario
> (outputEmailEndpoint and outputArchiveEndpoint get the message at the same
> time, but email route needs to finish first before it can be archived):
>
>     rest(restEndpoint).post(postEndpoint)
>         .type(MyRequest.class)
>         .route()
>         .startupOrder(Integer.MAX_VALUE - 2)
>         .process(this::process)
>         .choice()
>             .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED))
>                 .to(outputEmailEndpoint, outputArchiveEndpoint)
>
> I'd also like to be sending the correct response status to the REST client
> calling this service and at the moment that doesn't quite work too well as
> the un-acked messages start blocking the route or the response will be sent
> before email route has finished.
>
> I'm only new to Camel so I might be missing something very obvious here but
> the last few days of searching around and trying various config options
> haven't really got me anywhere further.
>
>
> Thanks and Best Regards,
> Valdis

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2