Author: ritchiem
Date: Tue Jan 23 14:34:25 2007
New Revision: 499166

URL: http://svn.apache.org/viewvc?view=rev&rev=499166
Log:
Updated perftests to include an Asynchronous ping sender

Added:
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=499166&r1=499165&r2=499166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
 Tue Jan 23 14:34:25 2007
@@ -265,6 +265,7 @@
         _logger.trace("Batch time reached");
         if (_failAfterSend)
         {
+            _logger.trace("Batch size reached");
             if (_failOnce)
             {
                 _failAfterSend = false;

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499166&r1=499165&r2=499166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
 Tue Jan 23 14:34:25 2007
@@ -20,6 +20,7 @@
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.ObjectMessage;
+import javax.jms.MessageListener;
 
 import org.apache.log4j.Logger;
 
@@ -41,6 +42,7 @@
      * If queueCount is > 1 : This creats a client for tests with multiple 
queues. Creates as many consumer instances
      * as there are queues, each listening to a Queue. A producer is created 
which picks up a queue from
      * the list of queues to send message
+     *
      * @param brokerDetails
      * @param username
      * @param password
@@ -81,6 +83,25 @@
         }
     }
 
+
+    /**
+     * Sets the replyQueue to be the same as ping queue.
+     */
+    @Override
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a message consumer to get the replies with and register this 
to be called back by it.
+        setReplyQueue(getPingQueue());
+        MessageConsumer consumer =
+                getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, 
false, EXCLUSIVE, selector);
+        consumer.setMessageListener(this);
+    }
+
+    public void setMessageListener(MessageListener messageListener) throws 
JMSException
+    {
+        getConsumerSession().setMessageListener(messageListener);
+    }
+
     /**
      * Starts a ping-pong loop running from the command line.
      *
@@ -147,10 +168,10 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        TestPingItself pingItself =
-            new TestPingItself(brokerDetails, "guest", "guest", virtualpath, 
queue, null, transacted, persistent,
-                               messageSize, verbose, afterCommit, 
beforeCommit, afterSend, beforeSend, failOnce, batchSize,
-                               queueCount, rate);
+        TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", 
"guest", virtualpath, queue, null,
+                                                       transacted, persistent, 
messageSize, verbose,
+                                                       afterCommit, 
beforeCommit, afterSend, beforeSend, failOnce,
+                                                       batchSize, queueCount, 
rate);
 
         pingItself.getConnection().start();
 
@@ -194,16 +215,5 @@
         System.exit(0);
     }
 
-    /**
-     * Sets the replyQueue to be the same as ping queue.
-     */
-    @Override
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a message consumer to get the replies with and register this 
to be called back by it.
-        setReplyQueue(getPingQueue());
-        MessageConsumer consumer =
-            getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, 
false, EXCLUSIVE, selector);
-        consumer.setMessageListener(this);
-    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499166&r1=499165&r2=499166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Tue Jan 23 14:34:25 2007
@@ -140,7 +140,9 @@
 
     protected Session _consumerSession;
 
