Author: ritchiem
Date: Thu Jan 25 02:04:52 2007
New Revision: 499716

URL: http://svn.apache.org/viewvc?view=rev&rev=499716
Log:
Race condition fixed fro AsyncTestPerf

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499716&r1=499715&r2=499716
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Thu Jan 25 02:04:52 2007
@@ -76,7 +76,9 @@
      */
     protected static final int DEFAULT_MESSAGE_SIZE = 0;
 
-    /** This is set and used when the test is for multiple-destinations */
+    /**
+     * This is set and used when the test is for multiple-destinations
+     */
     protected static final int DEFAULT_DESTINATION_COUNT = 0;
 
     protected static final int DEFAULT_RATE = 0;
@@ -202,10 +204,10 @@
             _throttleBatchSize = (int) Math.pow(100, x);
             int throttleRate = rate / _throttleBatchSize;
 
-            _logger.info("rate = " + rate);
-            _logger.info("x = " + x);
-            _logger.info("_throttleBatchSize = " + _throttleBatchSize);
-            _logger.info("throttleRate = " + throttleRate);
+            _logger.debug("rate = " + rate);
+            _logger.debug("x = " + x);
+            _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+            _logger.debug("throttleRate = " + throttleRate);
 
             rateLimiter = new Throttle();
             rateLimiter.setRate(throttleRate);
