Hello, TLDR: When we introduce some latency on the consumers, the maximum consume rate drops drastically, even though we have enough consumer threads to handle the rate.
We are currently evaluating ActiveMQ, focusing on produce/consume rate: The setup is the following: - Virtual Topics: 100 - Consumer Queues: 3 per virtual topic amounting to 300 queues. - Produce rate: 10 messages/second on each Virtual topic, amounting to 1000messages/second -- PooledConnectionFactory with 16 connections - Consumers: 4 different consumers on each queue, amounting to 1200 consumers (i.e. sessions + consumer) -- another PooledConnectionFactory with 16 connections - Broker configuration: default settings, producerFlowControl is enabled - Everything runs on a single machine, i7 2.5Ghz, 16GB memory When the consumers process messages very fast we can achieve a consume rate of 3000 messages/sec which is the expected one for the 1000messages/sec that we produce (3 queues per topic). When we introduce some latency on the consumers, the maximum consume rate drops drastically, even though we have enough consumers (== threads) to handle the rate: - Latency 10 msec, Consume Rate drops to 1300 messages/sec - Latency 20 msec, Consume Rate drops to 650 messages/sec - Latency 100 msec, Consume Rate drops to 130 messages/sec In all cases produce rate also drops to approximately 1/3 of consume rate, due to flow control. Any advice on why the consume rate drops? Below are code samples, with the connections and producers/consumers configurations: // ------------------------------------ // Producer Connection Settings ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUrl(amqConfiguration)); connectionFactory.setUserName(amqConfiguration.getPublisherUsername()); connectionFactory.setPassword(amqConfiguration.getPublisherPassword()); connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Producer"); connectionFactory.setAlwaysSyncSend(true); connectionFactory.setDispatchAsync(false); connectionFactory.setSendTimeout(5000); publisherConnectionFactory = new PooledConnectionFactory(connectionFactory); publisherConnectionFactory.setMaxConnections(16); publisherConnectionFactory.start(); // ------------------------------------ // Producer: this is called by different threads to produce 10 messages/sec on each topic private synchronized void publish(String topic) throws JMSException, TException { // 1. Create connection Connection producerConnection = publisherConnectionFactory.createConnection(); // 2. Create Session Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 3. Create Destination Destination destination = producerSession.createTopic(topic); // ^^ e.g. "VirtualTopic.ProducerYYY" // 4. Create Producer MessageProducer producer = producerSession.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 5. Create message MapMessage producerMessage = producerSession.createMapMessage(); { // Add some payload byte[] payload = new byte[monitorParameters.getProducerMessageSize()]; new Random().nextBytes(payload); producerMessage.setObject("payload", payload); } // 6. Send Message producer.send(producerMessage); producer.close(); producerSession.close(); producerConnection.close(); } // ------------------------------------ // Consumer Connection Settings ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUrl(amqConfiguration)); connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Consumer"); connectionFactory.setUserName(amqConfiguration.getConsumerUsername()); connectionFactory.setPassword(amqConfiguration.getConsumerPassword()); connectionFactory.setAlwaysSessionAsync(true); connectionFactory.setOptimizeAcknowledge(false); connectionFactory.setDispatchAsync(false); connectionFactory.setSendTimeout(5000); connectionFactory.setConnectResponseTimeout(5000); connectionFactory.setMessagePrioritySupported(true); ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); prefetchPolicy.setAll(1); connectionFactory.setPrefetchPolicy(prefetchPolicy); RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(0); connectionFactory.setRedeliveryPolicy(redeliveryPolicy); consumerConnectionFactory = new PooledConnectionFactory(connectionFactory); consumerConnectionFactory.setMaxConnections(16); consumerConnectionFactory.start(); //--------------------------------------------------- // Consumer: this is called 4 times per queue => 4 * 300 = 1200 sessions + consumers void startConsumer(String queue) { // 1. Establish a connection for the consumer. consumerConnection = consumerConnectionFactory.createConnection(); // 2. Create a session. consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 3. Create a message consumer from the session to the queue. Destination destination = consumerSession.createQueue(queue)); // ^^ e.g. "Consumer.ConsumerXXX.VirtualTopic.ProducerYYY" // 4. Create consumer consumer = consumerSession.createConsumer(destination); // 5. Add a listener for handling received messages consumer.setMessageListener((consumerMessage) -> { try { MapMessage consumerByteMessage = (MapMessage) consumerMessage; try { Thread.sleep(N); // Introduce some artificial delay } catch (InterruptedException e) { logger.error("", e); } consumerMessage.acknowledge(); } catch (JMSException e) { try { consumerSession.recover(); } catch (JMSException e1) { throw new RuntimeException(e1); } } }); consumerConnection.start(); } -- Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html