I removed the Thread.sleep from the producer and it now seems to be working
as expected, i.e. messages are load balanced across the consumers.
Why would that have an impact on the way the router dispatches messages to
the consumers?
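
One way to picture the effect, following Ganesh's explanation quoted below
(only one message in flight means the router keeps choosing the same
consumer), is a toy model. This is only a sketch and not Dispatch's routing
code: the "fewest unsettled deliveries" rule, the tie-break towards the
first consumer, and the names pickConsumer, simulate and window are all
assumptions made for illustration.

```java
import java.util.Arrays;

public class BalancedDispatchSketch {

    /**
     * Toy rule (an assumption, not Dispatch's actual algorithm): pick the
     * consumer with the fewest unsettled deliveries; ties go to the lowest
     * index, i.e. the consumer that attached first.
     */
    static int pickConsumer(int[] unsettled) {
        int best = 0;
        for (int i = 1; i < unsettled.length; i++) {
            if (unsettled[i] < unsettled[best]) {
                best = i;
            }
        }
        return best;
    }

    /**
     * Routes `messages` deliveries with at most `window` in flight.
     * When the window fills up, every outstanding delivery is settled
     * before the next one is routed. Returns deliveries per consumer.
     */
    static int[] simulate(int messages, int consumers, int window) {
        int[] unsettled = new int[consumers];
        int[] received = new int[consumers];
        int inFlight = 0;
        for (int m = 0; m < messages; m++) {
            int c = pickConsumer(unsettled);
            unsettled[c]++;
            received[c]++;
            inFlight++;
            if (inFlight == window) {
                // Everything outstanding is acknowledged at once.
                Arrays.fill(unsettled, 0);
                inFlight = 0;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // window = 1: paced sends, each settled before the next one.
        System.out.println(Arrays.toString(simulate(200, 2, 1)));
        // window = 10: bursts of ten sends before anything is settled.
        System.out.println(Arrays.toString(simulate(200, 2, 10)));
    }
}
```

In this model a window of 1 sends all 200 messages to the first consumer,
while a window of 10 splits them 100/100. If the sleep was long enough for
each message to be settled before the next send, that would match what I
was seeing.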




On Mon, Jun 27, 2016 at 8:37 PM, Robbie Gemmell <[email protected]>
wrote:

> They should be getting sent asynchronously, since the client actually
> sends non-persistent messages async by default, plus the force option
> is there.
>
> There is a Thread.sleep in there for the producer, tried it without
> the sleep? How long is it sleeping?
>
> The 0.9.0 version of the client was mentioned, but which version of
> Dispatch is being used? Are you starting both the consumers first,
> then the producers? I believe the latest version of Dispatch only
> grants credit to senders once a receiver is available for an address,
> so the order of startup could also have some bearing on behaviour.
>
> Robbie
>
> On 26 June 2016 at 14:24, Ganesh Murthy <[email protected]> wrote:
> > Hi Noel,
> >     Looking at your java producer code, I suspect that the messages are
> > sent synchronously (even though jms.forceAsyncSend=true). Every message
> > sent is synchronous meaning that the sender expects an acknowledgment
> > before sending the next message. This means that only one message is in
> > flight and so the router is seeing only one message and directing it to the
> > same consumer.  If you send out a burst of asynchronous messages, you will
> > see them balanced across consumers.
> >
> > To check if your messages are sent synchronously to the router, start
> > the router using PN_TRACE_FRM=1
> >
> > (like this - PN_TRACE_FRM=1 qdrouterd -c path/to/your/conf_file.conf)
> > and start 1 consumer and 1 sender and send, say, 10 messages.
> >
> > 1. If the sender is sending messages asynchronously, you should see a bunch
> > of inbound transfers and then *finally* a disposition from the router to
> > the sender, like below
> >
> > [0x7f87440046b0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0,
> incoming-window=61, outgoing-window=2147483647]
> > [0x7f87440046b0]:0 -> @attach(18)
> [name="9df61d77-7e7c-4036-a501-bf2db01e143f-examples", handle=0, role=true,
> snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [durable=0,
> timeout=0, dynamic=false], target=@target(41) [address="examples",
> durable=0, timeout=0, dynamic=false], initial-delivery-count=0]
> > [0x7f87440046b0]:0 -> @flow(19) [next-incoming-id=0, incoming-window=61,
> next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0,
> link-credit=250, drain=false]
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=0,
> delivery-tag=b"1", message-format=0, settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=1,
> delivery-tag=b"2", message-format=0, settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=2,
> delivery-tag=b"3", message-format=0, settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
> > [0x7f87440046b0]:0 <- @transfer(20) [handle=0, delivery-id=3,
> delivery-tag=b"4", message-format=0, settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=0,
> delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x00", message-format=0,
> settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x01@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=1,
> delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0,
> settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x02@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=2,
> delivery-tag=b"\x02\x00\x00\x00\x00\x00\x00\x00", message-format=0,
> settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x03@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
> > [0x7f8748003b20]:0 -> @transfer(20) [handle=0, delivery-id=3,
> delivery-tag=b"\x03\x00\x00\x00\x00\x00\x00\x00", message-format=0,
> settled=false, more=false] (86)
> "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x0dS\x04@
> @@@@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
> > [0x7f8748003b20]:0 <- @disposition(21) [role=true, first=0, last=3,
> settled=true, state=@accepted(36) []]
> > [0x7f8748003b20]:0 <- @detach(22) [handle=0, closed=true]
> >
> > 2. If your sender is sending synchronously, you will see one transfer
> > followed by one disposition (unlike what you see above which is a burst of
> > transfers followed by a disposition)
> >
> >
> > There is a JIRA - https://issues.apache.org/jira/browse/DISPATCH-367
> > which will be available in 0.6.1
> >
> > This JIRA will also balance messages sent synchronously across producers
> > which will help your case.
> >
> > Thanks.
> >
> > ----- Original Message -----
> >> From: "Noel OConnor" <[email protected]>
> >> To: "users" <[email protected]>
> >> Sent: Sunday, June 26, 2016 3:35:57 AM
> >> Subject: Dispatch router load balancing with java clients
> >>
> >> Hi,
> >> I have four java clients, 2 producers and 2 consumers, connecting to a
> >> single instance dispatch router and all accessing a single destination
> >> (targetqueue). Producers are sending non-persistent messages.
> >> I've configured the clients using the following options -
> >> connectionfactory.myJmsFactory =
> >> amqp://localhost:5672?jms.forceAsyncSend=true&jms.forceAsyncAcks=true
> >>
> >> I'm sending 100 messages from each producer, and from reading other mailing
> >> list threads I would have expected them to be load balanced across the
> >> consumers given the non-persistent and forceAsyncSend options, but they're
> >> not: all 200 messages are consumed by the first consumer that is started.
> >>
> >> Is there anything obvious that I'm missing, or are my expectations incorrect?
> >>
> >> thanks
> >> Noel
> >>
> >>
> >>
> >> [root@776339e34cd3 build]# qdstat -la
> >> Router Addresses
> >>   class   addr                   phs  distrib   in-proc  local  remote  cntnr  in   out  thru  to-proc  from-proc
> >>   =================================================================================================================
> >>   local   $_management_internal       closest   1        0      0       0      0    0    0     6        6
> >>   local   $displayname                closest   1        0      0       0      0    0    0     0        0
> >>   mobile  $management            0    closest   1        0      0       0      424  0    0     424      0
> >>   local   $management                 closest   1        0      0       0      0    0    0     0        0
> >> *  mobile  targetqueue            0    balanced  0        2      0       0      200  200  0     0        0*
> >>   local   temp.YjGynchnxvRQ49q        closest   0        1      0       0      0    0    0     0        0
> >>
> >> <groupId>org.apache.qpid</groupId>
> >> <artifactId>qpid-jms-client</artifactId>
> >> <version>0.9.0</version>
> >>
> >> producer snippet
> >> ----------------------
> >>             connection = factory.createConnection();
> >>             connection.start();
> >>             Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
> >>             MessageProducer producer = session.createProducer(destination);
> >>             for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
> >>                 TextMessage message = session.createTextMessage(i + ". message sent");
> >>                 LOG.info("Sending to destination: " + destination.toString() + " this text: '" + message.getText());
> >>                 producer.send(message, DeliveryMode.NON_PERSISTENT, 4, 2000);
> >>                 Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
> >>             }
> >>
> >> consumer snippet
> >> -----------------------
> >>             connection.start();
> >>             Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
> >>             MessageConsumer consumer = session.createConsumer(destination);
> >>             LOG.info("Start consuming messages from " + destination.toString() + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");
> >>
> >>             // Synchronous message consumer
> >>             int i = 1;
> >>             while (true) {
> >>                 Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
> >>                 if (message != null) {
> >>                     if (message instanceof TextMessage) {
> >>                         String text = ((TextMessage) message).getText();
> >>                         LOG.info("Got " + (i++) + ". message: " + text);
> >>                     }
> >>                 } else {
> >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: [email protected]
> > For additional commands, e-mail: [email protected]
> >
>
>
>
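
The PN_TRACE_FRM check Ganesh describes above (a burst of inbound transfers
before a disposition versus one transfer per disposition) could be done
with a small log scan rather than by eye. The sketch below is only an
illustration: the class and method names are made up, the sample lines in
main are shortened stand-ins rather than real router output, and it
assumes each frame's first log line contains "<- @transfer(" or
"@disposition(" as in the trace quoted above.

```java
import java.util.Arrays;
import java.util.List;

public class TraceBurstCheck {

    /**
     * Returns the longest run of inbound transfer frames seen without an
     * intervening disposition frame (in either direction). A result of 1
     * suggests one-at-a-time sends; a larger value suggests a burst.
     */
    static int longestInboundRun(List<String> lines) {
        int longest = 0;
        int run = 0;
        for (String line : lines) {
            if (line.contains("<- @transfer(")) {
                run++;
                longest = Math.max(longest, run);
            } else if (line.contains("@disposition(")) {
                run = 0;
            }
        }
        return longest;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened trace lines (not real router output).
        List<String> sample = Arrays.asList(
            "[conn-a]:0 <- @transfer(20) [handle=0, delivery-id=0]",
            "[conn-a]:0 <- @transfer(20) [handle=0, delivery-id=1]",
            "[conn-b]:0 -> @transfer(20) [handle=0, delivery-id=0]",
            "[conn-b]:0 <- @disposition(21) [role=true, first=0, last=1]");
        System.out.println("longest inbound run: " + longestInboundRun(sample));
    }
}
```

Outbound transfers ("-> @transfer") are deliberately ignored, since only
the sender-to-router direction says anything about how the producer is
sending.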
