Author: ritchiem
Date: Mon Feb  5 01:49:59 2007
New Revision: 503609

URL: http://svn.apache.org/viewvc?view=rev&rev=503609
Log:
Update to performance testing to allow the use of shared destinations. This 
allows topics to have multiple consumers and the total message counts updated 
correctly.

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?view=diff&rev=503609&r1=503608&r2=503609
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 Mon Feb  5 01:49:59 2007
@@ -38,6 +38,8 @@
  */
 public class PingClient extends PingPongProducer
 {
+    private static int _pingClientCount;
+
     /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See their individual comments
      * for details. This constructor creates ping pong producer but 
de-registers its reply-to destination message
@@ -76,6 +78,8 @@
         super(brokerDetails, username, password, virtualpath, destinationName, 
selector, transacted, persistent, messageSize,
               verbose, afterCommit, beforeCommit, afterSend, beforeSend, 
failOnce, txBatchSize, noOfDestinations, rate,
               pubsub, unique);
+
+        _pingClientCount++;
     }
 
     /**
@@ -88,4 +92,17 @@
     {
         return _pingDestinations;
     }
+
+    public int getConsumersPerTopic()
+    {
+        if (_isUnique)
+        {
+            return 1;
+        }
+        else
+        {
+            return _pingClientCount;
+        }
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=503609&r1=503608&r2=503609
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Mon Feb  5 01:49:59 2007
@@ -50,21 +50,21 @@
 /**
  * PingPongProducer is a client that sends pings to a queue and waits for 
pongs to be bounced back by a bounce back
  * client (see [EMAIL PROTECTED] PingPongBouncer} for the bounce back client).
- *
+ * <p/>
  * <p/>The pings are sent with a reply-to field set to a single temporary 
queue, which is the same for all pings.
  * This means that this class has to do some work to correlate pings with 
pongs; it expectes the original message
  * correlation id in the ping to be bounced back in the reply correlation id.
- *
+ * <p/>
  * <p/>This ping tool accepts a vast number of configuration options, all of 
which are passed in to the constructor.
  * It can ping topics or queues; ping multiple destinations; do persistent 
pings; send messages of any size; do pings
  * within transactions; control the number of pings to send in each 
transaction; limit its sending rate; and perform
  * failover testing.
- *
+ * <p/>
  * <p/>This implements the Runnable interface with a run method that 
implements an infinite ping loop. The ping loop
  * does all its work through helper methods, so that code wishing to run a 
ping-pong cycle is not forced to do so
  * by starting a new thread. The command line invocation does take advantage 
of this ping loop. A shutdown hook is
  * also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Provide a ping and wait for all responses cycle.
@@ -72,55 +72,67 @@
  * </table>
  *
  * @todo The use of a ping rate [EMAIL PROTECTED] #DEFAULT_RATE} and waits 
between pings [EMAIL PROTECTED] #DEFAULT_SLEEP_TIME} are overlapping.
- *       Use the rate and throttling only. Ideally, optionally pass the rate 
throttle into the ping method, throttle to
- *       be created and configured by the test runner from the -f command line 
option and made available through
- *       the timing controller on timing aware tests or by throttling rate of 
calling tests methods on non-timing aware
- *       tests.
- *
+ * Use the rate and throttling only. Ideally, optionally pass the rate 
throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line 
option and made available through
+ * the timing controller on timing aware tests or by throttling rate of 
calling tests methods on non-timing aware
+ * tests.
  * @todo Make acknowledege mode a test option.
- *
  * @todo Make the message listener a static for all replies to be sent to? It 
won't be any more of a bottle neck than
- *       having one per PingPongProducer, as will synchronize on message 
correlation id, allowing threads to process
- *       messages concurrently for different ids. Needs to be static so that 
when using a chained message listener and
- *       shared destinations between multiple PPPs, it gets notified about all 
replies, not just those that happen to
- *       be picked up by the PPP that it is atteched to.
- *
+ * having one per PingPongProducer, as will synchronize on message correlation 
id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when 
using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all 
replies, not just those that happen to
+ * be picked up by the PPP that it is atteched to.
  * @todo Use read/write lock in the onmessage, not for reading writing but to 
make use of a shared and exlcusive lock
- *       pair. Obtian read lock on all messages, before decrementing the 
message count. At the end of the on message
- *       method add a block that obtains the write lock for the very last 
message, releases any waiting producer. Means
- *       that the last message waits until all other messages have been 
handled before releasing producers but allows
- *       messages to be processed concurrently, unlike the current 
synchronized block.
- *
+ * pair. Obtian read lock on all messages, before decrementing the message 
count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, 
releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled 
before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized 
block.
  * @todo Need to multiply up the number of expected messages for pubsub tests 
as each can be received by many consumers?
  */
 public class PingPongProducer implements Runnable, MessageListener, 
