Author: bhupendrab
Date: Tue Jan 23 06:41:33 2007
New Revision: 499036

URL: http://svn.apache.org/viewvc?view=rev&rev=499036
Log:
updated the test for testing with multiple threads

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
 Tue Jan 23 06:41:33 2007
@@ -36,9 +36,10 @@
     private static final Logger _logger = 
Logger.getLogger(TestPingItself.class);
 
     /**
-     * This creates a client for pinging to a Queue. There will be one 
producer and one consumer instance. Consumer
-     * listening to the same Queue, producer is sending to
-     *
+     * If queueCount is <= 1 : There will be one Queue and one consumer 
instance for the test
+     * If queueCount is > 1 : This creates a client for tests with multiple 
queues. Creates as many consumer instances
+     * as there are queues, each listening to a Queue. A producer is created 
which picks up a queue from
+     * the list of queues to send message
      * @param brokerDetails
      * @param username
      * @param password
@@ -53,57 +54,31 @@
      * @param beforeCommit
      * @param afterSend
      * @param beforeSend
-     * @param batchSize
-     * @throws Exception
-     */
-    public TestPingItself(String brokerDetails, String username, String 
password, String virtualpath, String queueName,
-                          String selector, boolean transacted, boolean 
persistent, int messageSize, boolean verbose,
-                          boolean afterCommit, boolean beforeCommit, boolean 
afterSend, boolean beforeSend, boolean failOnce,
-                          int batchSize)
-            throws Exception
-    {
-        super(brokerDetails, username, password, virtualpath, queueName, 
selector, transacted, persistent, messageSize,
-              verbose, afterCommit, beforeCommit, afterSend, beforeSend, 
failOnce, batchSize, 0);
-    }
-
-    /**
-     * This creats a client for tests with multiple queues. Creates as many 
consumer instances as there are queues,
-     * each listening to a Queue. A producer is created which picks up a queue 
from the list of queues to send message.
-     *
-     * @param brokerDetails
-     * @param username
-     * @param password
-     * @param virtualpath
-     * @param selector
-     * @param transacted
-     * @param persistent
-     * @param messageSize
-     * @param verbose
-     * @param afterCommit
-     * @param beforeCommit
-     * @param afterSend
-     * @param beforeSend
+     * @param failOnce
      * @param batchSize
      * @param queueCount
      * @throws Exception
      */
