Hello,

TLDR: When we introduce some latency on the consumers, the maximum consume
rate drops drastically, even though we have enough consumer threads to
handle the rate.


We are currently evaluating ActiveMQ, focusing on the produce/consume rate.
The setup is the following:
- Virtual Topics: 100
- Consumer Queues: 3 per virtual topic, amounting to 300 queues
- Produce rate: 10 messages/second on each virtual topic, amounting to
  1000 messages/second
-- PooledConnectionFactory with 16 connections
- Consumers: 4 different consumers on each queue, amounting to 1200
  consumers (i.e. session + consumer pairs)
-- another PooledConnectionFactory with 16 connections
- Broker configuration: default settings, producerFlowControl is enabled
- Everything runs on a single machine: i7 2.5 GHz, 16 GB memory

When the consumers process messages very fast, we can achieve a consume rate
of 3000 messages/sec, which is what we expect for the 1000 messages/sec that
we produce (3 queues per topic).

When we introduce some latency on the consumers, the maximum consume rate
drops drastically, even though we have enough consumers (== threads) to
handle the rate:

- Latency 10 msec, Consume Rate drops to 1300 messages/sec
- Latency 20 msec, Consume Rate drops to 650 messages/sec
- Latency 100 msec, Consume Rate drops to 130 messages/sec

In all cases the produce rate also drops to approximately 1/3 of the consume
rate, due to flow control.
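
As a rough sanity check on these numbers (a back-of-the-envelope sketch, not
a diagnosis), Little's law says that the average number of messages being
processed at once equals the consume rate multiplied by the time spent per
message. The class and method names below are made up for illustration; only
the latency and rate figures come from our measurements.

```java
public class ConsumeRateCheck {

    // Little's law: messages in flight = throughput (msg/s) * time per message (s).
    static double impliedConcurrency(double consumeRatePerSec, double latencyMillis) {
        return consumeRatePerSec * latencyMillis / 1000.0;
    }

    // Upper bound on throughput if `workers` handlers each spend
    // `latencyMillis` on every message and never sit idle.
    static double maxRatePerSec(int workers, double latencyMillis) {
        return workers * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // {latency in msec, observed consume rate in messages/sec}
        double[][] observed = { {10, 1300}, {20, 650}, {100, 130} };
        int consumers = 1200; // 4 consumers on each of the 300 queues

        for (double[] row : observed) {
            double latency = row[0];
            double rate = row[1];
            System.out.printf(
                "latency %.0f msec: ~%.1f messages in flight, bound with %d consumers: %.0f msg/s%n",
                latency, impliedConcurrency(rate, latency),
                consumers, maxRatePerSec(consumers, latency));
        }
    }
}
```

Every row works out to about 13 messages in flight, whereas 1200 consumers
at 100 msec each would in theory allow up to 12000 messages/sec. 13 is close
to the 16 pooled connections, but whether the connections are really the
limiting factor is only a guess on our side.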

Any advice on why the consume rate drops?


Below are code samples with the connection and producer/consumer
configurations:

// ------------------------------------
// Producer Connection Settings
ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
connectionFactory.setUserName(amqConfiguration.getPublisherUsername());
connectionFactory.setPassword(amqConfiguration.getPublisherPassword());
connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Producer");
connectionFactory.setAlwaysSyncSend(true);
connectionFactory.setDispatchAsync(false);
connectionFactory.setSendTimeout(5000);

publisherConnectionFactory = new PooledConnectionFactory(connectionFactory);
publisherConnectionFactory.setMaxConnections(16);
publisherConnectionFactory.start();

// ------------------------------------
// Producer: this is called by different threads to produce 10 messages/sec
// on each topic
private synchronized void publish(String topic) throws JMSException, TException {
    // 1. Create connection
    Connection producerConnection =
            publisherConnectionFactory.createConnection();
    // 2. Create Session
    Session producerSession =
            producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 3. Create Destination
    Destination destination = producerSession.createTopic(topic);
    // ^^ e.g. "VirtualTopic.ProducerYYY"
    // 4. Create Producer
    MessageProducer producer = producerSession.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    // 5. Create message
    MapMessage producerMessage = producerSession.createMapMessage();
    {
        // Add some payload
        byte[] payload =
                new byte[monitorParameters.getProducerMessageSize()];
        new Random().nextBytes(payload);
        producerMessage.setObject("payload", payload);
    }

    // 6. Send Message
    producer.send(producerMessage);

    producer.close();
    producerSession.close();
    producerConnection.close();
}

// ------------------------------------
// Consumer Connection Settings
ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory(connectionUrl(amqConfiguration));
connectionFactory.setClientIDPrefix("HermesMonitorAMQ-Consumer");
connectionFactory.setUserName(amqConfiguration.getConsumerUsername());
connectionFactory.setPassword(amqConfiguration.getConsumerPassword());
connectionFactory.setAlwaysSessionAsync(true);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.setDispatchAsync(false);
connectionFactory.setSendTimeout(5000);
connectionFactory.setConnectResponseTimeout(5000);
connectionFactory.setMessagePrioritySupported(true);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(1);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
connectionFactory.setRedeliveryPolicy(redeliveryPolicy);

consumerConnectionFactory = new PooledConnectionFactory(connectionFactory);
consumerConnectionFactory.setMaxConnections(16);

consumerConnectionFactory.start();

//---------------------------------------------------
// Consumer: this is called 4 times per queue => 4 * 300 = 1200 sessions +
// consumers
void startConsumer(String queue) {
    // 1. Establish a connection for the consumer.
    consumerConnection = consumerConnectionFactory.createConnection();
    // 2. Create a session.
    consumerSession =
            consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    // 3. Create a message consumer from the session to the queue.
    Destination destination = consumerSession.createQueue(queue);
    // ^^ e.g. "Consumer.ConsumerXXX.VirtualTopic.ProducerYYY"
    // 4. Create consumer
    consumer = consumerSession.createConsumer(destination);
    // 5. Add a listener for handling received messages
    consumer.setMessageListener((consumerMessage) -> {
        try {
            MapMessage consumerByteMessage = (MapMessage) consumerMessage;
            try {
                Thread.sleep(N); // Introduce some artificial delay
            } catch (InterruptedException e) {
                logger.error("", e);
            }
            consumerMessage.acknowledge();
        } catch (JMSException e) {
            try {
                consumerSession.recover();
            } catch (JMSException e1) {
                throw new RuntimeException(e1);
            }
        }
    });
    consumerConnection.start();
}


