Great, that's what I was expecting to see happen. As described in
https://issues.apache.org/jira/browse/DISPATCH-367, if you were sending
synchronously, then by the time each subsequent message reaches the
router all the consumers have the same ability to receive it, and the
router just sends it to the first one again, repeating this over and
over. The effect of sleeping just after the send is that, even though
the send was asynchronous, if the prior delivery has time to be
completely processed before the next message arrives at the router, it
gives exactly the same effect as the synchronous send did.
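To make that tie-break effect concrete, here is a toy model in Java. It is only a sketch and not Dispatch's routing code: it assumes the router picks the consumer with the fewest unsettled deliveries and, on a tie, falls back to the consumer that attached first. The class name BalanceSketch and the simulate method are made up for illustration.

```java
import java.util.Arrays;

/** Toy model only; this is NOT how Dispatch is implemented. */
public class BalanceSketch {

    /**
     * Delivers a number of messages to a number of consumers.
     * Assumed rule: choose the consumer with the fewest unsettled
     * deliveries; on a tie choose the lowest index (first attached).
     *
     * @param settleBeforeNext true models a synchronous send, or an async
     *        send followed by a sleep: every delivery is fully settled
     *        before the next message reaches the router
     * @return how many messages each consumer received
     */
    public static int[] simulate(int consumers, int messages, boolean settleBeforeNext) {
        int[] unsettled = new int[consumers];
        int[] received = new int[consumers];
        for (int m = 0; m < messages; m++) {
            // Find the least-loaded consumer; strict '<' keeps the first one on ties.
            int chosen = 0;
            for (int c = 1; c < consumers; c++) {
                if (unsettled[c] < unsettled[chosen]) {
                    chosen = c;
                }
            }
            received[chosen]++;
            unsettled[chosen]++;
            if (settleBeforeNext) {
                // The delivery completes before the next message arrives.
                unsettled[chosen]--;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("one in flight: " + Arrays.toString(simulate(2, 10, true)));
        System.out.println("burst:         " + Arrays.toString(simulate(2, 10, false)));
    }
}
```

Run as is, the one-in-flight case prints [10, 0] (everything goes to the first consumer) and the burst case prints [5, 5].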

If you had lots of such producers running at the same time, then you'd
see some balancing under the existing behaviour, but with just 1 or 2
it's far less likely to happen unless the consumers also take time
consuming the messages.

Robbie

On 27 June 2016 at 13:59, Noel OConnor <[email protected]> wrote:
> I removed the thread sleep from the producer and it now seems to be working
> as expected, in that messages are load balanced across the consumers.
> Why would that have an impact on the way the router dispatches messages to
> the consumers?
>
> On Mon, Jun 27, 2016 at 8:37 PM, Robbie Gemmell <[email protected]>
> wrote:
>
>> They should be getting sent asynchronously, since the client actually
>> sends non-persistent messages async by default, plus the force option
>> is there.
>>
>> There is a Thread.sleep in there for the producer, tried it without
>> the sleep? How long is it sleeping?
>>
>> The 0.9.0 version of the client was mentioned, but which version of
>> Dispatch is being used? Are you starting both the consumers first,
>> then the producers? I believe the latest version of Dispatch only
>> grants credit to senders once a receiver is available for an address,
>> so the order of startup could also have some bearing on behaviour.
>>
>> Robbie
>>
>> On 26 June 2016 at 14:24, Ganesh Murthy <[email protected]> wrote:
>> > Hi Noel,
>> > Looking at your java producer code, I suspect that the messages are
>> > sent synchronously (even though jms.forceAsyncSend=true). Every message
>> > sent is synchronous, meaning that the sender expects an acknowledgment
>> > before sending the next message. This means that only one message is in
>> > flight and so the router is seeing only one message and directing it to
>> > the same consumer. If you send out a burst of asynchronous messages, you
>> > will see them balanced across consumers.
>> >
>> > To check if your messages are sent synchronously to the router, start
>> > the router using PN_TRACE_FRM=1
>> >
>> > (like this - PN_TRACE_FRM=1 qdrouterd -c path/to/your/conf_file.conf)
>> > and start 1 consumer and 1 sender and send say 10 messages.
>> >
>> > 1. If the sender is sending messages asynchronously, you should see a
>> > bunch of inbound transfers and then *finally* a disposition from the
>> > router to the sender, like below
>> >
>> > [0x7f87440046b0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=61, outgoing-window=2147483647]
>> > [0x7f87440046b0]:0 -> @attach(18) [name="9df61d77-7e7c-4036-a501-bf2db01e143f-examples", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [durable=0, timeout=0, dynamic=false], target=@target(41) [address="examples", durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
>> > [0x7f87440046b0]:0 -> @flow(19) [next-incoming-id=0, incoming-window=61, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=250, drain=false]
>> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"1", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
>> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"2", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
>> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"3", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
>> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"4", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
>> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
>> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
>> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"\x02\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
>> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"\x03\x00\x00\x00\x00\x00\x00\x00", message-format=0, settled=false, more=false] (86) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@@@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
>> > [0x7f8748003b20]:0 <- @disposition(21) [role=true, first=0, last=3, settled=true, state=@accepted(36) []]
>> > [0x7f8748003b20]:0 <- @detach(22) [handle=0, closed=true]
>> >
>> > 2. If your sender is sending synchronously, you will see one transfer
>> > followed by one disposition (unlike what you see above, which is a burst
>> > of transfers followed by a disposition)
>> >
>> > There is a JIRA - https://issues.apache.org/jira/browse/DISPATCH-367
>> > which will be available in 0.6.1
>> >
>> > This JIRA will also balance messages sent synchronously across producers
>> > which will help your case.
>> >
>> > Thanks.
>> >
>> > ----- Original Message -----
>> >> From: "Noel OConnor" <[email protected]>
>> >> To: "users" <[email protected]>
>> >> Sent: Sunday, June 26, 2016 3:35:57 AM
>> >> Subject: Dispatch router load balancing with java clients
>> >>
>> >> Hi,
>> >> I have four java clients, 2 producers and 2 consumers, connecting to a
>> >> single instance dispatch router and all accessing a single destination
>> >> (targetqueue). Producers are sending non-persistent messages.
>> >> I've configured the clients using the following options -
>> >> connectionfactory.myJmsFactory =
>> >> amqp://localhost:5672?jms.forceAsyncSend=true&jms.forceAsyncAcks=true
>> >>
>> >> I'm sending 100 messages from each producer and from reading other
>> >> mailing list threads I would have expected them to be load balanced
>> >> across the consumers given non-persistence and forceasyncsend options
>> >> but they're not and all 200 messages are consumed by the first consumer
>> >> that is started.
>> >>
>> >> Is there anything obvious that I'm missing or are my expectations
>> >> incorrect?
>> >>
>> >> thanks
>> >> Noel
>> >>
>> >> [root@776339e34cd3 build]# qdstat -la
>> >> Router Addresses
>> >>   class   addr                   phs  distrib   in-proc  local  remote  cntnr  in   out  thru  to-proc  from-proc
>> >>   ===============================================================================================================
>> >>   local   $_management_internal       closest   1        0      0       0      0    0    0     6        6
>> >>   local   $displayname                closest   1        0      0       0      0    0    0     0        0
>> >>   mobile  $management            0    closest   1        0      0       0      424  0    0     424      0
>> >>   local   $management                 closest   1        0      0       0      0    0    0     0        0
>> >> * mobile  targetqueue            0    balanced  0        2      0       0      200  200  0     0        0*
>> >>   local   temp.YjGynchnxvRQ49q        closest   0        1      0       0      0    0    0     0        0
>> >>
>> >> <groupId>org.apache.qpid</groupId>
>> >> <artifactId>qpid-jms-client</artifactId>
>> >> <version>0.9.0</version>
>> >>
>> >> producer snippet
>> >> ----------------------
>> >> connection = factory.createConnection();
>> >> connection.start();
>> >> Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
>> >> MessageProducer producer = session.createProducer(destination);
>> >> for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
>> >>     TextMessage message = session.createTextMessage(i + ". message sent");
>> >>     LOG.info("Sending to destination: " + destination.toString() + " this text: '" + message.getText());
>> >>     producer.send(message,DeliveryMode.NON_PERSISTENT,4,2000);
>> >>     Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
>> >> }
>> >>
>> >> consumer snippet
>> >> -----------------------
>> >> connection.start();
>> >> Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
>> >> MessageConsumer consumer = session.createConsumer(destination);
>> >> LOG.info("Start consuming messages from " + destination.toString() + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");
>> >>
>> >> // Synchronous message consumer
>> >> int i = 1;
>> >> while (true) {
>> >>     Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
>> >>     if (message != null) {
>> >>         if (message instanceof TextMessage) {
>> >>             String text = ((TextMessage) message).getText();
>> >>             LOG.info("Got " + (i++) + ". message: " + text);
>> >>         }
>> >>     } else {
>> >>

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
