Hi,
I have four Java clients, 2 producers and 2 consumers, connecting to a
single-instance Dispatch Router, all accessing a single destination
(targetqueue). The producers are sending non-persistent messages.
I've configured the clients using the following options:
connectionfactory.myJmsFactory = amqp://localhost:5672?jms.forceAsyncSend=true&jms.forceAsyncAcks=true
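
To make the option string above easier to read, here is a minimal sketch that assembles the same remote URI from its parts and parses it back with java.net.URI. The class name RemoteUriSketch and its helper methods are hypothetical and not part of qpid-jms; only the host, port and the two jms.* option names come from the configuration line above.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, for illustration only (not a qpid-jms API).
class RemoteUriSketch {

    // The two options used in the jndi.properties line above, in the same order.
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.forceAsyncSend", "true");
        options.put("jms.forceAsyncAcks", "true");
        return options;
    }

    // Joins the options as key=value pairs separated by '&' after the '?'.
    static String build(String host, int port, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        options.forEach((key, value) -> query.add(key + "=" + value));
        String base = "amqp://" + host + ":" + port;
        return options.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        String uri = build("localhost", 5672, exampleOptions());
        URI parsed = URI.create(uri);
        System.out.println(uri);               // full remote URI
        System.out.println(parsed.getQuery()); // only the option part
    }
}
```

The resulting string is what would go on the right-hand side of connectionfactory.myJmsFactory.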

I'm sending 100 messages from each producer. From reading other mailing
list threads, I would have expected them to be load balanced across the
consumers, given the non-persistent delivery mode and the forceAsyncSend
option, but they're not: all 200 messages are consumed by whichever
consumer is started first.

Is there anything obvious that I'm missing, or are my expectations incorrect?

thanks
Noel



[root@776339e34cd3 build]# qdstat -la
Router Addresses
  class   addr                   phs  distrib   in-proc  local  remote  cntnr  in   out  thru  to-proc  from-proc
  ===============================================================================================================
  local   $_management_internal       closest   1        0      0       0      0    0    0     6        6
  local   $displayname                closest   1        0      0       0      0    0    0     0        0
  mobile  $management            0    closest   1        0      0       0      424  0    0     424      0
  local   $management                 closest   1        0      0       0      0    0    0     0        0
  mobile  targetqueue            0    balanced  0        2      0       0      200  200  0     0        0
  local   temp.YjGynchnxvRQ49q        closest   0        1      0       0      0    0    0     0        0

<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.9.0</version>

producer snippet
----------------------
            connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
                TextMessage message = session.createTextMessage(i + ". message sent");
                LOG.info("Sending to destination: " + destination.toString()
                        + " this text: '" + message.getText() + "'");
                // Non-persistent, priority 4, time-to-live 2000 ms
                producer.send(message, DeliveryMode.NON_PERSISTENT, 4, 2000);
                Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
            }

consumer snippet
-----------------------
            connection.start();
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(destination);
            LOG.info("Start consuming messages from " + destination.toString()
                    + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");

            // Synchronous message consumer
            int i = 1;
            while (true) {
                Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        LOG.info("Got " + (i++) + ". message: " + text);
                    }
                } else {