ExceptionListener
 {
     private static final Logger _logger = 
Logger.getLogger(PingPongProducer.class);
 
-    /** Holds the name of the property to get the test message size from. */
+    /**
+     * Holds the name of the property to get the test message size from.
+     */
     public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
 
-    /** Holds the name of the property to get the ping queue name from. */
+    /**
+     * Holds the name of the property to get the ping queue name from.
+     */
     public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
 
-    /** Holds the name of the property to get the test delivery mode from. */
+    /**
+     * Holds the name of the property to get the test delivery mode from.
+     */
     public static final String PERSISTENT_MODE_PROPNAME = "persistent";
 
-    /** Holds the name of the property to get the test transactional mode 
from. */
+    /**
+     * Holds the name of the property to get the test transactional mode from.
+     */
     public static final String TRANSACTED_PROPNAME = "transacted";
 
-    /** Holds the name of the property to get the test broker url from. */
+    /**
+     * Holds the name of the property to get the test broker url from.
+     */
     public static final String BROKER_PROPNAME = "broker";
 
-    /** Holds the name of the property to get the test broker virtual path. */
+    /**
+     * Holds the name of the property to get the test broker virtual path.
+     */
     public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
 
-    /** Holds the name of the property to get the message rate from. */
+    /**
+     * Holds the name of the property to get the message rate from.
+     */
     public static final String RATE_PROPNAME = "rate";
 
     public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
 
-    /** Holds the true or false depending on wether it is P2P test or PubSub */
+    /**
+     * Holds the true or false depending on wether it is P2P test or PubSub
+     */
     public static final String IS_PUBSUB_PROPNAME = "pubsub";
 
     public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
@@ -141,96 +153,147 @@
 
     public static final String PING_DESTINATION_COUNT_PROPNAME = 
"destinationscount";
 
-    /** Holds the name of the property to get the waiting timeout for response 
messages. */
+    /**
+     * Holds the name of the property to get the waiting timeout for response 
messages.
+     */
     public static final String TIMEOUT_PROPNAME = "timeout";
 
     public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
 
     public static final String UNIQUE_PROPNAME = "uniqueDests";
 
-    /** Used to set up a default message size. */
+    /**
+     * Used to set up a default message size.
+     */
     public static final int DEFAULT_MESSAGE_SIZE = 0;
 
-    /** Holds the name of the default destination to send pings on. */
+    /**
+     * Holds the name of the default destination to send pings on.
+     */
     public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
 
-    /** Defines the default number of destinations to ping. */
+    /**
+     * Defines the default number of destinations to ping.
+     */
     public static final int DEFAULT_DESTINATION_COUNT = 1;
 
-    /** Defines the default rate (in pings per second) to send pings at. 0 
means as fast as possible, no restriction. */
+    /**
+     * Defines the default rate (in pings per second) to send pings at. 0 
means as fast as possible, no restriction.
+     */
     public static final int DEFAULT_RATE = 0;
 
-    /** Defines the default wait between pings. */
+    /**
+     * Defines the default wait between pings.
+     */
     public static final long DEFAULT_SLEEP_TIME = 250;
 
-    /** Default time to wait before assuming that a ping has timed out. */
+    /**
+     * Default time to wait before assuming that a ping has timed out.
+     */
     public static final long DEFAULT_TIMEOUT = 30000;
 
-    /** Defines the default number of pings to send in each transaction when 
running transactionally. */
+    /**
+     * Defines the default number of pings to send in each transaction when 
running transactionally.
+     */
     public static final int DEFAULT_TX_BATCH_SIZE = 100;
 
-    /** Defines the default prefetch size to use when consuming messages. */
+    /**
+     * Defines the default prefetch size to use when consuming messages.
+     */
     public static final int DEFAULT_PREFETCH = 100;
 
-    /** Defines the default value of the no local flag to use when consuming 
messages. */
+    /**
+     * Defines the default value of the no local flag to use when consuming 
messages.
+     */
     public static final boolean DEFAULT_NO_LOCAL = false;
 
-    /** Defines the default value of the exclusive flag to use when consuming 
messages. */
+    /**
+     * Defines the default value of the exclusive flag to use when consuming 
messages.
+     */
     public static final boolean DEFAULT_EXCLUSIVE = false;
 
-    /** Holds the message delivery mode to use for the test. */
+    /**
+     * Holds the message delivery mode to use for the test.
+     */
     public static final boolean DEFAULT_PERSISTENT_MODE = false;
 
-    /** Holds the transactional mode to use for the test. */
+    /**
+     * Holds the transactional mode to use for the test.
+     */
     public static final boolean DEFAULT_TRANSACTED = false;
 
-    /** Holds the default broker url for the test. */
+    /**
+     * Holds the default broker url for the test.
+     */
     public static final String DEFAULT_BROKER = "tcp://localhost:5672";
 
-    /** Holds the default virtual path for the test. */
+    /**
+     * Holds the default virtual path for the test.
+     */
     public static final String DEFAULT_VIRTUAL_PATH = "test";
 
-    /** Holds the pub/sub mode default, true means ping a topic, false means 
ping a queue. */
+    /**
+     * Holds the pub/sub mode default, true means ping a topic, false means 
ping a queue.
+     */
     public static final boolean DEFAULT_PUBSUB = false;
 
-    /** Holds the default broker log on username. */
+    /**
+     * Holds the default broker log on username.
+     */
     public static final String DEFAULT_USERNAME = "guest";
 
-    /** Holds the default broker log on password. */
+    /**
+     * Holds the default broker log on password.
+     */
     public static final String DEFAULT_PASSWORD = "guest";
 
-    /** Holds the default message selector. */
+    /**
+     * Holds the default message selector.
+     */
     public static final String DEFAULT_SELECTOR = null;
 
-    /** Holds the default failover after commit test flag. */
+    /**
+     * Holds the default failover after commit test flag.
+     */
     public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
 
-    /** Holds the default failover before commit test flag. */
+    /**
+     * Holds the default failover before commit test flag.
+     */
     public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
 
-    /** Holds the default failover after send test flag. */
+    /**
+     * Holds the default failover after send test flag.
+     */
     public static final String DEFAULT_FAIL_AFTER_SEND = "false";
 
-    /** Holds the default failover before send test flag. */
+    /**
+     * Holds the default failover before send test flag.
+     */
     public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
 
-    /** Holds the default failover only once flag, true means only do one 
failover, false means failover on every commit cycle. */
+    /**
+     * Holds the default failover only once flag, true means only do one 
failover, false means failover on every commit cycle.
+     */
     public static final String DEFAULT_FAIL_ONCE = "true";
 
-    /** Holds the default verbose mode. */
+    /**
+     * Holds the default verbose mode.
+     */
     public static final boolean DEFAULT_VERBOSE = false;
 
     public static final boolean DEFAULT_UNIQUE = true;
 
-    /** Holds the name of the property to store nanosecond timestamps in ping 
messages with. */
+    /**
+     * Holds the name of the property to store nanosecond timestamps in ping 
messages with.
+     */
     public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
 
-    /** A source for providing sequential unique correlation ids. These will 
be unique within the same JVM. */
+    /**
+     * A source for providing sequential unique correlation ids. These will be 
unique within the same JVM.
+     */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
-    /** A source for providing unique ids to PingPongProducer. */
-    private static AtomicInteger _pingProducerIdGenerator;
-
     /**
      * Holds a map from message ids to latches on which threads wait for 
replies. This map is shared accross
      * multiple ping producers on the same JVM.
@@ -238,9 +301,11 @@
     /*private static Map<String, CountDownLatch> trafficLights =
         Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
     private static Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, 
PerCorrelationId>());
 
-    /** A convenient formatter to use when time stamping output. */
+    /**
+     * A convenient formatter to use when time stamping output.
+     */
     protected static final DateFormat timestampFormatter = new 
SimpleDateFormat("hh:mm:ss:SS");
 
     /**
@@ -249,70 +314,119 @@
      */
     protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
 
-    /** Destination where the response messages will arrive. */
+    /**
+     * Destination where the response messages will arrive.
+     */
     private Destination _replyDestination;
 
-    /** Determines whether this producer sends persistent messages. */
+    /**
+     * Determines whether this producer sends persistent messages.
+     */
     protected boolean _persistent;
 
-    /** Determines what size of messages this producer sends. */
+    /**
+     * Determines what size of messages this producer sends.
+     */
     protected int _messageSize;
 
-    /** Used to indicate that the ping loop should print out whenever it 
pings. */
+    /**
+     * Used to indicate that the ping loop should print out whenever it pings.
+     */
     protected boolean _verbose = false;
 
-    /** Holds the session on which ping replies are received. */
+    /**
+     * Holds the session on which ping replies are received.
+     */
     protected Session _consumerSession;
 
-    /** Used to restrict the sending rate to a specified limit. */
+    /**
+     * Used to restrict the sending rate to a specified limit.
+     */
     private Throttle _rateLimiter = null;
 
-    /** Holds a message listener that this message listener chains all its 
messages to. */
+    /**
+     * Holds a message listener that this message listener chains all its 
messages to.
+     */
     private ChainedMessageListener _chainedMessageListener = null;
 
-    /** Flag used to indicate if this is a point to point or pub/sub ping 
client. */
+    /**
+     * Flag used to indicate if this is a point to point or pub/sub ping 
client.
+     */
     protected boolean _isPubSub = false;
 
     /**
+     * Flag used to indicate if the destinations should be unique client.
+     */
+    protected static boolean _isUnique = false;
+
+    /**
      * This id generator is used to generates ids that are only unique within 
this pinger. Creating multiple pingers
      * on the same JVM using this id generator will allow them to ping on the 
same queues.
      */
     protected AtomicInteger _queueSharedId = new AtomicInteger();
 
-    /** Used to tell the ping loop when to terminate, it only runs while this 
is true. */
+    /**
+     * Used to tell the ping loop when to terminate, it only runs while this 
is true.
+     */
     protected boolean _publish = true;
 
-    /** Holds the connection to the broker. */
+    /**
+     * Holds the connection to the broker.
+     */
     private Connection _connection;
 
-    /** Holds the producer session, needed to create ping messages. */
+    /**
+     * Holds the producer session, needed to create ping messages.
+     */
     private Session _producerSession;
 
-    /** Holds the set of destiniations that this ping producer pings. */
+    /**
+     * Holds the set of destiniations that this ping producer pings.
+     */
     protected List<Destination> _pingDestinations = new 
ArrayList<Destination>();
 
-    /** Holds the message producer to send the pings through. */
+    /**
+     * Holds the message producer to send the pings through.
+     */
     protected MessageProducer _producer;
 
-    /** Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover before a commit. */
+    /**
+     * Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover before a commit.
+     */
     protected boolean _failBeforeCommit = false;
 
-    /** Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover after a commit. */
+    /**
+     * Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover after a commit.
+     */
     protected boolean _failAfterCommit = false;
 
-    /** Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover before a send. */
+    /**
+     * Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover before a send.
+     */
     protected boolean _failBeforeSend = false;
 
-    /** Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover after a send. */
+    /**
+     * Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover after a send.
+     */
     protected boolean _failAfterSend = false;
 
-    /** Flag used to indicate that failover prompting should only be done on 
the first commit, not on every commit. */
+    /**
+     * Flag used to indicate that failover prompting should only be done on 
the first commit, not on every commit.
+     */
     protected boolean _failOnce = true;
 
-    /** Holds the number of sends that should be performed in every 
transaction when using transactions. */
+    /**
+     * Holds the number of sends that should be performed in every transaction 
when using transactions.
+     */
     protected int _txBatchSize = 1;
 
     /**
+     * Holds the number of consumers that will be attached to each topic.
+     * Each pings will result in a reply from each of the attached clients
+     */
+    static int _consumersPerTopic = 1;
+
+    /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See their individual comments
      * for details. This constructor creates a connection to the broker and 
creates producer and consumer sessions on it,
      * to send and recieve its pings and replies on. The other options are 
kept, and control how this pinger behaves.
@@ -339,7 +453,6 @@
      *                         possible, with no rate restriction.
      * @param pubsub           True to ping topics, false to ping queues.
      * @param unique           True to use unique destinations for each ping 
pong producer, false to share.
-     *
      * @throws Exception Any exceptions are allowed to fall through.
      */
     public PingPongProducer(String brokerDetails, String username, String 
password, String virtualpath,
@@ -369,6 +482,7 @@
         _failOnce = failOnce;
         _txBatchSize = txBatchSize;
         _isPubSub = pubsub;
+        _isUnique = unique;
 
         // Check that one or more destinations were specified.
         if (noOfDestinations < 1)
@@ -407,6 +521,7 @@
      * to be started to bounce the pings back again.
      *
      * @param args The command line arguments.
+     * @throws Exception When something went wrong with the test
      */
     public static void main(String[] args) throws Exception
     {
@@ -479,9 +594,9 @@
 
         // Create a ping producer to handle the request/wait/reply cycle.
         PingPongProducer pingProducer =
-            new PingPongProducer(brokerDetails, DEFAULT_USERNAME, 
DEFAULT_PASSWORD, virtualpath, destName, selector,
-                                 transacted, persistent, messageSize, verbose, 
afterCommit, beforeCommit, afterSend,
-                                 beforeSend, failOnce, batchSize, destCount, 
rate, pubsub, false);
+                new PingPongProducer(brokerDetails, DEFAULT_USERNAME, 
DEFAULT_PASSWORD, virtualpath, destName, selector,
+                                     transacted, persistent, messageSize, 
verbose, afterCommit, beforeCommit, afterSend,
+                                     beforeSend, failOnce, batchSize, 
destCount, rate, pubsub, false);
 
         pingProducer.getConnection().start();
 
@@ -511,7 +626,9 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            { }
+            {
+                //ignore
+            }
         }
     }
 
@@ -555,11 +672,10 @@
      * @param rootName         The root of the name, or actual name if only 
one is being created.
      * @param unique           <tt>true</tt> to make the destinations unique 
to this pinger, <tt>false</tt> to share
      *                         the numbering with all pingers on the same JVM.
-     *
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique)
-                                throws JMSException
+            throws JMSException
     {
         _logger.debug("public void createPingDestinations(int noOfDestinations 
= " + noOfDestinations
                       + ", String selector = " + selector + ", String rootName 
= " + rootName + ", boolean unique = "
@@ -568,7 +684,7 @@
         // Create the desired number of ping destinations and consumers for 
them.
         for (int i = 0; i < noOfDestinations; i++)
         {
-            AMQDestination destination = null;
+            AMQDestination destination;
 
             int id;
 
@@ -701,11 +817,10 @@
      * @param message  The message to send.
      * @param numPings The number of ping messages to send.
      * @param timeout  The timeout in milliseconds.
-     *
      * @return The number of replies received. This may be less than the 
number sent if the timeout terminated the
      *         wait for all prematurely.
-     *
-     * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
+     * @throws JMSException         All underlying JMSExceptions are allowed 
to fall through.
+     * @throws InterruptedException When interrupted by a timeout.
      */
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout) throws JMSException, InterruptedException
     {
@@ -727,14 +842,13 @@
      * @param numPings             The number of ping messages to send.
      * @param timeout              The timeout in milliseconds.
      * @param messageCorrelationId The message correlation id.
-     *
      * @return The number of replies received. This may be less than the 
number sent if the timeout terminated the
      *         wait for all prematurely.
-     *
-     * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
+     * @throws JMSException         All underlying JMSExceptions are allowed 
to fall through.
+     * @throws InterruptedException When interrupted by a timeout
      */
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout, String messageCorrelationId)
-                            throws JMSException, InterruptedException
+            throws JMSException, InterruptedException
     {
         _logger.debug("public int pingAndWaitForReply(Message message, int 
numPings = " + numPings + ", long timeout = "
                       + timeout + ", String messageCorrelationId = " + 
messageCorrelationId + "): called");
@@ -747,7 +861,8 @@
             // chained message listener must be called before this sender can 
be unblocked, but that decrementing the
             // countdown needs to be done before the chained listener can be 
called.
             PerCorrelationId perCorrelationId = new PerCorrelationId();
-            perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+
+            perCorrelationId.trafficLight = new 
CountDownLatch(getExpectedNumPings(numPings) + 1);
             perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
             // Set up the current time as the start time for pinging on the 
correlation id. This is used to determine
@@ -767,11 +882,12 @@
                 perCorrelationId.trafficLight.await(timeout, 
TimeUnit.MILLISECONDS);
 
                 // Work out how many replies were receieved.
-                numReplies = numPings - (int) 
perCorrelationId.trafficLight.getCount();
-                allMessagesReceived = numReplies >= numPings;
+                numReplies = getExpectedNumPings(numPings) - (int) 
perCorrelationId.trafficLight.getCount();
+
+                allMessagesReceived = numReplies == 
getExpectedNumPings(numPings);
 
-                _logger.debug("numReplies = "+ numReplies);
-                _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+                _logger.debug("numReplies = " + numReplies);
+                _logger.debug("allMessagesReceived = " + allMessagesReceived);
 
                 // Recheck the timeout condition.
                 long now = System.nanoTime();
@@ -783,7 +899,7 @@
             }
             while (!timedOut && !allMessagesReceived);
 
-            if ((numReplies < numPings) && _verbose)
+            if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
             {
                 _logger.info("Timed out (" + timeout + " ms) before all 
replies received on id, " + messageCorrelationId);
             }
@@ -812,7 +928,6 @@
      * @param message              The message to send.
      * @param numPings             The number of pings to send.
      * @param messageCorrelationId A correlation id to place on all messages 
sent.
-     *
      * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
      */
     public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId) throws JMSException
@@ -927,9 +1042,7 @@
      * @param replyQueue  The reply-to destination for the message.
      * @param messageSize The desired size of the message in bytes.
      * @param persistent  <tt>true</tt> if the message should use persistent 
delivery, <tt>false</tt> otherwise.
-     *
      * @return A freshly generated test message.
-     *
      * @throws javax.jms.JMSException All underlying JMSException are allowed 
to fall through.
      */
     public ObjectMessage getTestMessage(Destination replyQueue, int 
messageSize, boolean persistent) throws JMSException
@@ -984,12 +1097,12 @@
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
+        {
+            public void run()
             {
-                public void run()
-                {
-                    stop();
-                }
-            });
+                stop();
+            }
+        });
     }
 
     /**
@@ -1007,7 +1120,6 @@
      *
      * @param destinations The destinations to listen to.
      * @param selector     A selector to filter the messages with.
-     *
      * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall 
through.
      */
     public void createReplyConsumers(Collection<Destination> destinations, 
String selector) throws JMSException
@@ -1019,8 +1131,8 @@
         {
             // Create a consumer for the destination and set this pinger to 
listen to its messages.
             MessageConsumer consumer =
-                _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, 
DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
-                                                selector);
+                    _consumerSession.createConsumer(destination, 
DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+                                                    selector);
             consumer.setMessageListener(this);
         }
     }