-    public TestPingItself(String brokerDetails, String username, String 
password, String virtualpath,
+    public TestPingItself(String brokerDetails, String username, String 
password, String virtualpath, String queueName,
                           String selector, boolean transacted, boolean 
persistent, int messageSize, boolean verbose,
                           boolean afterCommit, boolean beforeCommit, boolean 
afterSend, boolean beforeSend, boolean failOnce,
                           int batchSize, int queueCount)
             throws Exception
     {
-        super(brokerDetails, username, password, virtualpath, null, null, 
transacted, persistent, messageSize,
+        super(brokerDetails, username, password, virtualpath, queueName, 
selector, transacted, persistent, messageSize,
               verbose, afterCommit, beforeCommit, afterSend, beforeSend, 
failOnce, batchSize, queueCount);
 
-        createQueues(queueCount);
+        if (queueCount > 1)
+        {
+            createQueues(queueCount);
 
-        _persistent = persistent;
-        _messageSize = messageSize;
-        _verbose = verbose;
+            _persistent = persistent;
+            _messageSize = messageSize;
+            _verbose = verbose;
 
-        createConsumers(selector);
-        createProducer();
+            createConsumers(selector);
+            createProducer();
+        }
     }
 
     /**
@@ -136,7 +111,7 @@
         boolean persistent = config.usePersistentMessages();
         int messageSize = config.getPayload() != 0 ? config.getPayload() : 
DEFAULT_MESSAGE_SIZE;
         int messageCount = config.getMessages();
-        int queueCount = config.getQueueCount();
+        int queueCount = config.getQueueCount() != 0 ? config.getQueueCount() 
: 1;
         int batchSize = config.getBatchSize() != 0 ? config.getBatchSize() : 
BATCH_SIZE;
 
         String queue = "ping_" + System.currentTimeMillis();
@@ -182,22 +157,11 @@
             }
         }
 
-        TestPingItself pingItself = null;
         // Create a ping producer to handle the request/wait/reply cycle.
-        if (queueCount > 1)
-        {
-            pingItself = new TestPingItself(brokerDetails, "guest", "guest", 
virtualpath, null,
-                                            transacted, persistent, 
messageSize, verbose,
-                                            afterCommit, beforeCommit, 
afterSend, beforeSend, failOnce,
-                                            batchSize, queueCount);
-        }
-        else
-        {
-            pingItself = new TestPingItself(brokerDetails, "guest", "guest", 
virtualpath, queue, null,
-                                            transacted, persistent, 
messageSize, verbose,
-                                            afterCommit, beforeCommit, 
afterSend, beforeSend, failOnce,
-                                            batchSize);
-        }
+        TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", 
"guest", virtualpath, queue, null,
+                                                    transacted, persistent, 
messageSize, verbose,
+                                                    afterCommit, beforeCommit, 
afterSend, beforeSend, failOnce,
+                                                    batchSize, queueCount);
 
         pingItself.getConnection().start();
 

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Tue Jan 23 06:41:33 2007
@@ -93,10 +93,6 @@
      */
     protected static final int BATCH_SIZE = 100;
 
-    /**
-     * Keeps track of the ping producer instance used in the run loop.
-     */
-    private static PingPongProducer _pingProducer;
     protected static final int PREFETCH = 100;
     protected static final boolean NO_LOCAL = true;
     protected static final boolean EXCLUSIVE = false;
@@ -109,7 +105,7 @@
     /**
      * A source for providing sequential unique correlation ids.
      */
-    private AtomicLong idGenerator = new AtomicLong(0L);
+    private static AtomicLong idGenerator = new AtomicLong(0L);
 
     /**
      * Holds the queue to send the ping replies to.
@@ -134,7 +130,7 @@
     /**
      * Holds a map from message ids to latches on which threads wait for 
replies.
      */
-    private Map<String, CountDownLatch> trafficLights = new HashMap<String, 
CountDownLatch>();
+    private static Map<String, CountDownLatch> trafficLights = new 
HashMap<String, CountDownLatch>();
 
     /**
      * Used to indicate that the ping loop should print out whenever it pings.
@@ -192,21 +188,21 @@
         this(brokerDetails, username, password, virtualpath, transacted, 
persistent, messageSize, verbose,
              afterCommit, beforeCommit, afterSend, beforeSend, failOnce, 
batchSize);
 
-        if (queueName != null)
-        {
-            _pingQueue = new AMQQueue(queueName);
-            // Create producer and the consumer
-            createProducer();
-            createConsumer(selector);
-        }
-        else if (queueCount > 0)
-        {
-            _queueCount = queueCount;
-        }
-        else
+        _queueCount = queueCount;
+        if (queueCount <= 1)
         {
-            _logger.error("Queue Count is zero and no queueName specified. One 
must be set.");
-            throw new IllegalArgumentException("Queue Count is zero and no 
queueName specified. One must be set.");
+            if (queueName != null)
+            {
+                _pingQueue = new AMQQueue(queueName);
+                // Create producer and the consumer
+                createProducer();
+                createConsumer(selector);
+            }
+            else
+            {
+                _logger.error("Queue Name is not specified");
+                throw new IllegalArgumentException("Queue Name is not 
specified");
+            }
         }
     }
 
@@ -351,23 +347,23 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", 
virtualpath, PING_QUEUE_NAME, null, transacted,
+        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, 
"guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
                                              persistent, messageSize, verbose,
                                              afterCommit, beforeCommit, 
afterSend, beforeSend, failOnce,
                                              batchSize, 0);
 
-        _pingProducer.getConnection().start();
+        pingProducer.getConnection().start();
 
         // Run a few priming pings to remove warm up time from test results.
-        _pingProducer.prime(PRIMING_LOOPS);
+        pingProducer.prime(PRIMING_LOOPS);
         // Create a shutdown hook to terminate the ping-pong producer.
-        Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+        Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
 
         // Ensure that the ping pong producer is registered to listen for 
exceptions on the connection too.
-        _pingProducer.getConnection().setExceptionListener(_pingProducer);
+        pingProducer.getConnection().setExceptionListener(pingProducer);
 
         // Create the ping loop thread and run it until it is terminated by 
the shutdown hook or exception.
-        Thread pingThread = new Thread(_pingProducer);
+        Thread pingThread = new Thread(pingProducer);
         pingThread.run();
         pingThread.join();
     }
@@ -502,7 +498,7 @@
 
         if ((numReplies < numPings) && _verbose)
         {
-            _logger.info("Timed out before all replies received on id, " + 
messageCorrelationId);
+            _logger.info("Timed out (" + timeout  + " ms) before all replies 
received on id, " + messageCorrelationId);
         }
         else if (_verbose)
         {

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
 Tue Jan 23 06:41:33 2007
@@ -79,6 +79,8 @@
      */
     private static final String TIMEOUT_PROPNAME = "timeout";
 
+    private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+
     /**
      * Holds the size of message body to attach to the ping messages.
      */
@@ -158,6 +160,7 @@
         setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
         setSystemPropertyIfNull(TIMEOUT_PROPNAME, 
Long.toString(TIMEOUT_DEFAULT));
         setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, 
Integer.toString(1));
+        setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, 
Boolean.toString(false));
     }
 
     /**
@@ -192,7 +195,6 @@
     {
         // Get the per thread test setup to run the test through.
         PerThreadSetup perThreadSetup = threadSetup.get();
-
         if (numPings == 0)
         {
             _logger.error("Number of pings requested was zero.");
@@ -210,12 +212,10 @@
         long timeout = 
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
         int numReplies = 
perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout);
 
-        _logger.info("replies = " + numReplies);
-
         // Fail the test if the timeout was exceeded.
         if (numReplies != numPings)
         {
-            Assert.fail("The ping timed out. Messages Sent = " + numPings + ", 
MessagesReceived = " + numReplies);
+            Assert.fail("The ping timed out after "+ timeout  + " ms. Messages 
Sent = " + numPings + ", MessagesReceived = " + numReplies);
         }
     }
 
@@ -226,6 +226,7 @@
         //NDC.push(getName());
 
         // Create the test setups on a per thread basis, only if they have not 
already been created.
+
         if (threadSetup.get() == null)
         {
             PerThreadSetup perThreadSetup = new PerThreadSetup();
@@ -240,7 +241,7 @@
             boolean persistent = 
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
             boolean transacted = 
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
             String selector = null;
-            boolean verbose = false;
+            boolean verbose = 
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
             int messageSize = 
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
 
             boolean afterCommit = 
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
@@ -251,26 +252,17 @@
 
             int batchSize = 
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
 
-            // Establish a client to ping a Queue and listen the reply back 
from same Queue
-            if (queueCount > 1)
-            {
-                // test client with multiple queues
-                perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
-                                                                      
selector, transacted, persistent,
-                                                                      
messageSize, verbose,
-                                                                      
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                      
batchSize, queueCount);
-            }
-            else
+            // This is synchronized because there is a race condition, which 
causes one connection to sleep if
+            // all threads try to create connections concurrently
+            synchronized(this)
             {
                 // Establish a client to ping a Queue and listen the reply 
back from same Queue
                 perThreadSetup._pingItselfClient = new 
TestPingItself(brokerDetails, username, password, virtualpath,
                                                                       
queueName, selector, transacted, persistent,
                                                                       
messageSize, verbose,
                                                                       
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                      
batchSize);
+                                                                      
batchSize, queueCount);
             }
-
             // Start the client connection
             perThreadSetup._pingItselfClient.getConnection().start();
 


Reply via email to