-    /** Used to restrict the sending rate to a specified limit. */
+    /**
+     * Used to restrict the sending rate to a specified limit.
+     */
     private Throttle rateLimiter = null;
 
     /**
@@ -152,7 +154,7 @@
     private PingPongProducer(String brokerDetails, String username, String 
password, String virtualpath, boolean transacted,
                              boolean persistent, int messageSize, boolean 
verbose, boolean afterCommit, boolean beforeCommit,
                              boolean afterSend, boolean beforeSend, boolean 
failOnce, int batchSize, int rate)
-                      throws Exception
+            throws Exception
     {
         // Create a connection to the broker.
         InetAddress address = InetAddress.getLocalHost();
@@ -238,6 +240,77 @@
     }
 
     /**
+     * Creates the producer to send the pings on.  If the tests are with 
nultiple queues, then producer
+     * is created with null destination, so that any destination can be 
specified while sending
+     *
+     * @throws JMSException
+     */
+    public void createProducer() throws JMSException
+    {
+        if (getQueueCount() > 1)
+        {
+            // create producer with initial destination as null for test with 
multiple queues
+            // In this case, a different destination will be used while 
sending the message
+            _producer = (MessageProducer) 
getProducerSession().createProducer(null);
+        }
+        else
+        {
+            // Create a queue and producer to send the pings on.
+            _producer = (MessageProducer) 
getProducerSession().createProducer(_pingQueue);
+
+        }
+        _producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
+    }
+
+    /**
+     * Creates the temporary queue to listen to the responses
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a temporary queue to get the pongs on.
+        _replyQueue = _consumerSession.createTemporaryQueue();
+
+        // Create a message consumer to get the replies with and register this 
to be called back by it.
+        MessageConsumer consumer = 
_consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, 
selector);
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Creates consumer instances for each queue. This is used when test is 
being done with multiple queues.
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumers(String selector) throws JMSException
+    {
+        for (int i = 0; i < getQueueCount(); i++)
+        {
+            MessageConsumer consumer = 
getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, 
selector);
+            consumer.setMessageListener(this);
+        }
+    }
+
+
+    protected Session getConsumerSession()
+    {
+        return _consumerSession;
+    }
+
+    public Queue getPingQueue()
+    {
+        return _pingQueue;
+    }
+
+    protected void setPingQueue(Queue queue)
+    {
+        _pingQueue = queue;
+    }
+
+    /**
      * Starts a ping-pong loop running from the command line. The bounce back 
client {@link org.apache.qpid.requestreply.PingPongBouncer} also 
needs
      * to be started to bounce the pings back again.
      * <p/>
@@ -257,8 +330,8 @@
         if (args.length < 2)
         {
             System.err.println(
-                "Usage: TestPingPublisher <brokerDetails> <virtual path> 
[verbose (true/false)] "
-                + "[transacted (true/false)] [persistent (true/false)] 
[message size in bytes] [batchsize] [rate]");
+                    "Usage: TestPingPublisher <brokerDetails> <virtual path> 
[verbose (true/false)] "
+                    + "[transacted (true/false)] [persistent (true/false)] 
[message size in bytes] [batchsize] [rate]");
             System.exit(0);
         }
 
@@ -310,10 +383,10 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer =
-            new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, 
PING_QUEUE_NAME, null, transacted, persistent,
-                                 messageSize, verbose, afterCommit, 
beforeCommit, afterSend, beforeSend, failOnce, batchSize,
-                                 0, rate);
+        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, 
"guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
+                                                             persistent, 
messageSize, verbose,
+                                                             afterCommit, 
beforeCommit, afterSend, beforeSend, failOnce,
+                                                             batchSize, 0, 
rate);
 
         pingProducer.getConnection().start();
 
@@ -332,66 +405,11 @@
     }
 
     /**
-     * Creates the producer to send the pings on.  If the tests are with 
nultiple queues, then producer
-     * is created with null destination, so that any destination can be 
specified while sending
-     *
-     * @throws JMSException
-     */
-    public void createProducer() throws JMSException
-    {
-        if (getQueueCount() > 1)
-        {
-            // create producer with initial destination as null for test with 
multiple queues
-            // In this case, a different destination will be used while 
sending the message
-            _producer = (MessageProducer) 
getProducerSession().createProducer(null);
-        }
-        else
-        {
-            // Create a queue and producer to send the pings on.
-            _producer = (MessageProducer) 
getProducerSession().createProducer(_pingQueue);
-
-        }
-
-        _producer.setDisableMessageTimestamp(true);
-        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
-    }
-
-    /**
-     * Creates the temporary queue to listen to the responses
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a temporary queue to get the pongs on.
-        _replyQueue = _consumerSession.createTemporaryQueue();
-
-        // Create a message consumer to get the replies with and register this 
to be called back by it.
-        MessageConsumer consumer = 
_consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, 
selector);
-        consumer.setMessageListener(this);
-    }
-
-    /**
      * Creates consumer instances for each queue. This is used when test is 
being done with multiple queues.
      *
      * @param selector
      * @throws JMSException
      */
-    public void createConsumers(String selector) throws JMSException
-    {
-        for (int i = 0; i < getQueueCount(); i++)
-        {
-            MessageConsumer consumer =
-                getConsumerSession().createConsumer(getQueue(i), PREFETCH, 
false, EXCLUSIVE, selector);
-            consumer.setMessageListener(this);
-        }
-    }
-
-    public Queue getPingQueue()
-    {
-        return _pingQueue;
-    }
 
     /**
      * Primes the test loop by sending a few messages, then introduces a short 
wait. This allows the bounce back client
@@ -415,7 +433,8 @@
                 Thread.sleep(100);
             }
             catch (InterruptedException ignore)
-            { }
+            {
+            }
         }
     }
 
@@ -476,10 +495,8 @@
      * @param message  The message to send.
      * @param numPings The number of ping messages to send.
      * @param timeout  The timeout in milliseconds.
-     *
      * @return The number of replies received. This may be less than the 
number sent if the timeout terminated the
      *         wait for all prematurely.
-     *
      * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
      */
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout) throws JMSException, InterruptedException
@@ -570,18 +587,18 @@
      * @return The reply, or null if no reply arrives before the timeout.
      * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
      */
