Author: ritchiem
Date: Thu Jan 25 02:59:36 2007
New Revision: 499733

URL: http://svn.apache.org/viewvc?view=rev&rev=499733
Log:
Refactored to use CountDownLatch, as using a local count was wrong in the 
multi-threaded case.

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=499733&r1=499732&r2=499733
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Thu Jan 25 02:59:36 2007
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.ping;
 
-import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.TimingController;
+//import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+//import uk.co.thebadgerset.junit.extensions.TimingController;
 
 import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
@@ -35,297 +35,282 @@
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
 
 
-public class PingAsyncTestPerf extends PingTestPerf implements 
TimingControllerAware
+public class PingAsyncTestPerf extends PingTestPerf //implements 
TimingControllerAware
 {
-    private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+//    private static Logger _logger = 
Logger.getLogger(PingAsyncTestPerf.class);
 
-    private TimingController _timingController;
+//    private TimingController _timingController;
 
-    private AsyncMessageListener _listener;
-
-    private volatile boolean _done = false;
+//    private AsyncMessageListener _listener;
 
     public PingAsyncTestPerf(String name)
     {
         super(name);
     }
 
-    /**
-     * Compile all the tests into a test suite.
-     */
-    public static Test suite()
-    {
-        // Build a new test suite
-        TestSuite suite = new TestSuite("Ping Performance Tests");
-
-        // Run performance tests in read committed mode.
-        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
-
-        return suite;
-    }
-
-    protected void setUp() throws Exception
-    {
-        // Create the test setups on a per thread basis, only if they have not 
already been created.
-
-        if (threadSetup.get() == null)
-        {
-            PerThreadSetup perThreadSetup = new PerThreadSetup();
-
-            // Extract the test set up paramaeters.
-            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
-            String username = "guest";
-            String password = "guest";
-            String virtualpath = 
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-            int destinationscount = 
Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
-            String destinationname = 
testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
-            boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-            boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-            String selector = null;
-            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-            int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-            int rate = 
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-            boolean pubsub = 
Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
-
-            boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-            boolean beforeCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-            boolean afterSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-            boolean beforeSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-            boolean failOnce = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-
-            int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
-            int commitbatchSize = 
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
-
-            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
-            // all threads try to create connection concurrently
-            synchronized (this)
-            {
-                // Establish a client to ping a Queue and listen the reply 
back from same Queue
-                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
-                                                                      
destinationname, selector, transacted, persistent,
-                                                                      
messageSize, verbose,
-                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                      
commitbatchSize, destinationscount, rate, pubsub);
-
-
-                _listener = new AsyncMessageListener(batchSize);
-
-                perThreadSetup._pingItselfClient.setMessageListener(_listener);
-                // Start the client connection
-                perThreadSetup._pingItselfClient.getConnection().start();
-
-                // Attach the per-thread set to the thread.
-                threadSetup.set(perThreadSetup);
-            }
-        }
-    }
-
-
-    public void testAsyncPingOk(int numPings)
-    {
-        _timingController = this.getTimingController();
-
-        _listener.setTotalMessages(numPings);
-
-        PerThreadSetup perThreadSetup = threadSetup.get();
-        if (numPings == 0)
-        {
-            _logger.error("Number of pings requested was zero.");
-        }
-
-        // Generate a sample message. This message is already time stamped and 
has its reply-to destination set.
-        ObjectMessage msg = null;
-
-        try
-        {
-            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
-                                                                  
Integer.parseInt(testParameters.getProperty(
-                                                                          
MESSAGE_SIZE_PROPNAME)),
-                                                                  
Boolean.parseBoolean(testParameters.getProperty(
-                                                                          
PERSISTENT_MODE_PROPNAME)));
-        }
-        catch (JMSException e)
-        {
-
-        }
-
-        // start the test
-        long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
-
-        String correlationID = 
Long.toString(perThreadSetup._pingItselfClient.getNewID());
-
-        try
-        {
-            _logger.debug("Sending messages");
-
-            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, 
correlationID);
-
-            _logger.debug("All sent");
-        }
-        catch (JMSException e)
-        {
-            e.printStackTrace();
-            Assert.fail("JMS Exception Recevied" + e);
-        }
-        catch (InterruptedException e)
-        {
-            e.printStackTrace();
-        }
-
-        while (!_done)
-        {
-            try
-            {
-                _logger.debug("Awating test finish");
-
-                
perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, 
TimeUnit.MILLISECONDS);
-
-                if 
(perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
-                {
-                    _logger.error("Timeout occured");
-                    _done = true;
-                }
-                else
-                {
-                    _logger.error("Countdown reached Done?" + _done);
-                }
-                //Allow the time out to exit the loop.
-            }
-            catch (InterruptedException e)
-            {
-                //ignore
-                _logger.error("Awaiting test end interrupted.");
-
-            }
-        }
-
-        perThreadSetup._pingItselfClient.removeLock(correlationID);
-
-        // Fail the test if the timeout was exceeded.
-        int numReplies = _listener.getReplyCount();
-
-        _logger.info("Test Finished");
-
-        if (numReplies != numPings)
-        {
-
-            try
-            {
-                
perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
-            }
-            catch (JMSException e)
-            {
-                _logger.error("Error commiting recevied messages", e);
-            }
-            try
-            {
-                _timingController.completeTest(false, numPings - numReplies);
-            }
-            catch (InterruptedException e)
-            {
-                //ignore
-            }
-            Assert.fail("The ping timed out after " + timeout + " ms. Messages 
Sent = " + numPings + ", MessagesReceived = " + numReplies);
-        }
-    }
-
-    public void setTimingController(TimingController timingController)
-    {
-        _timingController = timingController;
-    }
-
-    public TimingController getTimingController()
-    {
-        return _timingController;
-    }
-
-
-    private class AsyncMessageListener implements MessageListener
-    {
-        private AtomicInteger _messageReceived;
-        private volatile int _totalMessages;
-        private int _batchSize;
-
-        public AsyncMessageListener(int batchSize, int totalMessages)
-        {
-            _batchSize = batchSize;
-            _totalMessages = totalMessages;
-            _messageReceived = new AtomicInteger(0);
-        }
-
-        public AsyncMessageListener(int batchSize)
-        {
-            _batchSize = batchSize;
-            _totalMessages = -1;
-            _messageReceived = new AtomicInteger(0);
-        }
-
-        public void setTotalMessages(int newTotal)
-        {
-            _totalMessages = newTotal;
-            _messageReceived.set(0);
-        }
-
-        public void onMessage(Message message)
-        {
-            _logger.trace("Message Recevied");
-
-            int messagesReceived = _messageReceived.incrementAndGet();
-
-            try
-            {
-                if (messagesReceived % _batchSize == 0)
-                {
-                    if (_timingController != null)
-                    {
-                        _timingController.completeTest(true, _batchSize);
-                    }
-
-                    if (messagesReceived == _totalMessages)
-                    {
-                        _done = true;
-                    }
-                }
-                else if (messagesReceived == _totalMessages)
-                {
-                    _logger.info("Test Completed.. signalling");
-                    doDone();
-                }
-
-            }
-            catch (InterruptedException e)
-            {
-                _logger.error("Interupted Test");
-//                doDone();
-            }
-
-        }
-
-        private void doDone()
-        {
-            _done = true;
-
-            _logger.trace("Messages received:" + _messageReceived.get());
-            _logger.trace("Total Messages :" + _totalMessages);
-
-            try
-            {
-                _timingController.completeTest(true, _totalMessages - 
_messageReceived.get());
-            }
-            catch (InterruptedException e)
-            {
-                //ignore
-            }
-        }
-
-        public int getReplyCount()
-        {
-            return _messageReceived.get();
-        }
-
-    }
+//    /**
+//     * Compile all the tests into a test suite.
+//     */
+//    public static Test suite()
+//    {
+//        // Build a new test suite
+//        TestSuite suite = new TestSuite("Ping Performance Tests");
+//
+//        // Run performance tests in read committed mode.
+//        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+//
+//        return suite;
+//    }
+//
+//    protected void setUp() throws Exception
+//    {
+//        // Create the test setups on a per thread basis, only if they have 
not already been created.
+//
+//        if (threadSetup.get() == null)
+//        {
+//            PerThreadSetup perThreadSetup = new PerThreadSetup();
+//
+//            // Extract the test set up paramaeters.
+//            String brokerDetails = 
testParameters.getProperty(BROKER_PROPNAME);
+//            String username = "guest";
+//            String password = "guest";
+//            String virtualpath = 
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+//            int destinationscount = 
Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+//            String destinationname = 
testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
+//            boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+//            boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+//            String selector = null;
+//            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+//            int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+//            int rate = 
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+//            boolean pubsub = 
Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+//
+//
+//            boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+//            boolean beforeCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+//            boolean afterSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+//            boolean beforeSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+//            boolean failOnce = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+//
+//            int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+//            int commitbatchSize = 
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+//
+//            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
+//            // all threads try to create connection concurrently
+//            synchronized (this)
+//            {
+//                // Establish a client to ping a Queue and listen the reply 
back from same Queue
+//                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
+//                                                                      
destinationname, selector, transacted, persistent,
+//                                                                      
messageSize, verbose,
+//                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+//                                                                      
commitbatchSize, destinationscount, rate, pubsub);
+//            }
+//
+//            // Attach the per-thread set to the thread.
+//            threadSetup.set(perThreadSetup);
+//
+//            _listener = new AsyncMessageListener(batchSize);
+//
+//            perThreadSetup._pingItselfClient.setMessageListener(_listener);
+//            // Start the client connection
+//            perThreadSetup._pingItselfClient.getConnection().start();
+//
+//        }
+//    }
+//
+//
+//    public void testAsyncPingOk(int numPings)
+//    {
+//        _timingController = this.getTimingController();
+//
+//        _listener.setTotalMessages(numPings);
+//
+//        PerThreadSetup perThreadSetup = threadSetup.get();
+//        if (numPings == 0)
+//        {
+//            _logger.error("Number of pings requested was zero.");
+//            fail("Number of pings requested was zero.");
+//        }
+//
+//        // Generate a sample message. This message is already time stamped 
and has its reply-to destination set.
+//        ObjectMessage msg = null;
+//
+//        try
+//        {
+//            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+//                                                                  
Integer.parseInt(testParameters.getProperty(
+//                                                                          
MESSAGE_SIZE_PROPNAME)),
+//                                                                  
Boolean.parseBoolean(testParameters.getProperty(
+//                                                                          
PERSISTENT_MODE_PROPNAME)));
+//        }
+//        catch (JMSException e)
+//        {
+//
+//        }
+//
+//        // start the test
+//        long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+//
+//        String correlationID = 
Long.toString(perThreadSetup._pingItselfClient.getNewID());
+//
+//        try
+//        {
+//            _logger.debug("Sending messages");
+//
+//            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, 
numPings, correlationID);
+//
+//            _logger.debug("All sent");
+//        }
+//        catch (JMSException e)
+//        {
+//            e.printStackTrace();
+//            Assert.fail("JMS Exception Recevied" + e);
+//        }
+//        catch (InterruptedException e)
+//        {
+//            e.printStackTrace();
+//        }
+//
+//        try
+//        {
+//            _logger.debug("Awating test finish");
+//
+//            
perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, 
TimeUnit.MILLISECONDS);
+//
+//            if 
(perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
+//            {
+//                _logger.error("Timeout occured");
+//            }
+//            //Allow the time out to exit the loop.
+//        }
+//        catch (InterruptedException e)
+//        {
+//            //ignore
+//            _logger.error("Awaiting test end was interrupted.");
+//
+//        }
+//
+//        // Fail the test if the timeout was exceeded.
+//        int numReplies = numPings - (int) 
perThreadSetup._pingItselfClient.removeLock(correlationID).getCount();
+//
+//        _logger.info("Test Finished");
+//
+//        if (numReplies != numPings)
+//        {
+//            try
+//            {
+//                
perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
+//            }
+//            catch (JMSException e)
+//            {
+//                _logger.error("Error commiting recevied messages", e);
+//            }
+//            try
+//            {
+//                _timingController.completeTest(false, numPings - numReplies);
+//            }
+//            catch (InterruptedException e)
+//            {
+//                //ignore
+//            }
+//            Assert.fail("The ping timed out after " + timeout + " ms. 
Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+//        }
+//    }
+//
+//    public void setTimingController(TimingController timingController)
+//    {
+//        _timingController = timingController;
+//    }
+//
+//    public TimingController getTimingController()
+//    {
+//        return _timingController;
+//    }
+//
+//
+//    private class AsyncMessageListener implements MessageListener
+//    {
+//        private volatile int _totalMessages;
+//        private int _batchSize;
+//        PerThreadSetup _perThreadSetup;
+//
+//        public AsyncMessageListener(int batchSize)
+//        {
+//            this(batchSize, -1);
+//        }
+//
+//        public AsyncMessageListener(int batchSize, int totalMessages)
+//        {
+//            _batchSize = batchSize;
+//            _totalMessages = totalMessages;
+//            _perThreadSetup = threadSetup.get();
+//        }
+//
+//        public void setTotalMessages(int newTotal)
+//        {
+//            _totalMessages = newTotal;
+//        }
+//
+//        public void onMessage(Message message)
+//        {
+//            try
+//            {
+//                _logger.trace("Message Recevied");
+//
+//                CountDownLatch count = 
_perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID());
+//
+//                int messagesLeft = (int) count.getCount();
+//
+//                int messagesReceived = _totalMessages - messagesLeft;
+//
+//                try
+//                {
+//                    if (messagesReceived % _batchSize == 0)
+//                    {
+//                        if (_timingController != null)
+//                        {
+//                            _timingController.completeTest(true, _batchSize);
+//                        }
+//                    }
+//                    else if (messagesReceived == _totalMessages)
+//                    {
+//                        _logger.info("Test Completed.. signalling");
+//                        doDone(messagesReceived);
+//                    }
+//
+//                }
+//                catch (InterruptedException e)
+//                {
+//                    _logger.error("Interupted Test");
+////                doDone(messagesReceived);
+//                }
+//            }
+//            catch (JMSException e)
+//            {
+//                _logger.warn("There was a JMSException", e);
+//            }
+//
+//        }
+//
+//        private void doDone(int messageCount)
+//        {
+//            _logger.trace("Messages received:" + messageCount);
+//            _logger.trace("Total Messages :" + _totalMessages);
+//
+//            try
+//            {
+//                _timingController.completeTest(true, messageCount);
+//            }
+//            catch (InterruptedException e)
+//            {
+//                //ignore
+//            }
+//        }
+//
+//    }
 
 }


Reply via email to