@@ -519,6 +521,11 @@
 
             if (trafficLight != null)
             {
+                if (_messageListener != null)
+                {
+                    _messageListener.onMessage(message);
+                }
+
                 _logger.trace("Reply was expected, decrementing the latch for 
the id.");
                 trafficLight.countDown();
 
@@ -529,11 +536,6 @@
                     commitTx(getConsumerSession());
                 }
 
-                if (_messageListener != null)
-                {
-                    _messageListener.onMessage(message);
-                }
-
             }
             else
             {
@@ -570,32 +572,39 @@
      */
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout) throws JMSException, InterruptedException
     {
-        // Put a unique correlation id on the message before sending it.
-        String messageCorrelationId = Long.toString(getNewID());
+        String messageCorrelationId = null;
 
-        pingNoWaitForReply(message, numPings, messageCorrelationId);
+        try
+        {
+            // Put a unique correlation id on the message before sending it.
+            messageCorrelationId = Long.toString(getNewID());
 
-        CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
-        // Block the current thread until a reply to the message is received, 
or it times out.
-        trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+            pingNoWaitForReply(message, numPings, messageCorrelationId);
 
-        trafficLights.remove(messageCorrelationId);
+            CountDownLatch trafficLight = 
trafficLights.get(messageCorrelationId);
+            // Block the current thread until a reply to the message is 
received, or it times out.
+            trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
-        // Work out how many replies were receieved.
-        int numReplies = numPings - (int) trafficLight.getCount();
+            // Work out how many replies were receieved.
+            int numReplies = numPings - (int) trafficLight.getCount();
 
-        if ((numReplies < numPings) && _verbose)
-        {
-            _logger.info("Timed out (" + timeout + " ms) before all replies 
received on id, " + messageCorrelationId);
+            if ((numReplies < numPings) && _verbose)
+            {
+                _logger.info("Timed out (" + timeout + " ms) before all 
replies received on id, " + messageCorrelationId);
+            }
+            else if (_verbose)
+            {
+                _logger.info("Got all replies on id, " + messageCorrelationId);
+            }
+
+            commitTx(getConsumerSession());
+
+            return numReplies;
         }
-        else if (_verbose)
+        finally
         {
-            _logger.info("Got all replies on id, " + messageCorrelationId);
+            removeLock(messageCorrelationId);
         }
-
-        commitTx(getConsumerSession());
-        
-        return numReplies;
     }
 
     public long getNewID()
@@ -603,14 +612,20 @@
         return idGenerator.incrementAndGet();
     }
 
+    public CountDownLatch removeLock(String correlationID)
+    {
+        return trafficLights.remove(correlationID);
+    }
+
+
     /*
-     * Sends the specified ping message but does not wait for a correlating 
reply.
-     *
-     * @param message  The message to send.
-     * @param numPings The number of pings to send.
-     * @return The reply, or null if no reply arrives before the timeout.
-     * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
-     */
+    * Sends the specified ping message but does not wait for a correlating 
reply.
+    *
+    * @param message  The message to send.
+    * @param numPings The number of pings to send.
+    * @return The reply, or null if no reply arrives before the timeout.
+    * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
+    */
     public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId) throws JMSException, InterruptedException
     {
         // Create a count down latch to count the number of replies with. This 
is created before the message is sent

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=499716&r1=499715&r2=499716
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Thu Jan 25 02:04:52 2007
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.ping;
 
-//import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-//import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.TimingController;
 
 import javax.jms.MessageListener;
 import javax.jms.ObjectMessage;
@@ -33,15 +33,17 @@
 import junit.framework.TestSuite;
 import org.apache.log4j.Logger;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 
 
-public class PingAsyncTestPerf extends PingTestPerf //implements 
TimingControllerAware
+public class PingAsyncTestPerf extends PingTestPerf implements 
TimingControllerAware
 {
-//    private static Logger _logger = 
Logger.getLogger(PingAsyncTestPerf.class);
+    private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
 
-//    private TimingController _timingController;
+    private TimingController _timingController;
 
-//    private AsyncMessageListener _listener;
+    private AsyncMessageListener _listener;
 
     private volatile boolean _done = false;
 
@@ -50,260 +52,280 @@
         super(name);
     }
 
-//    /**
-//     * Compile all the tests into a test suite.
-//     */
-//    public static Test suite()
-//    {
-//        // Build a new test suite
-//        TestSuite suite = new TestSuite("Ping Performance Tests");
-//
-//        // Run performance tests in read committed mode.
-//        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
-//
-//        return suite;
-//    }
-//
-//    protected void setUp() throws Exception
-//    {
-//        // Create the test setups on a per thread basis, only if they have 
not already been created.
-//
-//        if (threadSetup.get() == null)
-//        {
-//            PerThreadSetup perThreadSetup = new PerThreadSetup();
-//
-//            // Extract the test set up paramaeters.
-//            String brokerDetails = 
testParameters.getProperty(BROKER_PROPNAME);
-//            String username = "guest";
-//            String password = "guest";
-//            String virtualpath = 
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-//            int destinationscount = 
Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
-//            String destinationname = 
testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
-//            boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-//            boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-//            String selector = null;
-//            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-//            int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-//            int rate = 
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-//            boolean pubsub = 
Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-//
-//
-//            boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-//            boolean beforeCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-//            boolean afterSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-//            boolean beforeSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-//            boolean failOnce = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-//
-//            int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
-//            int commitbatchSize = 
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
-//
-//            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
-//            // all threads try to create connection concurrently
-//            synchronized (this)
-//            {
-//                // Establish a client to ping a Queue and listen the reply 
back from same Queue
-//                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
-//                                                                      
destinationname, selector, transacted, persistent,
-//                                                                      
messageSize, verbose,
-//                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-//                                                                      
commitbatchSize, destinationscount, rate, pubsub);
-//
-//
-//                _listener = new AsyncMessageListener(batchSize);
-//
-//                
perThreadSetup._pingItselfClient.setMessageListener(_listener);
-//                // Start the client connection
-//                perThreadSetup._pingItselfClient.getConnection().start();
-//
-//                // Attach the per-thread set to the thread.
-//                threadSetup.set(perThreadSetup);
-//            }
-//        }
-//    }
-//
-//
-//    public void testAsyncPingOk(int numPings)
-//    {
-//        _timingController = this.getTimingController();
-//
-//        _listener.setTotalMessages(numPings);
-//
-//        PerThreadSetup perThreadSetup = threadSetup.get();
-//        if (numPings == 0)
-//        {
-//            _logger.error("Number of pings requested was zero.");
-//        }
-//
-//        // Generate a sample message. This message is already time stamped 
and has its reply-to destination set.
-//        ObjectMessage msg = null;
-//
-//        try
-//        {
-//            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
-//                                                                  
Integer.parseInt(testParameters.getProperty(
-//                                                                          
MESSAGE_SIZE_PROPNAME)),
-//                                                                  
Boolean.parseBoolean(testParameters.getProperty(
-//                                                                          
PERSISTENT_MODE_PROPNAME)));
-//        }
-//        catch (JMSException e)
-//        {
-//
-//        }
-//
-//        // start the test
-//        long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
-//
-//        String correlationID = 
Long.toString(perThreadSetup._pingItselfClient.getNewID());
-//
-//        try
-//        {
-//            _logger.debug("Sending messages");
-//
-//            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, 
numPings, correlationID);
-//
-//            _logger.debug("All sent");
-//        }
-//        catch (JMSException e)
-//        {
-//            e.printStackTrace();
-//            Assert.fail("JMS Exception Recevied" + e);
-//        }
-//        catch (InterruptedException e)
-//        {
-//            e.printStackTrace();
-//        }
-//
-//        while (!_done)
-//        {
-//            try
-//            {
-//                _logger.debug("Awating test finish");
-//
-//                
perThreadSetup._pingItselfClient.getEndLock(correlationID).await();
-//
-//
-//            }
-//            catch (InterruptedException e)
-//            {
-//                //ignore
-//            }
-//        }
-//
-//        // Fail the test if the timeout was exceeded.
-//        int numReplies = _listener.getReplyCount();
-//
-//        _logger.info("Test Finished");
-//
-//        if (numReplies != numPings)
-//        {
-//
-//            try
-//            {
-//                
perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
-//            }
-//            catch (JMSException e)
-//            {
-//                _logger.error("Error commiting recevied messages", e);
-//            }
-//            try
-//            {
-//                _timingController.completeTest(false, numPings - numReplies);
-//            }
-//            catch (InterruptedException e)
-//            {
-//                //ignore
-//            }
-//            Assert.fail("The ping timed out after " + timeout + " ms. 
Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
-//        }
-//    }
-//
-//    public void setTimingController(TimingController timingController)
-//    {
-//        _timingController = timingController;
-//    }
-//
-//    public TimingController getTimingController()
-//    {
-//        return _timingController;
-//    }
-//
-//
-//    private class AsyncMessageListener implements MessageListener
-//    {
-//        private int _messageReceived;
-//        private int _totalMessages;
-//        private int _batchSize;
-//
-//        public AsyncMessageListener(int batchSize, int totalMessages)
-//        {
-//            _batchSize = batchSize;
-//            _totalMessages = totalMessages;
-//            _messageReceived = 0;
-//        }
-//
-//        public AsyncMessageListener(int batchSize)
-//        {
-//            _batchSize = batchSize;
-//            _totalMessages = -1;
-//            _messageReceived = 0;
-//        }
-//
-//        public void setTotalMessages(int newTotal)
-//        {
-//            _totalMessages = newTotal;
-//        }
-//
-//        public void onMessage(Message message)
-//        {
-//            _logger.trace("Message Recevied");
-//
-//            _messageReceived++;
-//
-//            try
-//            {
-//                if (_messageReceived % _batchSize == 0)
-//                {
-//                    if (_timingController != null)
-//                    {
-//                        _timingController.completeTest(true, _batchSize);
-//                    }
-//                }
-//            }
-//            catch (InterruptedException e)
-//            {
-////                _logger.error("Interupted");
-////                doDone();
-//            }
-//
-//            if (_totalMessages == -1 || _messageReceived == _totalMessages)
-//            {
-//                _logger.info("Test Completed.. signalling");
+    /**
+     * Compile all the tests into a test suite.
+     */
+    public static Test suite()
+    {
+        // Build a new test suite
+        TestSuite suite = new TestSuite("Ping Performance Tests");
+
+        // Run performance tests in read committed mode.
+        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+        return suite;
+    }
+
+    protected void setUp() throws Exception
+    {
+        // Create the test setups on a per thread basis, only if they have not 
already been created.
+
+        if (threadSetup.get() == null)
+        {
+            PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+            // Extract the test set up paramaeters.
+            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+            String username = "guest";
+            String password = "guest";
+            String virtualpath = 
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+            int destinationscount = 
Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+            String destinationname = 
testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
+            boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+            boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+            String selector = null;
+            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+            int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+            int rate = 
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+            boolean pubsub = 
Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+
+
+            boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+            boolean beforeCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+            boolean afterSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+            boolean beforeSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+            boolean failOnce = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+
+            int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+            int commitbatchSize = 
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+
+            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
+            // all threads try to create connection concurrently
+            synchronized (this)
+            {
+                // Establish a client to ping a Queue and listen the reply 
back from same Queue
+                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
+                                                                      
destinationname, selector, transacted, persistent,
+                                                                      
messageSize, verbose,
+                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+                                                                      
commitbatchSize, destinationscount, rate, pubsub);
+
+
+                _listener = new AsyncMessageListener(batchSize);
+
+                perThreadSetup._pingItselfClient.setMessageListener(_listener);
+                // Start the client connection
+                perThreadSetup._pingItselfClient.getConnection().start();
+
+                // Attach the per-thread set to the thread.
+                threadSetup.set(perThreadSetup);
+            }
+        }
+    }
+
+
+    public void testAsyncPingOk(int numPings)
+    {
+        _timingController = this.getTimingController();
+
+        _listener.setTotalMessages(numPings);
+
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        if (numPings == 0)
+        {
+            _logger.error("Number of pings requested was zero.");
+        }
+
+        // Generate a sample message. This message is already time stamped and 
has its reply-to destination set.
+        ObjectMessage msg = null;
+
+        try
+        {
+            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+                                                                  
Integer.parseInt(testParameters.getProperty(
+                                                                          
MESSAGE_SIZE_PROPNAME)),
+                                                                  
Boolean.parseBoolean(testParameters.getProperty(
+                                                                          
PERSISTENT_MODE_PROPNAME)));
+        }
+        catch (JMSException e)
+        {
+
+        }
+
+        // start the test
+        long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+
+        String correlationID = 
Long.toString(perThreadSetup._pingItselfClient.getNewID());
+
+        try
+        {
+            _logger.debug("Sending messages");
+
+            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, 
correlationID);
+
+            _logger.debug("All sent");
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            Assert.fail("JMS Exception Recevied" + e);
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        }
+
+        while (!_done)
+        {
+            try
+            {
+                _logger.debug("Awating test finish");
+
+                
perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, 
TimeUnit.MILLISECONDS);
+
+                if 
(perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
+                {
+                    _logger.error("Timeout occured");
+                    _done = true;
+                }
+                else
+                {
+                    _logger.error("Countdown reached Done?" + _done);
+                }
+                //Allow the time out to exit the loop.
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+                _logger.error("Awaiting test end interrupted.");
+
+            }
+        }
+
+        perThreadSetup._pingItselfClient.removeLock(correlationID);
+
+        // Fail the test if the timeout was exceeded.
+        int numReplies = _listener.getReplyCount();
+
+        _logger.info("Test Finished");
+
+        if (numReplies != numPings)
+        {
+
+            try
+            {
+                
perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
+            }
+            catch (JMSException e)
+            {
+                _logger.error("Error commiting recevied messages", e);
+            }
+            try
+            {
+                _timingController.completeTest(false, numPings - numReplies);
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+            Assert.fail("The ping timed out after " + timeout + " ms. Messages 
Sent = " + numPings + ", MessagesReceived = " + numReplies);
+        }
+    }
+
+    public void setTimingController(TimingController timingController)
+    {
+        _timingController = timingController;
+    }
+
+    public TimingController getTimingController()
+    {
+        return _timingController;
+    }
+
+
+    private class AsyncMessageListener implements MessageListener
+    {
+        private AtomicInteger _messageReceived;
+        private volatile int _totalMessages;
+        private int _batchSize;
+
+        public AsyncMessageListener(int batchSize, int totalMessages)
+        {
+            _batchSize = batchSize;
+            _totalMessages = totalMessages;
+            _messageReceived = new AtomicInteger(0);
+        }
+
+        public AsyncMessageListener(int batchSize)
+        {
+            _batchSize = batchSize;
+            _totalMessages = -1;
+            _messageReceived = new AtomicInteger(0);
+        }
+
+        public void setTotalMessages(int newTotal)
+        {
+            _totalMessages = newTotal;
+            _messageReceived.set(0);
+        }
+
+        public void onMessage(Message message)
+        {
+            _logger.trace("Message Recevied");
+
+            int messagesReceived = _messageReceived.incrementAndGet();
+
+            try
+            {
+                if (messagesReceived % _batchSize == 0)
+                {
+                    if (_timingController != null)
+                    {
+                        _timingController.completeTest(true, _batchSize);
+                    }
+
+                    if (messagesReceived == _totalMessages)
+                    {
+                        _done = true;
+                    }
+                }
+                else if (messagesReceived == _totalMessages)
+                {
+                    _logger.info("Test Completed.. signalling");
+                    doDone();
+                }
+
+            }
+            catch (InterruptedException e)
+            {
+                _logger.error("Interupted Test");
 //                doDone();
-//            }
-//        }
-//
-//        private void doDone()
-//        {
-//            _done = true;
-//
-//            _logger.error("Messages received:" + _messageReceived);
-//            _logger.error("Total Messages :" + _totalMessages);
-//
-//            try
-//            {
-//                _timingController.completeTest(true, _totalMessages - 
_messageReceived);
-//            }
-//            catch (InterruptedException e)
-//            {
-//                //ignore
-//            }
-//        }
-//
-//        public int getReplyCount()
-//        {
-//            return _messageReceived;
-//        }
-//
-//    }
+            }
+
+        }
+
+        private void doDone()
+        {
+            _done = true;
+
+            _logger.trace("Messages received:" + _messageReceived.get());
+            _logger.trace("Total Messages :" + _totalMessages);
+
+            try
+            {
+                _timingController.completeTest(true, _totalMessages - 
_messageReceived.get());
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+
+        public int getReplyCount()
+        {
+            return _messageReceived.get();
+        }
+
+    }
 
 }


Reply via email to