I removed the Thread.sleep from the producer and it now seems to be working as expected, in that messages are load balanced across the consumers. Why would that have an impact on the way the router dispatches messages to the consumers?
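
To make the question concrete, here is a toy model of how I read the "only one message is in flight" explanation quoted below. It is only a sketch under assumptions of mine: that balanced distribution picks the consumer with the fewest unsettled deliveries, and that a tie goes to the consumer that attached first. The class, the method and the timing parameters are all hypothetical and are not taken from Dispatch or the JMS client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BalancedDispatchSketch {

    /**
     * Toy model, NOT Dispatch code. Assumption: the router forwards each
     * message to the consumer with the fewest unsettled deliveries and
     * breaks ties in favour of the lowest index (first attached consumer).
     *
     * @param messages       number of messages sent by the producer
     * @param consumers      number of attached consumers
     * @param sendIntervalMs delay between sends (the Thread.sleep in the producer)
     * @param settleMs       time a delivery stays unsettled at a consumer
     * @return messages delivered to each consumer
     */
    public static int[] simulate(int messages, int consumers,
                                 long sendIntervalMs, long settleMs) {
        int[] delivered = new int[consumers];
        // Per consumer: settle times of deliveries that are still in flight.
        List<ArrayDeque<Long>> inFlight = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            inFlight.add(new ArrayDeque<>());
        }
        for (int i = 0; i < messages; i++) {
            long now = i * sendIntervalMs;
            int best = 0;
            for (int c = 0; c < consumers; c++) {
                ArrayDeque<Long> q = inFlight.get(c);
                // Drop deliveries that have settled by the time this message arrives.
                while (!q.isEmpty() && q.peekFirst() <= now) {
                    q.pollFirst();
                }
                // Strictly fewer in flight wins; ties keep the earlier consumer.
                if (q.size() < inFlight.get(best).size()) {
                    best = c;
                }
            }
            inFlight.get(best).addLast(now + settleMs);
            delivered[best]++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Sleep longer than the settle time: nothing is in flight when the
        // next message arrives, so the tie always goes to the first consumer.
        System.out.println(Arrays.toString(simulate(100, 2, 1000, 5))); // [100, 0]
        // No sleep: deliveries pile up in flight and alternate between consumers.
        System.out.println(Arrays.toString(simulate(100, 2, 0, 5)));    // [50, 50]
    }
}
```

If that model is roughly right, the sleep matters because each delivery has already settled by the time the next message arrives, so the router never sees the first consumer as busier than the second.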

On Mon, Jun 27, 2016 at 8:37 PM, Robbie Gemmell <[email protected]> wrote:

> They should be getting sent asynchronously, since the client actually
> sends non-persistent messages async by default, plus the force option
> is there.
>
> There is a Thread.sleep in there for the producer, tried it without
> the sleep? How long is it sleeping?
>
> The 0.9.0 version of the client was mentioned, but which version of
> Dispatch is being used? Are you starting both the consumers first,
> then the producers? I believe the latest version of Dispatch only
> grants credit to senders once a receiver is available for an address,
> so the order of startup could also have some bearing on behaviour.
>
> Robbie
>
> On 26 June 2016 at 14:24, Ganesh Murthy <[email protected]> wrote:
> > Hi Noel,
> > Looking at your java producer code, I suspect that the messages are
> > sent synchronously (even though jms.forceAsyncSend=true). Every message
> > sent is synchronous, meaning that the sender expects an acknowledgment
> > before sending the next message. This means that only one message is in
> > flight and so the router is seeing only one message and directing it to
> > the same consumer. If you send out a burst of asynchronous messages, you
> > will see them balanced across consumers.
> >
> > To check if your messages are sent synchronously to the router, start
> > the router using PN_TRACE_FRM=1
> >
> > (like this - PN_TRACE_FRM=1 qdrouterd -c path/to/your/conf_file.conf)
> > and start 1 consumer and 1 sender and send say 10 messages.
> >
> > 1. If the sender is sending messages asynchronously, you will see a bunch
> > of inbound transfers and then *finally* a disposition from the router to
> > the sender like below
> >
> > [0x7f87440046b0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
> > [0x7f87440046b0]:0 -> @attach(18) [name="9df61d77-7e7c-4036-a501-bf2db01e143f-examples", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [durable=0, timeout=0, dynamic=false], target=@target(41) [address="examples", durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
> > [0x7f87440046b0]:0 -> @flow(19) [next-incoming-id=0, incoming-window=61, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=250, drain=false]
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"1", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"2", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"3", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"4", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"\x02\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"\x03\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
> > [0x7f8748003b20]:0 <- @disposition(21) [role=true, first=0, last=3, settled=true, state=@accepted(36) []]
> > [0x7f8748003b20]:0 <- @detach(22) [handle=0, closed=true]
> >
> > 2. If your sender is sending synchronously, you will see one transfer
> > followed by one disposition (unlike what you see above, which is a burst
> > of transfers followed by a disposition)
> >
> > There is a JIRA - https://issues.apache.org/jira/browse/DISPATCH-367
> > which will be available in 0.6.1
> >
> > This JIRA will also balance messages sent synchronously across producers
> > which will help your case.
> >
> > Thanks.
> >
> > ----- Original Message -----
> >> From: "Noel OConnor" <[email protected]>
> >> To: "users" <[email protected]>
> >> Sent: Sunday, June 26, 2016 3:35:57 AM
> >> Subject: Dispatch router load balancing with java clients
> >>
> >> Hi,
> >> I have four java clients, 2 producers and 2 consumers, connecting to a
> >> single instance dispatch router and all accessing a single destination
> >> (targetqueue). Producers are sending non-persistent messages.
> >> I've configured the clients using the following options -
> >> connectionfactory.myJmsFactory =
> >> amqp://localhost:5672?jms.forceAsyncSend=true&jms.forceAsyncAcks=true
> >>
> >> I'm sending 100 messages from each producer and from reading other
> >> mailing list threads I would have expected them to be load balanced
> >> across the consumers given non-persistence and forceAsyncSend options,
> >> but they're not and all 200 messages are consumed by the first consumer
> >> that is started.
> >>
> >> Is there anything obvious that I'm missing or are my expectations
> >> incorrect?
> >>
> >> thanks
> >> Noel
> >>
> >> [root@776339e34cd3 build]# qdstat -la
> >> Router Addresses
> >>   class   addr                   phs  distrib   in-proc  local  remote  cntnr  in   out  thru  to-proc  from-proc
> >>   =================================================================================================================
> >>   local   $_management_internal       closest   1        0      0       0      0    0    0     6        6
> >>   local   $displayname                closest   1        0      0       0      0    0    0     0        0
> >>   mobile  $management            0    closest   1        0      0       0      424  0    0     424      0
> >>   local   $management                 closest   1        0      0       0      0    0    0     0        0
> >> * mobile  targetqueue            0    balanced  0        2      0       0      200  200  0     0        0 *
> >>   local   temp.YjGynchnxvRQ49q        closest   0        1      0       0      0    0    0     0        0
> >>
> >> <groupId>org.apache.qpid</groupId>
> >> <artifactId>qpid-jms-client</artifactId>
> >> <version>0.9.0</version>
> >>
> >> producer snippet
> >> ----------------------
> >>     connection = factory.createConnection();
> >>     connection.start();
> >>     Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
> >>     MessageProducer producer = session.createProducer(destination);
> >>     for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
> >>         TextMessage message = session.createTextMessage(i + ". message sent");
> >>         LOG.info("Sending to destination: " + destination.toString() + " this text: '" + message.getText());
> >>         producer.send(message,DeliveryMode.NON_PERSISTENT,4,2000);
> >>         Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
> >>     }
> >>
> >> consumer snippet
> >> -----------------------
> >>     connection.start();
> >>     Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
> >>     MessageConsumer consumer = session.createConsumer(destination);
> >>     LOG.info("Start consuming messages from " + destination.toString() + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");
> >>
> >>     // Synchronous message consumer
> >>     int i = 1;
> >>     while (true) {
> >>         Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
> >>         if (message != null) {
> >>             if (message instanceof TextMessage) {
> >>                 String text = ((TextMessage) message).getText();
> >>                 LOG.info("Got " + (i++) + ". message: " + text);
> >>             }
> >>         } else {
> >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: [email protected]
> > For additional commands, e-mail: [email protected]