-    /*public void pingNoWaitForReply(Message message, int numPings) throws 
JMSException, InterruptedException
+    public void pingNoWaitForReply(Message message, int numPings) throws 
JMSException, InterruptedException
     {
         for (int i = 0; i < numPings; i++)
         {
             sendMessage(message);
-
+            
             if (_verbose)
             {
                 _logger.info(timestampFormatter.format(new Date()) + ": Pinged 
at.");
             }
         }
-    }*/
+    }
 
     /**
      * The ping loop implementation. This send out pings of the configured 
size, persistence and transactionality, and
@@ -618,15 +635,6 @@
         return _replyQueue;
     }
 
-    protected Session getConsumerSession()
-    {
-        return _consumerSession;
-    }
-
-    protected void setPingQueue(Queue queue)
-    {
-        _pingQueue = queue;
-    }
 
     protected void setReplyQueue(Queue queue)
     {
@@ -666,10 +674,12 @@
     public static class FailoverNotifier implements ConnectionListener
     {
         public void bytesSent(long count)
-        { }
+        {
+        }
 
         public void bytesReceived(long count)
-        { }
+        {
+        }
 
         public boolean preFailover(boolean redirect)
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j?view=diff&rev=499166&r1=499165&r2=499166
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j 
(original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j Tue 
Jan 23 14:34:25 2007
@@ -28,7 +28,7 @@
 
 
 log4j.logger.uk.co.thebadgerset.junit.extensions=info, console
-log4j.additivity.uk.co.thebadgerset.junit.extensions=fals
+log4j.additivity.uk.co.thebadgerset.junit.extensions=false
 log4j.logger.uk.co.thebadgerset.junit.extensions=info
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=auto&rev=499166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Tue Jan 23 14:34:25 2007
@@ -0,0 +1,288 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.ping;
+
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.TimingController;
+
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+public class PingAsyncTestPerf extends PingTestPerf implements 
TimingControllerAware
+{
+    private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+
+    private TimingController _timingController;
+
+    private final CountDownLatch _completedLock = new CountDownLatch(1);
+
+    private AsyncMessageListener _listener;
+
+    private volatile boolean _done = false;
+
+    public PingAsyncTestPerf(String name)
+    {
+        super(name);
+    }
+
+    /**
+     * Compile all the tests into a test suite.
+     */
+    public static Test suite()
+    {
+        // Build a new test suite
+        TestSuite suite = new TestSuite("Ping Performance Tests");
+
+        // Run performance tests in read committed mode.
+        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+        return suite;
+    }
+
+    protected void setUp() throws Exception
+    {
+        // Create the test setups on a per thread basis, only if they have not 
already been created.
+
+        if (threadSetup.get() == null)
+        {
+            PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+            // Extract the test set up paramaeters.
+            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+            String username = "guest";
+            String password = "guest";
+            String virtualpath = 
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+            int queueCount = 
Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
+            String queueName = 
testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+            boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+            String selector = null;
+            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+            int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+
+            boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+            boolean beforeCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+            boolean afterSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+            boolean beforeSend = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+            boolean failOnce = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+
+            int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+            int commitbatchSize = 
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+
+            int rate = 
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+
+            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
+            // all threads try to create connection concurrently
+            synchronized (this)
+            {
+                // Establish a client to ping a Queue and listen the reply 
back from same Queue
+                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
+                                                                      
queueName, selector, transacted, persistent,
+                                                                      
messageSize, verbose,
+                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+                                                                      
commitbatchSize, queueCount, rate);
+
+
+                _listener = new AsyncMessageListener(batchSize);
+
+                perThreadSetup._pingItselfClient.setMessageListener(_listener);
+                // Start the client connection
+                perThreadSetup._pingItselfClient.getConnection().start();
+
+                // Attach the per-thread set to the thread.
+                threadSetup.set(perThreadSetup);
+            }
+        }
+    }
+
+
+    public void testAsyncPingOk(int numPings)
+    {
+        _timingController = this.getTimingController();
+
+        _listener.setTotalMessages(numPings);
+
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        if (numPings == 0)
+        {
+            _logger.error("Number of pings requested was zero.");
+        }
+
+        // Generate a sample message. This message is already time stamped and 
has its reply-to destination set.
+        ObjectMessage msg = null;
+
+        try
+        {
+            msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+                                                                  
Integer.parseInt(testParameters.getProperty(
+                                                                          
MESSAGE_SIZE_PROPNAME)),
+                                                                  
Boolean.parseBoolean(testParameters.getProperty(
+                                                                          
PERSISTENT_MODE_PROPNAME)));
+        }
+        catch (JMSException e)
+        {
+
+        }
+
+        // start the test
+        long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+
+        try
+        {
+            perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings);
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            Assert.fail("JMS Exception Recevied" + e);
+        }
+        catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        }
+
+        while (!_done)
+        {
+            try
+            {
+                _logger.info("awating test finish");
+
+                _completedLock.await();
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+
+        // Fail the test if the timeout was exceeded.
+        int numReplies = _listener.getReplyCount();
+
+        _logger.info("Test Finished");
+
+        if (numReplies != numPings)
+
+        {
+            Assert.fail("The ping timed out after " + timeout + " ms. Messages 
Sent = " + numPings + ", MessagesReceived = " + numReplies);
+            try
+            {
+                _timingController.completeTest(false);
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+    }
+
+
+    public void setTimingController(TimingController timingController)
+    {
+        _timingController = timingController;
+    }
+
+    public TimingController getTimingController()
+    {
+        return _timingController;
+    }
+
+
+    private class AsyncMessageListener implements MessageListener
+    {
+        private int _messageRecevied;
+        private int _totalMessages;
+        private int _batchSize;
+
+        public AsyncMessageListener(int batchSize, int totalMessages)
+        {
+            _batchSize = batchSize;
+            _totalMessages = totalMessages;
+            _messageRecevied = 0;
+        }
+
+        public AsyncMessageListener(int batchSize)
+        {
+            _batchSize = batchSize;
+            _totalMessages = -1;
+            _messageRecevied = 0;
+        }
+
+        public void setTotalMessages(int newTotal)
+        {
+            _totalMessages = newTotal;
+        }
+
+        public void onMessage(Message message)
+        {
+            _logger.info("Message Recevied");
+            try
+            {
+                _messageRecevied++;
+                if (_messageRecevied == _batchSize)
+                {
+                    if (_timingController != null)
+                    {
+                        _timingController.completeTest(true);
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            {
+                doDone();
+            }
+
+            if (_totalMessages == -1 || _messageRecevied == _totalMessages)
+            {
+                _logger.info("Test Completed.. signalling");
+                doDone();
+            }
+        }
+
+        private void doDone()
+        {
+            _done = true;
+            _completedLock.countDown();
+            try
+            {
+                _timingController.completeTest(true);
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+
+        public int getReplyCount()
+        {
+            return _messageRecevied;
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499166&r1=499165&r2=499166
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 Tue Jan 23 14:34:25 2007
@@ -42,94 +42,96 @@
     /**
      * Holds the name of the property to get the test message size from.
      */
-    private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+    protected static final String MESSAGE_SIZE_PROPNAME = "messageSize";
 
     /**
      * Holds the name of the property to get the ping queue name from.
      */
-    private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+    protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
 
     /**
      * holds the queue count, if the test is being performed with multiple 
queues
      */
-    private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+    protected static final String PING_QUEUE_COUNT_PROPNAME = "queues";
 
     /**
      * Holds the name of the property to get the test delivery mode from.
      */
-    private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+    protected static final String PERSISTENT_MODE_PROPNAME = "persistent";
 
     /**
      * Holds the name of the property to get the test transactional mode from.
      */
-    private static final String TRANSACTED_PROPNAME = "transacted";
+    protected static final String TRANSACTED_PROPNAME = "transacted";
 
     /**
      * Holds the name of the property to get the test broker url from.
      */
-    private static final String BROKER_PROPNAME = "broker";
+    protected static final String BROKER_PROPNAME = "broker";
 
     /**
      * Holds the name of the property to get the test broker virtual path.
      */
-    private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+    protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
 
     /**
      * Holds the name of the property to get the waiting timeout for response 
messages.
      */
-    private static final String TIMEOUT_PROPNAME = "timeout";
+    protected static final String TIMEOUT_PROPNAME = "timeout";
 
     /** Holds the name of the property to get the message rate from. */
-    private static final String RATE_PROPNAME = "rate";
+    protected static final String RATE_PROPNAME = "rate";
 
-    private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+    protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
 
     /**
      * Holds the size of message body to attach to the ping messages.
      */
-    private static final int MESSAGE_SIZE_DEFAULT = 0;
+    protected static final int MESSAGE_SIZE_DEFAULT = 0;
 
-    private static final int BATCH_SIZE_DEFAULT = 2;
+    protected static final int BATCH_SIZE_DEFAULT = 2;
+    protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT;
 
     /**
      * Holds the name of the queue to which pings are sent.
      */
-    private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+    protected static final String PING_QUEUE_NAME_DEFAULT = "ping";
 
     /**
      * Holds the message delivery mode to use for the test.
      */
-    private static final boolean PERSISTENT_MODE_DEFAULT = false;
+    protected static final boolean PERSISTENT_MODE_DEFAULT = false;
 
     /**
      * Holds the transactional mode to use for the test.
      */
-    private static final boolean TRANSACTED_DEFAULT = false;
+    protected static final boolean TRANSACTED_DEFAULT = false;
 
     /**
      * Holds the default broker url for the test.
      */
-    private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+    protected static final String BROKER_DEFAULT = "tcp://localhost:5672";
 
     /**
      * Holds the default virtual path for the test.
      */
-    private static final String VIRTUAL_PATH_DEFAULT = "/test";
+    protected static final String VIRTUAL_PATH_DEFAULT = "/test";
 
     /**
      * Sets a default ping timeout.
      */
-    private static final long TIMEOUT_DEFAULT = 3000;
+    protected static final long TIMEOUT_DEFAULT = 3000;
 
     /** Holds the default rate. A value of zero means infinity, only values of 
1 or greater are meaningfull. */
     private static final int RATE_DEFAULT = 0;
 
-    private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
-    private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
-    private static final String FAIL_AFTER_SEND = "FailAfterSend";
-    private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
-    private static final String BATCH_SIZE = "BatchSize";
-    private static final String FAIL_ONCE = "FailOnce";
+    protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+    protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+    protected static final String FAIL_AFTER_SEND = "FailAfterSend";
+    protected static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+    protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize";
+    protected static final String BATCH_SIZE = "BatchSize";
+    protected static final String FAIL_ONCE = "FailOnce";
 
     /**
      * Thread local to hold the per-thread test setup fields.
@@ -139,7 +141,7 @@
     // Set up a property reader to extract the test parameters from. Once 
ContextualProperties is available in
     // the project dependencies, use it to get property overrides for 
configurable tests and to notify the test runner
     // of the test parameters to log with the results.
-    private Properties testParameters = System.getProperties();
+    protected Properties testParameters = System.getProperties();
     //private Properties testParameters = new 
ContextualProperties(System.getProperties());
 
     public PingTestPerf(String name)
@@ -154,6 +156,7 @@
         setSystemPropertyIfNull(FAIL_ONCE, "true");
 
         setSystemPropertyIfNull(BATCH_SIZE, 
Integer.toString(BATCH_SIZE_DEFAULT));
+        setSystemPropertyIfNull(COMMIT_BATCH_SIZE, 
Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
         setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, 
Integer.toString(MESSAGE_SIZE_DEFAULT));
         setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, 
PING_QUEUE_NAME_DEFAULT);
         setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, 
Boolean.toString(PERSISTENT_MODE_DEFAULT));
@@ -181,7 +184,7 @@
                //return new junit.framework.TestSuite(PingTestPerf.class);
     }
 
-    private static void setSystemPropertyIfNull(String propName, String 
propValue)
+    protected static void setSystemPropertyIfNull(String propName, String 
propValue)
     {
         if (System.getProperty(propName) == null)
         {
@@ -223,6 +226,7 @@
         }
     }
 
+
     protected void setUp() throws Exception
     {
         // Log4j will propagate the test name as a thread local in all log 
output.
@@ -293,11 +297,11 @@
         }
     }
 
-    private static class PerThreadSetup
+    protected static class PerThreadSetup
     {
         /**
          * Holds the test ping client.
          */
-        private TestPingItself _pingItselfClient;
+        protected TestPingItself _pingItselfClient;
     }
 }


Reply via email to