@@ -1043,19 +1155,20 @@
     /**
      * Convenience method to commit the transaction on the specified session. 
If the session to commit on is not
      * a transactional session, this method does nothing (unless the failover 
after send flag is set).
-     *
+     * <p/>
      * <p/>If the [EMAIL PROTECTED] #_failAfterSend} flag is set, this will 
prompt the user to kill the broker before the commit
      * is applied. This flag applies whether the pinger is transactional or 
not.
-     *
+     * <p/>
      * <p/>If the [EMAIL PROTECTED] #_failBeforeCommit} flag is set, this will 
prompt the user to kill the broker before the
      * commit is applied. If the [EMAIL PROTECTED] #_failAfterCommit} flag is 
set, this will prompt the user to kill the broker
      * after the commit is applied. These flags will only apply if using a 
transactional pinger.
      *
+     * @param session The session to commit
      * @throws javax.jms.JMSException If the commit fails and then the 
rollback fails.
-     *
-     * @todo Consider moving the fail after send logic into the send method. 
It is confusing to have it in this commit
-     *       method, because commits only apply to transactional pingers, but 
fail after send applied to transactional
-     *       and non-transactional alike.
+     *                                <p/>
+     *                                //todo @todo Consider moving the fail 
after send logic into the send method. It is confusing to have it in this commit
+     *                                method, because commits only apply to 
transactional pingers, but fail after send applied to transactional
+     *                                and non-transactional alike.
      */
     protected void commitTx(Session session) throws JMSException
     {
@@ -1136,7 +1249,6 @@
      *
      * @param destination The destination to send to.
      * @param message     The message to send.
-     *
      * @throws javax.jms.JMSException All underlying JMSExceptions are allowed 
to fall through.
      */
     protected void sendMessage(Destination destination, Message message) 
throws JMSException
@@ -1174,17 +1286,35 @@
             System.in.read();
         }
         catch (IOException e)
