Setting TCP no-delay makes a massive difference for transactions. Try with
it set both on the broker and on the client connection.
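
For context, TCP no-delay disables Nagle's algorithm, so a small write such as a commit frame goes out immediately instead of waiting to be coalesced with later data. At the plain socket level in Java this is the standard `Socket.setTcpNoDelay` call, sketched below on a loopback connection. This only illustrates the option itself: how the Qpid broker and client expose it depends on the version, so check the documentation for the release you run. `NoDelayDemo` is a made-up name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class NoDelayDemo {

    // Opens a loopback connection, applies the requested TCP_NODELAY setting,
    // and returns the value the socket reports back.
    static boolean connectWithNoDelay(boolean noDelay) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setTcpNoDelay(noDelay);
            return client.getTcpNoDelay();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TCP_NODELAY enabled: " + connectWithNoDelay(true));
    }
}
```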


On 10/06/2010 02:49 AM, [email protected] wrote:
Andrew,

Thank you very much for your response.
I have performed several tests with Java clients and the C++ broker, and I'm
shocked.
I use Qpid version 0.6, 100 Mbit Ethernet, 100-byte messages, 1
persistent queue, 1 subscriber.
In all tests I use 1 persistent queue, and the results are the following:
1. Transactions in client are not used, messages are non-persistent - I got
up to 60000 msg/sec
2. Transactions in client are not used, messages ARE persistent - I got up
to 20000 msg/sec
3. Transactions in client ARE used, messages ARE persistent - I got up to
90 msg/sec
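
To put case 3 in perspective: with one commit per message, the consumer cannot go faster than one message per commit round trip, so the observed rate translates directly into time spent per commit. A quick back-of-the-envelope calculation from the figures above (the class and method names are only for illustration):

```java
final class CommitLatency {

    // Average time per message, in milliseconds, for a given message rate.
    static double millisPerMessage(double messagesPerSecond) {
        return 1000.0 / messagesPerSecond;
    }

    public static void main(String[] args) {
        System.out.printf("transacted, persistent:     %.2f ms per commit%n",
                millisPerMessage(90));
        System.out.printf("non-transacted, persistent: %.3f ms per message%n",
                millisPerMessage(20000));
    }
}
```

That is roughly 11 ms per committed message against 0.05 ms per message without transactions.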

I use the following code:
1. Transactions in client are not used
public class JmsThreadReceiver {

     private static final int INTERVAL = 5;
     private static final int BIG_INTERVAL = 15;
     private static final int MILLS_IN_SECOND = 1000;
     private static final String JNDI_QPID_CONNECTION_FACTORY =
"QpidConnectionFactory";
     private Logger log = Logger.getLogger(this.getClass().getName());
     private static final String QPID_JNDI_PROPERTIES =
"qpid.jndi.properties";
     private Listener listener = new Listener();
     private long msgCount = 0;
     private long bigMsgCount = 0;

     public class Listener implements MessageListener {
         @Override
         public void onMessage(Message msg) {
             msgCount++;
             bigMsgCount++;
         }
     }
     public void startListener(final String queueJndiName)
         throws IOException, NamingException, JMSException,
InterruptedException {

         long startTime = System.currentTimeMillis();
         long startBigTime = System.currentTimeMillis();
         // Load JNDI properties
         Properties properties = new Properties();
         properties.load(this
.getClass().getClassLoader().getResourceAsStream(QPID_JNDI_PROPERTIES));
         // Create the initial context
         Context ctx = new InitialContext(properties);
         // Lookup the connection factory
         ConnectionFactory conFac = (ConnectionFactory) ctx.lookup(
JNDI_QPID_CONNECTION_FACTORY);
         // look up destination
         Destination destination = (Destination) ctx.lookup(queueJndiName);
         // create the connection
         Connection connection = null;
         try {
             connection = conFac.createConnection();
             // As this application is using a MessageConsumer we need to set
             // an ExceptionListener on the connection so that errors raised
             // within the JMS client library can be reported to the application
             log.info(": Setting an ExceptionListener on the connection as "
                     + "sample uses a MessageConsumer");
             connection.setExceptionListener(new ExceptionListener() {
                 public void onException(JMSException e) {
                     // The connection may have broken; invoke reconnect code
                     // if available.
                     log.log(Level.SEVERE, "The sample received for ["
                             + queueJndiName + "] an exception through the "
                             + "ExceptionListener", e);
                     System.exit(0);
                 }
             });
             // Create a session on the connection
             // This session is a default choice of non-transacted and uses
             // the auto acknowledge feature of a session.
             log.info(": Creating a non-transacted, auto-acknowledged session");
             Session session = connection.createSession(false,
                     Session.AUTO_ACKNOWLEDGE);
             // Create a MessageConsumer
             log.info(": Creating a MessageConsumer");
             MessageConsumer messageConsumer = session.createConsumer(destination);
             // Set a message listener on the messageConsumer
             messageConsumer.setMessageListener(listener);
             // Now the messageConsumer is set up we can start the connection
             log.info("Starting connection so MessageConsumer can receive messages");
             connection.start();
             while (true) {
                 Thread.sleep(INTERVAL * MILLS_IN_SECOND);
                 long currTime = System.currentTimeMillis();
                 double throughput = (double) msgCount / ((currTime -
startTime) / MILLS_IN_SECOND);
                 log.info("JMS [" + queueJndiName + "] msgCount=[" +
msgCount + "], throughput=[" + throughput + "]");
                 msgCount = 0;
                 startTime = currTime;

                 currTime = System.currentTimeMillis();
                 long bigInterval = currTime - startBigTime;
                 if (bigInterval >= BIG_INTERVAL * MILLS_IN_SECOND) {
                     throughput = (double) bigMsgCount / (bigInterval / MILLS_IN_SECOND);
                     log.info("BIG### [" + queueJndiName + "] bigMsgCount=["
                             + bigMsgCount + "], throughput=[" + throughput + "]");
                     bigMsgCount = 0;
                     startBigTime = currTime;
                 }
             }
         } finally {
             if (connection != null) {
                 try {
                     connection.close();
                 } catch (JMSException e) {
                     log.log(Level.SEVERE, e.getMessage(), e);
                 }
             }
         }
     }
}
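
One caveat on the measurement itself: in the loop above, `(currTime - startTime) / MILLS_IN_SECOND` is integer division, so the elapsed time is truncated to whole seconds before the message count is divided by it, and `msgCount` is incremented by the listener thread while the main thread reads and resets it. A possible sketch of a counter that avoids both, using only the JDK (`ThroughputMeter` is a hypothetical helper, not part of Qpid or JMS):

```java
import java.util.concurrent.atomic.AtomicLong;

final class ThroughputMeter {
    private final AtomicLong count = new AtomicLong();

    // Called from the listener thread for every message received.
    void record() {
        count.incrementAndGet();
    }

    // Atomically reads and resets the counter, then converts it to messages
    // per second using floating-point division on the elapsed milliseconds.
    double drainRate(long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return count.getAndSet(0) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter();
        for (int i = 0; i < 300_000; i++) {
            meter.record();
        }
        // 300000 messages over 5.9 s; integer division would have used 5 s.
        System.out.println(meter.drainRate(5_900));
    }
}
```

In the receiver, `onMessage` would call `record()` and the reporting loop would call `drainRate(currTime - startTime)`.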

2. Transactions in client ARE used
public class JmsTxThreadReceiver {
     private static final int INTERVAL = 5;
     private static final int BIG_INTERVAL = 15;
     private static final int MILLS_IN_SECOND = 1000;
     private static final String JNDI_QPID_CONNECTION_FACTORY =
"QpidConnectionFactory";
     private Logger log = Logger.getLogger(this.getClass().getName());
     private static final String QPID_JNDI_PROPERTIES =
"qpid.jndi.properties";
     private Listener listener = new Listener();
     private long msgCount = 0;
     private long bigMsgCount = 0;
     long startTime = System.currentTimeMillis();
     long startBigTime = System.currentTimeMillis();
     private final class Reader implements Runnable {
         private final String queueJndiName;
         // volatile: written by the receiving thread, read by the reporting thread
         private volatile boolean isStop = false;
         private Reader(String queueJndiName) {
             this.queueJndiName = queueJndiName;
         }
         public void setStop(boolean isStop) {
             this.isStop = isStop;
         }
         @Override
         public void run() {
             while (!isStop) {
                 try {
                     Thread.sleep(INTERVAL * MILLS_IN_SECOND);
                 } catch (InterruptedException e) {
                     log.log(Level.SEVERE, e.getMessage(), e);
                 }
                 long currTime = System.currentTimeMillis();
                 double throughput = (double) msgCount / ((currTime -
startTime) / MILLS_IN_SECOND);
                 log.info("JMS [" + queueJndiName + "] msgCount=[" +
msgCount + "], throughput=[" + throughput + "]");
                 msgCount = 0;
                 startTime = currTime;

                 currTime = System.currentTimeMillis();
                 long bigInterval = currTime - startBigTime;
                 if (bigInterval >= BIG_INTERVAL * MILLS_IN_SECOND) {
                     throughput = (double) bigMsgCount / (bigInterval / MILLS_IN_SECOND);
                     log.info("BIG### [" + queueJndiName + "] bigMsgCount=["
                             + bigMsgCount + "], throughput=[" + throughput + "]");
                     bigMsgCount = 0;
                     startBigTime = currTime;
                 }
             }
         }
     }
     public void startListener(final String queueJndiName) throws
IOException, NamingException, JMSException,
             InterruptedException {
         // Load JNDI properties
         Properties properties = new Properties();
         properties.load(this
.getClass().getClassLoader().getResourceAsStream(QPID_JNDI_PROPERTIES));
         // Create the initial context
         Context ctx = new InitialContext(properties);
         // Lookup the connection factory
         ConnectionFactory conFac = (ConnectionFactory) ctx.lookup(
JNDI_QPID_CONNECTION_FACTORY);
         // look up destination
         Destination destination = (Destination) ctx.lookup(queueJndiName);
         // create the connection
         Connection connection = null;
         try {
             connection = conFac.createConnection();
             // As this application is using a MessageConsumer we need to set
             // an ExceptionListener on the connection so that errors raised
             // within the JMS client library can be reported to the application
             log.info(": Setting an ExceptionListener on the connection as "
                     + "sample uses a MessageConsumer");
             connection.setExceptionListener(new ExceptionListener() {
                 public void onException(JMSException e) {
                     // The connection may have broken; invoke reconnect code
                     // if available.
                     log.log(Level.SEVERE, "The sample received for ["
                             + queueJndiName + "] an exception through the "
                             + "ExceptionListener", e);
                     System.exit(0);
                 }
             });
             // Create a session on the connection
             // This session is transacted, so received messages are
             // acknowledged when the session is committed.
             log.info(": Creating a transacted session");
             Session session = connection.createSession(true,
                     Session.SESSION_TRANSACTED);
             // Create a MessageConsumer
             log.info(": Creating a MessageConsumer");
             MessageConsumer messageConsumer =
session.createConsumer(destination);
             // Set a message listener on the messageConsumer
             // messageConsumer.setMessageListener(listener);
             // Now the messageConsumer is set up we can start the connection
             log.info("Starting connection so MessageConsumer can receive messages");
             connection.start();
             Reader reader = new Reader(queueJndiName);
             Thread t = new Thread(reader);
             t.start();
             // todo:
             Message msg = null;
             while ((msg = messageConsumer.receive()) != null) {
                 msgCount++;
                 bigMsgCount++;
                 session.commit();
             }
             reader.setStop(true);
         } finally {
             if (connection != null) {
                 try {
                     connection.close();
                 } catch (JMSException e) {
                     log.log(Level.SEVERE, e.getMessage(), e);
                 }
             }
         }
     }
}

3. JNDI properties

java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.QpidConnectionFactory=amqp://guest:guest@/test?brokerlist='tcp://qpidserver1:5672?retries='1000'&connectdelay='5000';'
queue.boris.queue1=boris.queue1
queue.boris.queue2=boris.queue2


Regards,
Boris



From: Andrew Kennedy <[email protected]>
Date: 05/10/2010 23:44
Reply-To: [email protected]
To: [email protected]
Subject: Re: Transaction java client performance issue

On 4 Oct 2010, at 14:39, [email protected] wrote:
Folks, greetings.
Hi, Boris.

Could you please help me to clarify the following performance issue with
the transactional Java Qpid client (JMS)?
I use org.apache.qpid.jndi.PropertiesFileInitialContextFactory to get
messages from the broker.
I read messages in one thread in a Java main application.
If I use
                         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
I get 10000 messages per second.

But if I use
                         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Note that the second parameter is ignored if the first is set to
"true" - it is preferable to set it to 'Session.SESSION_TRANSACTED'
in this case, to make things clearer.

and commit the transaction after each message, I get only 60 messages per
second.

Could you please advise why the difference is so significant?
How can I improve performance for my installation?
Firstly, you are using persistent messages (the default) so they are
being written to disk, and also transactions, which is the slowest
throughput usage pattern. Secondly, you will be using the default
prefetch setting, which is either 500 or 1000 (depending on broker
version) which causes this amount of messages to be buffered by the
client.

In order to increase performance, I would suggest two things - one,
batch up your commits if possible, and two, set "maxprefetch" to this
batch size. This can be done by adding "&maxprefetch='10'" to the end
of the connection factory URL in your client properties file.
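
As a rough illustration of the batching idea (not Qpid-specific code), the sketch below counts received messages and invokes a commit action once per batch. `BatchCommitter` and its methods are hypothetical names. In the receiver, the `Runnable` would wrap `session.commit()` (handling its `JMSException`), and `onMessage()` would be called after each successful `messageConsumer.receive()`.

```java
final class BatchCommitter {
    private final int batchSize;
    private final Runnable commitAction;
    private int pending = 0;

    BatchCommitter(int batchSize, Runnable commitAction) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
        this.commitAction = commitAction;
    }

    // Call once per received message; commits when the batch is full.
    void onMessage() {
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    // Commits whatever is still pending, e.g. before shutting down.
    void flush() {
        if (pending > 0) {
            commitAction.run();
            pending = 0;
        }
    }

    public static void main(String[] args) {
        int[] commits = {0};
        BatchCommitter committer = new BatchCommitter(10, () -> commits[0]++);
        for (int i = 0; i < 25; i++) {
            committer.onMessage();
        }
        committer.flush(); // commit the final partial batch
        System.out.println("commits issued: " + commits[0]);
    }
}
```

With a batch size of 10, the matching client setting would be the existing connection factory URL with `&maxprefetch='10'` appended. Keep in mind that messages received but not yet committed are redelivered if the client fails mid-batch, so the application has to tolerate that.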

If you can do without persistence and transactions, then transient,
non-acknowledged messages are fastest of all, but this depends on your
application and use cases.

I use default journal settings on the broker. I use the C++ broker; I
built it on RedHat 5.5.
I didn't notice this, sorry. The preceding is most relevant for the
Java broker, unfortunately, and I don't know how it translates to the
C++ version.

Andrew.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]
