On 11/12/2013 04:56 PM, Flavio Baronti wrote:
I'm using the C++ broker on Linux, and the
Java client on Windows.
The sender is sending through producers obtained with
     MessageProducer topicProducer =
session.createProducer(session.createTopic(topicName + "; {node: { type:
topic } }"));
while the receiver is receiving with a listener set on consumers
obtained with
     MessageConsumer consumer =
session.createConsumer(session.createTopic(topicName));
I don't know what to conclude from the CPU usage. With few sessions it's
higher on the broker and lower on the client.
With more sessions it's the other way round: higher on the
client and lower on the broker.
In both cases we're very far from 100%.

I created a bug report. I'm not too familiar with JIRA, so I hope I did
it correctly.
I added the two test programs I'm using. If there is anything else I can
help with, please let me know.

Sorry for the delayed response.

The C++ broker does delivery by setting interest in the writability of the connection whenever a message is available. A worker thread then processes that connection, which involves attempting to pull the message off the queue. This gets increasingly expensive as the number of queues you are consuming from on that connection goes up.

To avoid the problem, I would suggest having all those distinct subscriptions on a session use the same underlying subscription queue, and just adding a binding for each of the different keys.

I've attached a patch to your JMSReceiver example that does this. (Unfortunately, due to QPID-5345, the address is somewhat less intuitive than would be ideal.)

diff -up ./JMSReceiver.java.orig ./JMSReceiver.java
--- ./JMSReceiver.java.orig	2013-11-15 13:54:10.872030983 +0000
+++ ./JMSReceiver.java	2013-11-15 14:37:51.740173048 +0000
@@ -84,7 +84,9 @@ public class JMSReceiver {
             Session session = sessions[topicIdx % sessions.length];
             String topicName = (topicIdx / TOPIC_LEVEL1_SIZE) + "." + (topicIdx % TOPIC_LEVEL1_SIZE);
             //consumers[topicIdx] = session.createConsumer(session.createTopic("JMSRecv#" + UUID.randomUUID() + "; {create: always, node: {type: queue, durable:false, x-bindings:[{exchange: '" + EXCHANGE + "', key:'" + topicName + "'}], x-declare:{ arguments: {'qpid.max_count': 8192, 'qpid.max_size': 8388608, 'qpid.policy_type': reject }}}}"));
-            consumers[topicIdx] = sessions[topicIdx % sessions.length].createConsumer(session.createTopic(EXCHANGE + "/" + topicName));
+            //String address = EXCHANGE + "/" + topicName + ";{link:{name:session-" + (topicIdx % sessions.length) + ",x-subscribe:{exclusive:False}}}";
+            String address = "session-" + (topicIdx % sessions.length) + "; {create:always, node:{auto-delete:True,exclusive:True}, link:{x-bindings:[{exchange:'" + EXCHANGE + "',key:'" + topicName + "'}]}}";
+            consumers[topicIdx] = sessions[topicIdx % sessions.length].createConsumer(session.createQueue(address));
             consumers[topicIdx].setMessageListener(receiverListener);
             if (++totCreated % Math.max(NUM_TOPICS / 50, 1) == 0) {
                 System.out.printf("%s startReceive created %d (tot %d)%n", new DateTime(), topicIdx, totCreated);
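
To make the address in the patch easier to read, here is a minimal sketch that builds the same string in one place. The class name SharedQueueAddress and its methods are hypothetical helpers, not part of the attached test programs, and "amq.topic" is only an assumed value for EXCHANGE. Every topic on a given session maps to the same queue name ("session-<n>"); only the binding key in the link options changes.

```java
// Hypothetical helper (not in the original JMSReceiver): builds the
// address string used in the patch above.
public final class SharedQueueAddress {
    private SharedQueueAddress() {}

    /** Name of the one queue shared by all subscriptions on a session. */
    public static String queueName(int sessionIdx) {
        return "session-" + sessionIdx;
    }

    /**
     * Address for one topic: the node is the shared per-session queue,
     * and the link adds a binding from the exchange for this topic's key.
     */
    public static String forTopic(String exchange, int sessionIdx, String topicName) {
        return queueName(sessionIdx)
                + "; {create:always, node:{auto-delete:True,exclusive:True}, "
                + "link:{x-bindings:[{exchange:'" + exchange + "',key:'" + topicName + "'}]}}";
    }

    public static void main(String[] args) {
        // "amq.topic" is an assumed exchange name, used for illustration only.
        // Both addresses name the same queue; only the binding key differs.
        System.out.println(forTopic("amq.topic", 0, "3.7"));
        System.out.println(forTopic("amq.topic", 0, "3.8"));
    }
}
```

In the receiver this would be used as session.createQueue(SharedQueueAddress.forTopic(EXCHANGE, topicIdx % sessions.length, topicName)), so each session ends up with one queue and many bindings rather than one queue per topic.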