-        { }
+        {
+            //ignore
+        }
 
         System.out.println("Continuing.");
     }
 
     /**
+     * This value will be changed by PingClient to represent the number of 
clients connected to each topic.
+     *
+     * @return int The number of consumers subscribing to each topic.
+     */
+    public int getConsumersPerTopic()
+    {
+        return _consumersPerTopic;
+    }
+
+    public int getExpectedNumPings(int numpings)
+    {
+        return numpings * getConsumersPerTopic();
+    }
+
+
+    /**
      * Defines a chained message listener interface that can be attached to 
this pinger. Whenever this pinger's
      * [EMAIL PROTECTED] PingPongProducer#onMessage} method is called, the 
chained listener set through the
      * [EMAIL PROTECTED] PingPongProducer#setChainedMessageListener} method is 
passed the message, and the remaining expected
      * count of messages with that correlation id.
-     *
+     * <p/>
      * Provided only one pinger is producing messages with that correlation 
id, the chained listener will always be
      * given unique message counts. It will always be called while the 
producer waiting for all messages to arrive is
      * still blocked.
@@ -1200,10 +1330,14 @@
      */
     protected static class PerCorrelationId
     {
-        /** Holds a countdown on number of expected messages. */
+        /**
+         * Holds a countdown on number of expected messages.
+         */
         CountDownLatch trafficLight;
 
-        /** Holds the last timestamp that the timeout was reset to. */
+        /**
+         * Holds the last timestamp that the timeout was reset to.
+         */
         Long timeOutStart;
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=503609&r1=503608&r2=503609
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Mon Feb  5 01:49:59 2007
@@ -70,7 +70,7 @@
 
     /** Holds test specifics by correlation id. This consists of the expected 
number of messages and the timing controler. */
     private Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, 
PerCorrelationId>());
 
     /** Holds the batched results listener, that does logging on batch 
boundaries. */
     private BatchedResultsListener batchedResultsListener = null;
