I've created a dummy unit test showing that, with a fixed shared reply
queue and a CachingConnectionFactory, the replyToCacheLevelName parameter
doesn't solve the problem of messages getting stuck in the reply queue.
In this test, I send 3 messages (at almost the same time) via inOut to one
queue that uses the replyTo parameter. After the reply is received, the
message is sent to a mock endpoint (mock:result). In the end, this mock
receives only 1 message, not 3, and in the log I can read:
16:20:13,169 INFO org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 10 seconds.
16:20:14,170 INFO org.apache.camel.impl.DefaultShutdownStrategy| Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 9 seconds.
...
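The numbers in that log are what one would expect if two replies stay unconsumed on the reply queue. The sketch below is a single-threaded, in-memory model of that symptom only. It is not Camel, JMS or Spring code and says nothing about the cause; the class SharedReplyQueueModel, its Message type and its method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory model of request/reply over one shared reply queue.
class SharedReplyQueueModel {

    /** A message carrying the ID that ties a reply to its request. */
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    final Queue<Message> requestQueue = new ArrayDeque<>(); // stands in for queue:foo
    final Queue<Message> replyQueue = new ArrayDeque<>();   // stands in for queue:bar
    final Map<String, CompletableFuture<String>> pending = new HashMap<>();
    private int nextId = 0;

    /** Requester side: enqueue a request and remember who waits for its reply. */
    CompletableFuture<String> send(String body) {
        String id = "req-" + (nextId++);
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(id, reply);
        requestQueue.add(new Message(id, body));
        return reply;
    }

    /** Responder side: answer one request, copying its correlation ID. */
    boolean serveOne() {
        Message request = requestQueue.poll();
        if (request == null) {
            return false;
        }
        replyQueue.add(new Message(request.correlationId, "Bye " + request.body));
        return true;
    }

    /** Reply consumer: take one reply off the shared queue and hand it to its waiter. */
    boolean consumeOneReply() {
        Message reply = replyQueue.poll();
        if (reply == null) {
            return false;
        }
        CompletableFuture<String> waiter = pending.remove(reply.correlationId);
        if (waiter != null) {
            waiter.complete(reply.body);
        }
        return true;
    }

    public static void main(String[] args) {
        SharedReplyQueueModel model = new SharedReplyQueueModel();
        CompletableFuture<String> first = model.send("Camel-0");
        model.send("Camel-1");
        model.send("Camel-2");
        while (model.serveOne()) {
            // all three requests get answered
        }
        model.consumeOneReply(); // but only one reply is ever consumed
        System.out.println("first done: " + first.isDone()
                + ", replies left on queue: " + model.replyQueue.size()
                + ", requests still pending: " + model.pending.size());
    }
}
```

In this model, every reply that is not consumed leaves exactly one request pending, which is the same 1-of-3 pattern the test shows.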
This is the unit test:
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;

public class FixedReplyQueueWithCachingConnectiongTest extends CamelTestSupport {

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        ConnectionFactory connectionFactory = createConnectionFactory();
        ActiveMQComponent activemqComponent = new ActiveMQComponent();
        activemqComponent.setReplyToCacheLevelName("CACHE_NONE");
        activemqComponent.setConnectionFactory(connectionFactory);
        camelContext.addComponent("activemq", activemqComponent);
        return camelContext;
    }

    private ConnectionFactory createConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        // optimize AMQ to be as fast as possible so unit testing is quicker
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setOptimizeAcknowledge(true);
        connectionFactory.setOptimizedMessageDispatch(true);
        // When using asyncSend, producers are not guaranteed to send in the order we
        // have in the tests (which may be confusing for queues), so we need this set
        // to false. Another way of guaranteeing order is to use persistent messages
        // or transactions.
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setAlwaysSessionAsync(false);
        CachingConnectionFactory cached = new CachingConnectionFactory(connectionFactory);
        cached.setSessionCacheSize(100);
        return cached;
    }

    @Test
    public void testStuckMessagesWithFixedReplyQueue() throws Exception {
        // 3 requests are sent below, but only 1 reply reaches the mock
        getMockEndpoint("mock:result").expectedMessageCount(1);
        new MyThread().start();
        new MyThread().start();
        new MyThread().start();
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // requester: send to queue foo and wait for the reply on the fixed queue bar
                from("direct:start")
                    .inOut("activemq:queue:foo?replyTo=queue:bar")
                    .to("mock:result");
                // responder: answer every request with "Bye <body>"
                from("activemq:queue:foo").transform(body().prepend("Bye "));
            }
        };
    }

    class MyThread extends Thread {
        @Override
        public void run() {
            template.requestBody("direct:start", "Camel" + getName(), String.class);
        }
    }
}
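A side note on the test itself: the three MyThread instances are started and never joined, so the two stuck requests surface only in the shutdown log. A possible variation, sketched here in plain Java, collects every reply with a timeout so that missing replies show up in the test result. The class ConcurrentRequests, its sendAll method and the request function are made up for illustration; in the test above the function would wrap template.requestBody("direct:start", body, String.class).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper: fires all requests concurrently and gathers the replies.
class ConcurrentRequests {

    /**
     * Sends every body through `request` on its own thread and returns the
     * replies that arrive before the overall timeout. Requests that do not
     * complete in time are left out, so a short list means stuck requests.
     */
    static List<String> sendAll(List<String> bodies,
                                Function<String, String> request,
                                long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(bodies.size());
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String body : bodies) {
                Callable<String> task = () -> request.apply(body);
                futures.add(pool.submit(task));
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            List<String> replies = new ArrayList<>();
            for (Future<String> future : futures) {
                long remaining = Math.max(deadline - System.nanoTime(), 0);
                try {
                    replies.add(future.get(remaining, TimeUnit.NANOSECONDS));
                } catch (TimeoutException e) {
                    // no reply within the deadline: this request is stuck
                }
            }
            return replies;
        } finally {
            pool.shutdownNow(); // interrupt anything still waiting
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in responder that behaves like the "Bye " route.
        List<String> replies = sendAll(Arrays.asList("Camel-0", "Camel-1", "Camel-2"),
                body -> "Bye " + body, 2000);
        System.out.println(replies.size() + " of 3 replies received");
    }
}
```

With a helper like this, the test could assert on the size of the returned list instead of relying on the mock count alone.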
If I set cacheConsumers to false on the CachingConnectionFactory, the
problem is solved:
cached.setCacheConsumers(false);
But I would like to disable the cache only for the reply queues, not in
general, using replyToCacheLevelName.
Any idea why replyToCacheLevelName is not working?