Andrew,

Thank you very much for your response.
I have run several tests with the Java client and the C++ broker, and I'm
shocked by the results.
I use Qpid version 0.6, 100 Mbit Ethernet, 100-byte messages, 1
persistent queue and 1 subscriber.
All tests use 1 persistent queue, and the results are the following:
1. Transactions not used in the client, messages non-persistent - I got
up to 60000 msg/sec
2. Transactions not used in the client, messages persistent - I got up
to 20000 msg/sec
3. Transactions used in the client, messages persistent - I got up to
90 msg/sec
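
As a rough check on these figures, the average time per message is 1000 ms divided by the rate. A minimal sketch of that arithmetic follows; the class and method names are illustrative and not part of the test code.

```java
// Illustrative helper (hypothetical name): converts the reported rates
// into an average time per message.
class PerMessageCost {
    // Average time per message in milliseconds for a rate in msg/sec.
    static double millisPerMessage(double msgPerSec) {
        return 1000.0 / msgPerSec;
    }

    public static void main(String[] args) {
        // The "up to" rates reported for the three test cases.
        String[] labels = {
            "non-transacted, non-persistent",
            "non-transacted, persistent",
            "transacted, persistent"
        };
        double[] rates = {60000, 20000, 90};
        for (int i = 0; i < rates.length; i++) {
            System.out.printf("%-32s %8.0f msg/sec = %.3f ms per message%n",
                    labels[i], rates[i], millisPerMessage(rates[i]));
        }
    }
}
```

At 90 msg/sec each message, and therefore each commit, takes about 11 ms, against 0.05 ms per persistent message without transactions. That would be consistent with every commit waiting for a synchronous write plus a broker round trip, though this is an assumption and not something measured here.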

I use the following code:
1. Transactions in client are not used
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsThreadReceiver {

    private static final int INTERVAL = 5;
    private static final int BIG_INTERVAL = 15;
    private static final int MILLS_IN_SECOND = 1000;
    private static final String JNDI_QPID_CONNECTION_FACTORY = "QpidConnectionFactory";
    private Logger log = Logger.getLogger(this.getClass().getName());
    private static final String QPID_JNDI_PROPERTIES = "qpid.jndi.properties";
    private Listener listener = new Listener();
    private long msgCount = 0;
    private long bigMsgCount = 0;
 
    public class Listener implements MessageListener {
        @Override
        public void onMessage(Message msg) {
            msgCount++;
            bigMsgCount++;
        }
    }
    public void startListener(final String queueJndiName)
            throws IOException, NamingException, JMSException, InterruptedException {

        long startTime = System.currentTimeMillis();
        long startBigTime = System.currentTimeMillis();
        // Load JNDI properties
        Properties properties = new Properties();
        properties.load(this.getClass().getClassLoader().getResourceAsStream(QPID_JNDI_PROPERTIES));
        // Create the initial context
        Context ctx = new InitialContext(properties);
        // Look up the connection factory
        ConnectionFactory conFac = (ConnectionFactory) ctx.lookup(JNDI_QPID_CONNECTION_FACTORY);
        // look up destination
        Destination destination = (Destination) ctx.lookup(queueJndiName);
        // create the connection
        Connection connection = null;
        try {
            connection = conFac.createConnection();
            // As this application uses a MessageConsumer, we need to set an
            // ExceptionListener on the connection so that errors raised within
            // the JMS client library can be reported to the application.
            log.info(": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
            connection.setExceptionListener(new ExceptionListener() {
                public void onException(JMSException e) {
                    // The connection may have broken; invoke reconnect code if available.
                    log.log(Level.SEVERE, "The sample received for ["
                            + queueJndiName + "] an exception through the ExceptionListener", e);
                    System.exit(0);
                }
            });
            // Create a session on the connection
            // This session is a default choice of non-transacted and uses
            // the auto acknowledge feature of a session.
            log.info(": Creating a non-transacted, auto-acknowledged session");
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create a MessageConsumer
            log.info(": Creating a MessageConsumer");
            MessageConsumer messageConsumer = session.createConsumer(destination);
            // Set a message listener on the messageConsumer
            messageConsumer.setMessageListener(listener);
            // Now the messageConsumer is set up we can start the connection
            log.info("Starting connection so MessageConsumer can receive messages");
            connection.start();
            while (true) {
                Thread.sleep(INTERVAL * MILLS_IN_SECOND);
                long currTime = System.currentTimeMillis();
                // Divide by the elapsed milliseconds directly so the rate is not
                // distorted by truncating the interval to whole seconds.
                double throughput = (double) msgCount * MILLS_IN_SECOND / (currTime - startTime);
                log.info("JMS [" + queueJndiName + "] msgCount=[" + msgCount + "], throughput=[" + throughput + "]");
                msgCount = 0;
                startTime = currTime;

                currTime = System.currentTimeMillis();
                long bigInterval = currTime - startBigTime;
                if (bigInterval >= BIG_INTERVAL * MILLS_IN_SECOND) {
                    throughput = (double) bigMsgCount * MILLS_IN_SECOND / bigInterval;
                    log.info("BIG### [" + queueJndiName + "] bigMsgCount=[" + bigMsgCount + "], throughput=[" + throughput + "]");
                    bigMsgCount = 0;
                    startBigTime = currTime;
                }
            }
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    log.log(Level.SEVERE, e.getMessage(), e);
                }
            }
        }
    }
}

2. Transactions in client ARE used
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JmsTxThreadReceiver {
    private static final int INTERVAL = 5;
    private static final int BIG_INTERVAL = 15;
    private static final int MILLS_IN_SECOND = 1000;
    private static final String JNDI_QPID_CONNECTION_FACTORY = "QpidConnectionFactory";
    private Logger log = Logger.getLogger(this.getClass().getName());
    private static final String QPID_JNDI_PROPERTIES = "qpid.jndi.properties";
    // No MessageListener field here: this receiver polls with MessageConsumer.receive().
    private long msgCount = 0;
    private long bigMsgCount = 0;
    long startTime = System.currentTimeMillis();
    long startBigTime = System.currentTimeMillis();
    private final class Reader implements Runnable {
        private final String queueJndiName;
        // volatile: set by the receiving thread, read by the reporting thread
        private volatile boolean isStop = false;
        private Reader(String queueJndiName) {
            this.queueJndiName = queueJndiName;
        }
        public void setStop(boolean isStop) {
            this.isStop = isStop;
        }
        @Override
        public void run() {
            while (!isStop) {
                try {
                    Thread.sleep(INTERVAL * MILLS_IN_SECOND);
                } catch (InterruptedException e) {
                    log.log(Level.SEVERE, e.getMessage(), e);
                }
                long currTime = System.currentTimeMillis();
                // Divide by the elapsed milliseconds directly so the rate is not
                // distorted by truncating the interval to whole seconds.
                double throughput = (double) msgCount * MILLS_IN_SECOND / (currTime - startTime);
                log.info("JMS [" + queueJndiName + "] msgCount=[" + msgCount + "], throughput=[" + throughput + "]");
                msgCount = 0;
                startTime = currTime;

                currTime = System.currentTimeMillis();
                long bigInterval = currTime - startBigTime;
                if (bigInterval >= BIG_INTERVAL * MILLS_IN_SECOND) {
                    throughput = (double) bigMsgCount * MILLS_IN_SECOND / bigInterval;
                    log.info("BIG### [" + queueJndiName + "] bigMsgCount=[" + bigMsgCount + "], throughput=[" + throughput + "]");
                    bigMsgCount = 0;
                    startBigTime = currTime;
                }
            }
        }
    }
    public void startListener(final String queueJndiName)
            throws IOException, NamingException, JMSException, InterruptedException {
        // Load JNDI properties
        Properties properties = new Properties();
        properties.load(this.getClass().getClassLoader().getResourceAsStream(QPID_JNDI_PROPERTIES));
        // Create the initial context
        Context ctx = new InitialContext(properties);
        // Look up the connection factory
        ConnectionFactory conFac = (ConnectionFactory) ctx.lookup(JNDI_QPID_CONNECTION_FACTORY);
        // look up destination
        Destination destination = (Destination) ctx.lookup(queueJndiName);
        // create the connection
        Connection connection = null;
        try {
            connection = conFac.createConnection();
            // As this application uses a MessageConsumer, we need to set an
            // ExceptionListener on the connection so that errors raised within
            // the JMS client library can be reported to the application.
            log.info(": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
            connection.setExceptionListener(new ExceptionListener() {
                public void onException(JMSException e) {
                    // The connection may have broken; invoke reconnect code if available.
                    log.log(Level.SEVERE, "The sample received for [" + queueJndiName
                            + "] an exception through the ExceptionListener", e);
                    System.exit(0);
                }
            });
            // Create a session on the connection
            // This session is transacted, so received messages are only
            // acknowledged when commit() is called.
            log.info(": Creating a transacted session");
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Create a MessageConsumer
            log.info(": Creating a MessageConsumer");
            MessageConsumer messageConsumer = session.createConsumer(destination);
            // Set a message listener on the messageConsumer
            // messageConsumer.setMessageListener(listener);
            // Now the messageConsumer is set up we can start the connection
            log.info("Starting connection so MessageConsumer can receive messages");
            connection.start();
            Reader reader = new Reader(queueJndiName);
            Thread t = new Thread(reader);
            t.start();
            // todo:
            Message msg = null;
            while ((msg = messageConsumer.receive()) != null) {
                msgCount++;
                bigMsgCount++;
                session.commit();
            }
            reader.setStop(true);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    log.log(Level.SEVERE, e.getMessage(), e);
                }
            }
        }
    }
}
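
Both receivers above update msgCount and bigMsgCount from one thread and read and reset them from another without any synchronization, so the reported rates can be slightly off. A minimal sketch of one way to count safely follows. ThroughputMeter is a hypothetical helper, not part of the original code or of Qpid, built on java.util.concurrent.atomic.AtomicLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: a message counter that is safe to share between
// the receiving thread and the reporting thread.
class ThroughputMeter {
    private final AtomicLong count = new AtomicLong();

    // Called by the receiving thread once per message.
    void increment() {
        count.incrementAndGet();
    }

    // Called by the reporting thread: atomically reads and resets the
    // counter, then returns messages per second for the elapsed interval.
    double drainRate(long elapsedMillis) {
        long n = count.getAndSet(0);
        return elapsedMillis > 0 ? n * 1000.0 / elapsedMillis : 0.0;
    }
}
```

In the receivers, increment() would replace msgCount++ and drainRate(currTime - startTime) would replace the manual division and reset. A second meter instance would cover the longer interval.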

3. jndi props

java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.QpidConnectionFactory=amqp://guest:guest@/test?brokerlist='tcp://qpidserver1:5672?retries='1000'&connectdelay='5000';'
queue.boris.queue1=boris.queue1
queue.boris.queue2=boris.queue2


Regards,
Boris



Andrew Kennedy <[email protected]>
05/10/2010 23:44
Please respond to: [email protected]
To: [email protected]
Subject: Re: Transaction java client performance issue

On 4 Oct 2010, at 14:39, [email protected] wrote:
> Folks, greetings.

Hi, Boris.

> Could you please help me to clarify the following performance issue with
> the transacted Java Qpid client (JMS)?
> I use org.apache.qpid.jndi.PropertiesFileInitialContextFactory to get
> messages from the broker.
> I read messages in one thread in the Java main application.
> If I use
>                         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> I get 10000 messages per second.
>
> But if I use
>                         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Note that the second parameter is ignored if the first is set to 
"true" - it is preferable to set it to 'Session.SESSION_TRANSACTED' 
in this case, to make things clearer.

> and commit the transaction after each message, I get only 60 messages per
> second.
>
> Could you please advise why the difference is so significant?
> How can I improve performance for my installation?

Firstly, you are using persistent messages (the default) so they are 
being written to disk, and also transactions, which is the slowest 
throughput usage pattern. Secondly, you will be using the default 
prefetch setting, which is either 500 or 1000 (depending on broker 
version) which causes this amount of messages to be buffered by the 
client.

In order to increase performance, I would suggest two things - one, 
batch up your commits if possible, and two, set "maxprefetch" to this 
batch size. This can be done by adding "&maxprefetch='10'" to the end 
of the connection factory URL in your client properties file.
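
The two suggestions above could be sketched roughly as follows. This is an illustration under stated assumptions, not tested against a broker. Committer is a hypothetical stand-in for the single javax.jms.Session method used (commit()), BatchCommitter and withMaxPrefetch are illustrative names, and the batch size of 10 simply mirrors the example value above.

```java
// Hypothetical stand-in for javax.jms.Session.commit(); with a real JMS
// session, session::commit can be passed where a Committer is expected.
interface Committer {
    void commit() throws Exception;
}

// Illustrative helper: commits once per batch instead of once per message.
class BatchCommitter {
    private final Committer committer;
    private final int batchSize;
    private int pending = 0;

    BatchCommitter(Committer committer, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.committer = committer;
        this.batchSize = batchSize;
    }

    // Call once per received message; commits when the batch is full.
    void onMessage() throws Exception {
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    // Commits any messages received since the last commit, for example
    // before shutdown or when no message has arrived for a while.
    void flush() throws Exception {
        if (pending > 0) {
            committer.commit();
            pending = 0;
        }
    }

    // Appends the maxprefetch option to the end of a connection URL.
    static String withMaxPrefetch(String connectionUrl, int maxPrefetch) {
        return connectionUrl + "&maxprefetch='" + maxPrefetch + "'";
    }
}
```

In a receive loop this would be used as new BatchCommitter(session::commit, 10), with onMessage() called after each receive() and flush() called before the connection is closed. Messages received since the last commit are redelivered if the client fails before committing, so the batch size also bounds how much work may be repeated.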

If you can do without persistence and transactions, then transient, 
non-acknowledged messages are fastest of all, but whether that is 
acceptable depends on your application and use cases.

> I use default journal settings on the broker. I use the C++ broker, which I
> built on RedHat 5.5.

I didn't notice this, sorry. The preceding is most relevant for the 
Java broker, unfortunately, and I don't know how it translates to the 
C++ version?

Andrew.
-- 
-- andrew d kennedy ? do not fold, bend, spindle, or mutilate ;
-- http://grkvlt.blogspot.com/ ? edinburgh : +44 7941 197 134 ;