@@ -91,6 +91,7 @@
 
     /**
      * Compile all the tests into a test suite.
+     * @return The test suite to run. Should only contain testAsyncPingOk 
method. 
      */
     public static Test suite()
     {
@@ -128,6 +129,7 @@
      * all replies have been received or a time out occurs before exiting this 
method.
      *
      * @param numPings The number of pings to send.
+     * @throws Exception pass all errors out to the test harness  
      */
     public void testAsyncPingOk(int numPings) throws Exception
     {
@@ -151,7 +153,7 @@
         PerCorrelationId perCorrelationId = new PerCorrelationId();
         TimingController tc = 
getTimingController().getControllerForCurrentThread();
         perCorrelationId._tc = tc;
-        perCorrelationId._expectedCount = numPings;
+        perCorrelationId._expectedCount = 
pingClient.getExpectedNumPings(numPings);
         perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
         // Attach the chained message listener to the ping producer to listen 
asynchronously for the replies to these
@@ -160,18 +162,18 @@
 
         // Generate a sample message of the specified size.
         ObjectMessage msg =
-            
pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                      
testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                      
testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                
pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                          
testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                          
testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // Send the requested number of messages, and wait until they have all 
been received.
         long timeout = 
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
         int numReplies = pingClient.pingAndWaitForReply(msg, numPings, 
timeout, messageCorrelationId);
 
         // Check that all the replies were received and log a fail if they 
were not.
-        if (numReplies < numPings)
+        if (numReplies < perCorrelationId._expectedCount)
         {
-            tc.completeTest(false, numPings - numReplies);
+            tc.completeTest(false, numPings - perCorrelationId._expectedCount);
         }
 
         // Remove the chained message listener from the ping producer.

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=503609&r1=503608&r2=503609
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 Mon Feb  5 01:49:59 2007
@@ -110,6 +110,7 @@
 
     /**
      * Compile all the tests into a test suite.
+     * @return The test method testPingOk.
      */
     public static Test suite()
     {
@@ -139,18 +140,18 @@
 
         // Generate a sample message. This message is already time stamped and 
has its reply-to destination set.
         ObjectMessage msg =
-            
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                                      
testParameters.getPropertyAsInteger(
-                                                          
PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                                      
testParameters.getPropertyAsBoolean(
-                                                          
PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                                          
testParameters.getPropertyAsInteger(
+                                                                  
PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                                          
testParameters.getPropertyAsBoolean(
+                                                                  
PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // start the test
         long timeout = 
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
         int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, 
numPings, timeout);
 
         // Fail the test if the timeout was exceeded.
-        if (numReplies != numPings)
+        if (numReplies != 
perThreadSetup._pingClient.getExpectedNumPings(numPings))
         {
             Assert.fail("The ping timed out after " + timeout + " ms. Messages 
Sent = " + numPings + ", MessagesReceived = "
                         + numReplies);
@@ -191,7 +192,7 @@
 
             // Extract the test set up paramaeters.
             int destinationscount =
-                
Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+                    
Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
 
             // This is synchronized because there is a race condition, which 
causes one connection to sleep if
             // all threads try to create connection concurrently.


Reply via email to