Hi,

   I've been trying to benchmark the BerkeleyDb against DerbyDb with the
java broker to find which DB is more perform-ant against the java broker.

I have heard from earlier discussing that berkeleydb runs faster in the
scalability tests of Qpid. However, some of my tests showed the contrary.

I had setup BDB using the "ant build release-bin -Dmodules.opt=bdbstore
-Ddownload-bdb=true" as directed in Robbie's earlier email in a similar
topic thread.

I tried running two tests in particular which are of interest to me

Test 1)
Produce 1000 messages to the broker in transacted mode such that after
every enqueue you commit the transaction.

The time taken to enqueue a message in transacted mode from the above test
is approx 5-8 ms for derbyDb and about 18-25 ms in the case of BerkeleyDb.


Test 2)
Produce 1000 messages with auto-ack mode, with a consumer already setup for
the queue.
When the 1000th message is processed, calculate it's latency by doing
Latency =  (System.currentTimeInMillis() - message.getJMSTimeStamp()).

Try to compute an *approximate* dequeue rate by doing
numberOfMessageProcessed/Latency.

In the above test, the results I got were such that,

DerbyDb - 300 - 350 messages/second
BDB - 40 - 50 messages/second


I ran the tests against trunk(12/1)

My Connection to Qpid has a max prefetch of 1 (as my use case requires
this) and has tcp_nodelay set to true.

I have attached the tests that I used for reference.

Can someone please tell me if I'm doing something wrong in the above tests
or if there is an additional configuration that I'm missing?

Or are these results valid..? If valid, it will be great if the difference
could be explained.

Hoping to hear soon.

Thank you,
-- 
-Praveen
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.lang.System.*;
import java.io.PrintStream.*;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.*;

class QpidDequeueRateTest {

    private transient Connection connection;
    transient Session session;
    private transient MessageProducer emptyProducer;
    // The URL Used to connect tot he broker.
    private static String connUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672?tcp_nodelay='true''&max_prefetch='1'";

    final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    final String CONNECTION_JNDI_NAME = "local";

    private InitialContext _ctx;
    Map<String, Destination> queueNameToDestination = new HashMap<String, Destination>();
    Map<String, Destination> topicNameToDestination = new HashMap<String, Destination>();

    // the options used when creating a new queue
    private static String options = ";{create: always , node : {type : queue, durable : true}}";


    public static void main(String[] args) {
        int expectedPerQueue = 1000;
        int queueCount = 1;
        String baseQueueName = "Testing-";
        Map<String, CountDownLatch> queuesToLatchMap = new HashMap<String, CountDownLatch>();
        List<QpidMqHandler> handlers = null;
        QpidDequeueRateTest test = null;
        try {
            test = new QpidDequeueRateTest();
            // open connection to the broker
            test.open();

            for (int i=0; i<queueCount; i++) {
                queuesToLatchMap.put(baseQueueName + i, new CountDownLatch(expectedPerQueue));
            }


            // create The queues in the broker
            for (String queueName : queuesToLatchMap.keySet()) {
                test.createQueue(queueName);
            }

            // register the handlers for the queue.
            handlers = new ArrayList<QpidMqHandler>();
            for (Map.Entry<String, CountDownLatch> entry : queuesToLatchMap.entrySet()) {
                QpidDequeueRateTest.QpidMqHandler handler = new QpidMqHandler(entry.getValue());
                handlers.add(handler);
                test.listen(entry.getKey(), handler);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        // produce messages
        for (int i=0; i<expectedPerQueue; i++) {
            for (String queueName : queuesToLatchMap.keySet()) {
                try {
                    // enqueue 1 message and commit that message
                    test.enqueue(queueName, "ABCDEF");
                } catch(Throwable e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
        
        try {
            test.waitOnHandlers(handlers);
            test.close();
        } catch (Exception e) {

        } 
        double lastMsgDelaySecs = (double)QpidMqHandler.lastMessageDelay/1000;
        double dequeueRate =(double)expectedPerQueue/lastMsgDelaySecs;
        System.out.println("The dequeue rate is: " + dequeueRate + " messages/second");

    }

    /**
     * Open a new connection to the broker and start it.
     */
    public void open() throws Exception {
        // Set the properties ...
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        properties.put("connectionfactory." + CONNECTION_JNDI_NAME, connUrl);

        try
        {
            _ctx = new InitialContext(properties);
        }
        catch (NamingException e)
        {
            System.err.println("Error Setting up JNDI Context:" + e);
        }
        connection = ((ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME)).createConnection();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        emptyProducer = session.createProducer(null);
        emptyProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }

    /**
     * Close the connection to the broker
     */
    public void close() throws Exception {
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if( _ctx != null) {
                    _ctx.close();      
                }
            }
        }

    }

    /**
     * Creates a new queue and adds it to the destination to queue map.
     * 
     */
    public String createQueue(String queueName) throws Exception {
        Destination destination = session.createQueue(queueName + options);
        if (destination != null) {
            queueNameToDestination.put(queueName, destination);
            return queueName;
        } else {
            System.out.println("Queue Created Null");
            return null;
        }
    }


    /**
     * Create a listener for the queue.
     */
    public void listen(String p2pConsumer, QpidMqHandler handler) throws Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(handler);
    }

    /**
     * Wait for consumers to complete.
     */
    private void waitOnHandlers(List<QpidMqHandler> handlers) throws Exception {
        for (QpidMqHandler handler : handlers) {
            boolean handlerCompleted = handler.latch.await(60, TimeUnit.SECONDS);
            if(!handlerCompleted) {
                System.out.println("The test failed to complete");
            }
        }
    }

    /**
     * Enqueue Messages
     */
    public String enqueue(String p2pConsumer, String payload) throws Exception {
        MapMessage message = session.createMapMessage();
        message.setString("Body", payload);
        Destination destination = queueNameToDestination.get(p2pConsumer);
        emptyProducer.send(destination, message); 
        return message.getJMSMessageID();
    }


    /**
     * The callback handler
     */
    static public class QpidMqHandler  implements MessageListener {

        final CountDownLatch latch;
        static long lastMessageDelay;
        public QpidMqHandler(CountDownLatch suppliedLatch) {
            this.latch = suppliedLatch;
        }

        @Override
        final public void onMessage(Message arg0) {
            MapMessage mapMessage = (MapMessage) arg0;
            try {
                if(latch.getCount() == 1) {
                    lastMessageDelay = System.currentTimeMillis() -  mapMessage.getJMSTimestamp();
                }
                latch.countDown();
            } catch (Exception x) {
                throw new RuntimeException(x);
            }
        }

    }



}
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.lang.System.*;
import java.io.PrintStream.*;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.*;

class QpidEnqueueRateTest {

    private transient Connection connection;
    transient Session session;
    private transient MessageProducer emptyProducer;
    // The URL Used to connect tot he broker.
    private static String connUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672?tcp_nodelay='true''&max_prefetch='1'";

    final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    final String CONNECTION_JNDI_NAME = "local";

    private InitialContext _ctx;
    Map<String, Destination> queueNameToDestination = new HashMap<String, Destination>();
    Map<String, Destination> topicNameToDestination = new HashMap<String, Destination>();

    // the options used when creating a new queue
    private static String options = ";{create: always , node : {type : queue, durable : true}}";


    public static void main(String[] args) {
        int expectedPerQueue = 1000;
        int queueCount = 1;
        String baseQueueName = "Testing-";
        Map<String, CountDownLatch> queuesToLatchMap = new HashMap<String, CountDownLatch>();
        List<QpidMqHandler> handlers = null;
        QpidEnqueueRateTest test = null;
        try {
            test = new QpidEnqueueRateTest();
            // open connection to the broker
            test.open();

            for (int i=0; i<queueCount; i++) {
                queuesToLatchMap.put(baseQueueName + i, new CountDownLatch(expectedPerQueue));
            }


            // create The queues in the broker
            for (String queueName : queuesToLatchMap.keySet()) {
                test.createQueue(queueName);
            }

            // register the handlers for the queue.
            handlers = new ArrayList<QpidMqHandler>();
            for (Map.Entry<String, CountDownLatch> entry : queuesToLatchMap.entrySet()) {
                QpidEnqueueRateTest.QpidMqHandler handler = new QpidMqHandler(entry.getValue());
                handlers.add(handler);
                test.listen(entry.getKey(), handler);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        // produce messages
        long startTime = System.currentTimeMillis();
        for (int i=0; i<expectedPerQueue; i++) {
            for (String queueName : queuesToLatchMap.keySet()) {
                try {
                    // enqueue 1 message and commit that message
                    test.enqueue(queueName, "ABCDEF");
                    test.commitSession();
                } catch(Throwable e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
        long totalTime = (System.currentTimeMillis() - startTime);
        double avgTime = ((double)totalTime/expectedPerQueue);
        System.out.println("Time taken to enqueue a single message: "  + avgTime + " ms");
        
        try {
            test.waitOnHandlers(handlers);
            test.close();
        } catch (Exception e) {

        } 
    }

    /**
     * Open a new connection to the broker and start it.
     */
    public void open() throws Exception {
        // Set the properties ...
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        properties.put("connectionfactory." + CONNECTION_JNDI_NAME, connUrl);

        try
        {
            _ctx = new InitialContext(properties);
        }
        catch (NamingException e)
        {
            System.err.println("Error Setting up JNDI Context:" + e);
        }
        connection = ((ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME)).createConnection();

        // create transacted session
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        emptyProducer = session.createProducer(null);
        emptyProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }

    /**
     * Close the connection to the broker
     */
    public void close() throws Exception {
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if( _ctx != null) {
                    _ctx.close();      
                }
            }
        }

    }

    /**
     * Creates a new queue and adds it to the destination to queue map.
     * 
     */
    public String createQueue(String queueName) throws Exception {
        Destination destination = session.createQueue(queueName + options);
        if (destination != null) {
            queueNameToDestination.put(queueName, destination);
            return queueName;
        } else {
            System.out.println("Queue Created Null");
            return null;
        }
    }


    /**
     * Create a listener for the queue.
     */
    public void listen(String p2pConsumer, QpidMqHandler handler) throws Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(handler);
    }

    /**
     * Wait for consumers to complete.
     */
    private void waitOnHandlers(List<QpidMqHandler> handlers) throws Exception {
        for (QpidMqHandler handler : handlers) {
            boolean handlerCompleted = handler.latch.await(60, TimeUnit.SECONDS);
            if(!handlerCompleted) {
                System.out.println("The test failed to complete");
            }
        }
    }

    /**
     * Enqueue Messages
     */
    public String enqueue(String p2pConsumer, String payload) throws Exception {
        MapMessage message = session.createMapMessage();
        message.setString("Body", payload);
        Destination destination = queueNameToDestination.get(p2pConsumer);
        emptyProducer.send(destination, message); 
        return message.getJMSMessageID();
    }

    /**
     * Commit the session
     */
    public void commitSession() throws Exception {
        session.commit();
    }


    /**
     * The callback handler
     */
    static public class QpidMqHandler  implements MessageListener {

        final CountDownLatch latch;
        public QpidMqHandler(CountDownLatch suppliedLatch) {
            this.latch = suppliedLatch;
        }

        @Override
        final public void onMessage(Message arg0) {
            MapMessage mapMessage = (MapMessage) arg0;
            try {
                latch.countDown();
            } catch (Exception x) {
                throw new RuntimeException(x);
            }
        }

    }



}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org

Reply via email to