Author: arnaudsimon
Date: Thu Aug 23 05:10:42 2007
New Revision: 568952

URL: http://svn.apache.org/viewvc?rev=568952&view=rev
Log:
updated from M2 branch

Added:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
   (with props)
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
   (with props)
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
   (with props)
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/perftests/src/test/
Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
 Thu Aug 23 05:10:42 2007
@@ -1,18 +1,21 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.client.message;

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=568952&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Thu Aug 23 05:10:42 2007
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PingAsyncTestPerf is a performance test that outputs multiple timings from 
its test method, using the timing controller
+ * interface supplied by the test runner from a seperate listener thread. It 
differs from the {@link PingTestPerf} test
+ * that it extends because it can output timings as replies are received, 
rather than waiting until all expected replies
+ * are received. This is less 'blocky' than the tests in {@link 
PingTestPerf}, and provides a truer simulation of sending
+ * and recieving clients working asynchronously.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings asynchronously on 
batches received.
+ * </table>
+ */
+public class PingAsyncTestPerf extends PingTestPerf implements 
TimingControllerAware
+{
+    private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+
+    /** Holds the name of the property to get the test results logging batch 
size. */
+    public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
+
+    /** Holds the default test results logging batch size. */
+    public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000;
+
+    /** Used to hold the timing controller passed from the test runner. */
+    private TimingController _timingController;
+
+    /** Used to generate unique correlation ids for each test run. */
+    private AtomicLong corellationIdGenerator = new AtomicLong();
+
+    /** Holds test specifics by correlation id. This consists of the expected 
number of messages and the timing controler. */
+    private Map<String, PerCorrelationId> perCorrelationIds =
+        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+    /** Holds the batched results listener, that does logging on batch 
boundaries. */
+    private BatchedResultsListener batchedResultsListener = null;
+
+    /**
+     * Creates a new asynchronous ping performance test with the specified 
name.
+     *
+     * @param name The test name.
+     */
+    public PingAsyncTestPerf(String name)
+    {
+        super(name);
+
+        // Sets up the test parameters with defaults.
+        testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+            Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
+    }
+
+    /**
+     * Compile all the tests into a test suite.
+     * @return The test suite to run. Should only contain testAsyncPingOk 
method.
+     */
+    public static Test suite()
+    {
+        // Build a new test suite
+        TestSuite suite = new TestSuite("Ping Performance Tests");
+
+        // Run performance tests in read committed mode.
+        suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+        return suite;
+    }
+
+    /**
+     * Accepts a timing controller from the test runner.
+     *
+     * @param timingController The timing controller to register mutliple 
timings with.
+     */
+    public void setTimingController(TimingController timingController)
+    {
+        _timingController = timingController;
+    }
+
+    /**
+     * Gets the timing controller passed in by the test runner.
+     *
+     * @return The timing controller passed in by the test runner.
+     */
+    public TimingController getTimingController()
+    {
+        return _timingController;
+    }
+
+    /**
+     * Sends the specified number of pings, asynchronously outputs timings on 
every batch boundary, and waits until
+     * all replies have been received or a time out occurs before exiting this 
method.
+     *
+     * @param numPings The number of pings to send.
+     * @throws Exception pass all errors out to the test harness
+     */
+    public void testAsyncPingOk(int numPings) throws Exception
+    {
+        // _logger.debug("public void testAsyncPingOk(int numPings): called");
+
+        // Ensure that at least one ping was requeusted.
+        if (numPings == 0)
+        {
+            _logger.error("Number of pings requested was zero.");
+            fail("Number of pings requested was zero.");
+        }
+
+        // Get the per thread test setup to run the test through.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        PingClient pingClient = perThreadSetup._pingClient;
+
+        // Advance the correlation id of messages to send, to make it unique 
for this run.
+        perThreadSetup._correlationId = 
Long.toString(corellationIdGenerator.incrementAndGet());
+        // String messageCorrelationId = perThreadSetup._correlationId;
+        // _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+        // Initialize the count and timing controller for the new correlation 
id.
+        PerCorrelationId perCorrelationId = new PerCorrelationId();
+        TimingController tc = 
getTimingController().getControllerForCurrentThread();
+        perCorrelationId._tc = tc;
+        perCorrelationId._expectedCount = 
pingClient.getExpectedNumPings(numPings);
+        perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
+
+        // Send the requested number of messages, and wait until they have all 
been received.
+        long timeout = 
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+        int numReplies = pingClient.pingAndWaitForReply(null, numPings, 
timeout, perThreadSetup._correlationId);
+
+        // Check that all the replies were received and log a fail if they 
were not.
+        if (numReplies < perCorrelationId._expectedCount)
+        {
+            perCorrelationId._tc.completeTest(false, numPings - 
perCorrelationId._expectedCount);
+        }
+
+        // Remove the expected count and timing controller for the message 
correlation id, to ensure they are cleaned up.
+        perCorrelationIds.remove(perThreadSetup._correlationId);
+    }
+
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be 
called once for each test thread.
+     */
+    public void threadSetUp()
+    {
+        _logger.debug("public void threadSetUp(): called");
+
+        try
+        {
+            // Call the set up method in the super class. This creates a 
PingClient pinger.
+            super.threadSetUp();
+
+            // Create the chained message listener, only if it has not already 
been created.  This is set up with the
+            // batch size property, to tell it what batch size to output 
results on. A synchronized block is used to
+            // ensure that only one thread creates this.
+            synchronized (this)
+            {
+                if (batchedResultsListener == null)
+                {
+                    int batchSize = 
Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+                    batchedResultsListener = new 
BatchedResultsListener(batchSize);
+                }
+            }
+
+            // Get the set up that the super class created.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Register the chained message listener on the pinger to do its 
asynchronous test timings from.
+            
perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+        }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
+    }
+
+    /**
+     * BatchedResultsListener is a {@link 
PingPongProducer.ChainedMessageListener} that can be attached to the
+     * pinger, in order to receive notifications about every message received 
and the number remaining to be
+     * received. Whenever the number remaining crosses a batch size boundary 
this results listener outputs
+     * a test timing for the actual number of messages received in the current 
batch.
+     */
+    private class BatchedResultsListener implements 
PingPongProducer.ChainedMessageListener
+    {
+        /** The test results logging batch size. */
+        int _batchSize;
+
+        /**
+         * Creates a results listener on the specified batch size.
+         *
+         * @param batchSize The batch size to use.
+         */
+        public BatchedResultsListener(int batchSize)
+        {
+            _batchSize = batchSize;
+        }
+
+        /**
+         * This callback method is called from all of the pingers that this 
test creates. It uses the correlation id
+         * from the message to identify the timing controller for the test 
thread that was responsible for sending those
+         * messages.
+         *
+         * @param message        The message.
+         * @param remainingCount The count of messages remaining to be 
received with a particular correlation id.
+         *
+         * @throws JMSException Any underlying JMSException is allowed to fall 
through.
+         */
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
+        {
+            // Check if a batch boundary has been crossed.
+            if ((remainingCount % _batchSize) == 0)
+            {
+                // Extract the correlation id from the message.
+                String correlationId = message.getJMSCorrelationID();
+
+                /*_logger.debug("public void onMessage(Message message, int 
remainingCount = " + remainingCount
+                    + "): called on batch boundary for message id: " + 
correlationId + " with thread id: "
+                    + Thread.currentThread().getId());*/
+
+                // Get the details for the correlation id and check that they 
are not null. They can become null
+                // if a test times out.
+                PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationId);
+                if (perCorrelationId != null)
+                {
+                    // Get the timing controller and expected count for this 
correlation id.
+                    TimingController tc = perCorrelationId._tc;
+                    int expected = perCorrelationId._expectedCount;
+
+                    // Calculate how many messages were actually received in 
the last batch. This will be the batch size
+                    // except where the number expected is not a multiple of 
the batch size and this is the first remaining
+                    // count to cross a batch size boundary, in which case it 
will be the number expected modulo the batch
+                    // size.
+                    int receivedInBatch = ((expected - remainingCount) < 
_batchSize) ? (expected % _batchSize) : _batchSize;
+
+                    // Register a test result for the correlation id.
+                    try
+                    {
+                        tc.completeTest(true, receivedInBatch);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Ignore this. It means the test runner wants to stop 
as soon as possible.
+                        _logger.warn("Got InterruptedException.", e);
+                    }
+                }
+                // Else ignore, test timed out. Should log a fail here?
+            }
+        }
+    }
+
+    /**
+     * Holds state specific to each correlation id, needed to output test 
results. This consists of the count of
+     * the total expected number of messages, and the timing controller for 
the thread sending those message ids.
+     */
+    private static class PerCorrelationId
+    {
+        public int _expectedCount;
+        public TimingController _tc;
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 Thu Aug 23 05:10:42 2007
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.ping;
 
-import java.util.List;
-import java.util.Properties;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
 
 import javax.jms.Destination;
 
-import org.apache.qpid.requestreply.PingPongProducer;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * PingClient is a {@link PingPongProducer} that does not need a 
{@link org.apache.qpid.requestreply.PingPongBouncer}
@@ -36,7 +38,7 @@
  * are created they will all be run in parallel and be active in sending and 
consuming pings at the same time.
  * If the unique destinations flag is not set and a pub/sub ping cycle is 
being run, this means that they will all hear
  * pings sent by each other. The expected number of pings received will 
therefore be multiplied up by the number of
- * active ping clients. The {@link #getConsumersPerTopic()} method 
is used to supply this multiplier under these
+ * active ping clients. The {@link #getConsumersPerDestination()} 
method is used to supply this multiplier under these
  * conditions.
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
@@ -47,6 +49,9 @@
  */
 public class PingClient extends PingPongProducer
 {
+    /** Used for debugging. */
+    private final Logger log = Logger.getLogger(PingClient.class);
+
     /** Used to count the number of ping clients created. */
     private static int _pingClientCount;
 
@@ -82,15 +87,21 @@
      *
      * @return The scaling up of the number of expected pub/sub pings.
      */
-    public int getConsumersPerTopic()
+    public int getConsumersPerDestination()
     {
+        log.debug("public int getConsumersPerDestination(): called");
+
         if (_isUnique)
         {
-            return 1;
+            log.debug("1 consumer per destination.");
+
+            return _noOfConsumers;
         }
         else
         {
-            return _pingClientCount;
+            log.debug(_pingClientCount + " consumers per destination.");
+
+            return _pingClientCount * _noOfConsumers;
         }
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 Thu Aug 23 05:10:42 2007
@@ -20,18 +20,6 @@
  */
 package org.apache.qpid.ping;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.requestreply.PingPongProducer;
@@ -40,6 +28,15 @@
 import uk.co.thebadgerset.junit.extensions.util.MathUtils;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.*;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * PingDurableClient is a variation of the {@link PingPongProducer} 
ping tool. Instead of sending its pings and
  * receiving replies to them at the same time, this tool sends pings until it 
is signalled by some 'event' to stop
@@ -167,7 +164,8 @@
         try
         {
             // Create a ping producer overriding its defaults with all options 
passed on the command line.
-            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}));
+            Properties options =
+                CommandLineParser.processCommandLine(args, new 
CommandLineParser(new String[][] {}), System.getProperties());
             PingDurableClient pingProducer = new PingDurableClient(options);
 
             // Create a shutdown hook to terminate the ping-pong producer.
@@ -219,7 +217,7 @@
 
         // Establish the connection and the message producer.
         establishConnection(true, false);
-        getConnection().start();
+        _connection.start();
 
         Message message = getTestMessage(getReplyDestinations().get(0), 
_messageSize, _persistent);
 
@@ -329,8 +327,8 @@
         _queueSharedID = new AtomicInteger();
 
         establishConnection(false, true);
-        _consumer.setMessageListener(null);
-        _connection.start();
+        _consumer[0].setMessageListener(null);
+        _consumerConnection[0].start();
 
         // Try to receive all of the pings that were successfully sent.
         int messagesReceived = 0;
@@ -339,7 +337,7 @@
         while (!endCondition)
         {
             // Message received = _consumer.receiveNoWait();
-            Message received = _consumer.receive(TIME_OUT);
+            Message received = _consumer[0].receive(TIME_OUT);
             log.debug("received = " + received);
 
             if (received != null)
@@ -362,11 +360,11 @@
         }
 
         // Ensure messages received are committed.
-        if (_transacted)
+        if (_consTransacted)
         {
             try
             {
-                _consumerSession.commit();
+                _consumerSession[0].commit();
                 System.out.println("Committed for all messages received.");
             }
             catch (JMSException e)
@@ -375,7 +373,7 @@
                 System.out.println("Error during commit.");
                 try
                 {
-                    _consumerSession.rollback();
+                    _consumerSession[0].rollback();
                     System.out.println("Rolled back on all messages 
received.");
                 }
                 catch (JMSException e2)

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=568952&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 Thu Aug 23 05:10:42 2007
@@ -0,0 +1,314 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings 
from its test method, using the timing
+ * controller interface supplied by the test runner from a seperate listener 
thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of 
messages took to process. It also differs from
+ * the {@link PingTestPerf} test that it extends because it can 
output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as 
when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead 
samples ping latency occasionally. The
+ * frequency of ping sampling is set using the {@link 
#TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
+ * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size 
of the batch of messages that the
+ * individual timed ping was taken from, rather than 1 for a single message. 
This is so that the total throughput
+ * (messages / time) can be calculated in order to examine the relationship 
between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities 
<th> Collaborations <tr><td> Send many ping
+ * messages and output timings for sampled individual pings. </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements 
TimingControllerAware
+{
+    /** Used for logging by this test class. */
+    private static final Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
+
+    /** Holds the name of the property to get the test results logging batch size. */
+    public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
+
+    /** Holds the default test results logging batch size. */
+    public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+    /** Used to hold the timing controller passed from the test runner. */
+    private TimingController _timingController;
+
+    /** Used to generate unique correlation ids for each test run. */
+    private final AtomicLong corellationIdGenerator = new AtomicLong();
+
+    /**
+     * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
+     * controller. The map is a synchronized wrapper because it is written by the test method and read by the
+     * batched results listener.
+     */
+    private final Map<String, PerCorrelationId> perCorrelationIds =
+        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+    /** Holds the batched results listener, that does logging on batch boundaries. Created in threadSetUp. */
+    private BatchedResultsListener batchedResultsListener = null;
+
+    /**
+     * Creates a new ping latency performance test with the specified name, and supplies
+     * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE} as the value of the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property.
+     *
+     * @param name The test name.
+     */
+    public PingLatencyTestPerf(String name)
+    {
+        super(name);
+
+        // Going by its name, the helper presumably only applies the default when the property has no value yet.
+        String defaultBatchSize = String.valueOf(DEFAULT_TEST_RESULTS_BATCH_SIZE);
+        ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, defaultBatchSize);
+    }
+
+    /**
+     * Compiles all the tests into a test suite.
+     *
+     * @return A suite named "Ping Latency Tests" holding the single test method testPingLatency.
+     */
+    public static Test suite()
+    {
+        TestSuite latencySuite = new TestSuite("Ping Latency Tests");
+
+        // The only test in this suite is the ping latency test method.
+        Test latencyTest = new PingLatencyTestPerf("testPingLatency");
+        latencySuite.addTest(latencyTest);
+
+        return latencySuite;
+    }
+
+    /**
+     * Stores the timing controller handed over by the test runner.
+     *
+     * @param timingController The timing controller to register multiple timings with.
+     */
+    public void setTimingController(TimingController timingController)
+    {
+        this._timingController = timingController;
+    }
+
+    /**
+     * Provides the timing controller that was last stored by {@link #setTimingController}.
+     *
+     * @return The timing controller passed in by the test runner, or null if none has been set.
+     */
+    public TimingController getTimingController()
+    {
+        return this._timingController;
+    }
+
+    /**
+     * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
+     * replies have been received or a time out occurs before exiting this method.
+     *
+     * <p/>The chained message listener and the per correlation id state registered for the run are always removed
+     * again before this method exits, including when sending the pings fails with an exception.
+     *
+     * @param numPings The number of pings to send.
+     *
+     * @throws IllegalStateException If no per thread test setup is attached to the calling thread.
+     * @throws Exception Any exception raised whilst sending the pings or registering a timing falls through.
+     */
+    public void testPingLatency(int numPings) throws Exception
+    {
+        _logger.debug("public void testPingLatency(int numPings): called");
+
+        // A request for zero pings is reported in the log; the run still proceeds.
+        if (numPings == 0)
+        {
+            _logger.error("Number of pings requested was zero.");
+        }
+
+        // Get the per thread test setup to run the test through. threadSetUp only logs a warning when it fails, so
+        // the setup can be missing here; report that explicitly rather than failing on a null dereference.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+
+        if (perThreadSetup == null)
+        {
+            throw new IllegalStateException("Could not get per thread test setup, it was null.");
+        }
+
+        PingClient pingClient = perThreadSetup._pingClient;
+
+        // Advance the correlation id of messages to send, to make it unique for this run.
+        String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+        _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+        // Initialize the count and timing controller for the new correlation id.
+        PerCorrelationId perCorrelationId = new PerCorrelationId();
+        TimingController tc = getTimingController().getControllerForCurrentThread();
+        perCorrelationId._tc = tc;
+        perCorrelationId._expectedCount = numPings;
+        perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+        try
+        {
+            // Attach the chained message listener to the ping producer to listen asynchronously for the replies to
+            // these messages.
+            pingClient.setChainedMessageListener(batchedResultsListener);
+
+            // Generate a sample message of the specified size.
+            Message msg =
+                pingClient.getTestMessage(pingClient.getReplyDestinations().get(0),
+                    testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                    testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+            // Send the requested number of messages, and wait until they have all been received.
+            // NOTE(review): messageCorrelationId keys the entry in perCorrelationIds, yet null is passed as the last
+            // argument here. If that argument is the correlation id stamped on the pings, the listener's lookup by
+            // JMSCorrelationID cannot find this run's entry - confirm against PingPongProducer.pingAndWaitForReply.
+            long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+            int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
+
+            // Check that all the replies were received and log a fail if they were not.
+            if (numReplies < numPings)
+            {
+                tc.completeTest(false, 0);
+            }
+        }
+        finally
+        {
+            try
+            {
+                // Remove the chained message listener from the ping producer.
+                pingClient.removeChainedMessageListener();
+            }
+            finally
+            {
+                // Remove the expected count and timing controller for the message correlation id, so that the entry
+                // is cleaned up even when the run or the listener removal failed.
+                perCorrelationIds.remove(messageCorrelationId);
+            }
+        }
+    }
+
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     * Any exception raised during the set up is logged as a warning and is not rethrown.
+     */
+    public void threadSetUp()
+    {
+        _logger.debug("public void threadSetUp(): called");
+
+        try
+        {
+            // Let the super class build the per thread fixture, which includes the PingClient pinger.
+            super.threadSetUp();
+
+            // The chained message listener is created only if it does not exist yet, inside a synchronized block so
+            // that only one thread creates it. It is given the batch size property to output results on.
+            synchronized (this)
+            {
+                if (batchedResultsListener == null)
+                {
+                    String batchSizeProperty = testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME);
+                    batchedResultsListener = new BatchedResultsListener(Integer.parseInt(batchSizeProperty));
+                }
+            }
+
+            // Register the listener on the pinger that the super class attached to this thread, to do its
+            // asynchronous test timings from.
+            PingClient pingClient = threadSetup.get()._pingClient;
+            pingClient.setChainedMessageListener(batchedResultsListener);
+        }
+        catch (Exception setUpFailure)
+        {
+            _logger.warn("There was an exception during per thread setup.", setUpFailure);
+        }
+    }
+
+    /**
+     * BatchedResultsListener is a [EMAIL PROTECTED] 
org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+     * be attached to the pinger, in order to receive notifications about 
every message received and the number
+     * remaining to be received. Whenever the number remaining crosses a batch 
size boundary this results listener
+     * outputs a test timing for the actual number of messages received in the 
current batch.
+     */
+    private class BatchedResultsListener implements 
PingPongProducer.ChainedMessageListener
+    {
+        /** The test results logging batch size. */
+        int _batchSize;
+        private boolean _strictAMQP;
+
+        /**
+         * Creates a results listener on the specified batch size.
+         *
+         * @param batchSize The batch size to use.
+         */
+        public BatchedResultsListener(int batchSize)
+        {
+            _batchSize = batchSize;
+            _strictAMQP =
+                
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
+                        AMQSession.STRICT_AMQP_DEFAULT));
+        }
+
+        /**
+         * This callback method is called from all of the pingers that this 
test creates. It uses the correlation id
+         * from the message to identify the timing controller for the test 
thread that was responsible for sending those
+         * messages.
+         *
+         * @param message        The message.
+         * @param remainingCount The count of messages remaining to be 
received with a particular correlation id.
+         *
+         * @throws javax.jms.JMSException Any underlying JMSException is 
allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
+        {
+            _logger.debug("public void onMessage(Message message, int 
remainingCount = " + remainingCount + "): called");
+
+            // Check if a batch boundary has been crossed.
+            if ((remainingCount % _batchSize) == 0)
+            {
+                // Extract the correlation id from the message.
+                String correlationId = message.getJMSCorrelationID();
+
+                // Get the details for the correlation id and check that they 
are not null. They can become null
+                // if a test times out.
+                PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationId);
+                if (perCorrelationId != null)
+                {
+                    // Get the timing controller and expected count for this 
correlation id.
+                    TimingController tc = perCorrelationId._tc;
+                    int expected = perCorrelationId._expectedCount;
+
+                    // Calculate how many messages were actually received in 
the last batch. This will be the batch size
+                    // except where the number expected is not a multiple of 
the batch size and this is the first remaining
+                    // count to cross a batch size boundary, in which case it 
will be the number expected modulo the batch
+                    // size.
+                    int receivedInBatch = ((expected - remainingCount) < 
_batchSize) ? (expected % _batchSize) : _batchSize;
+
+                    // Register a test result for the correlation id.
+                    try
+                    {
+                        tc.completeTest(true, receivedInBatch, latency);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Ignore this. It means the test runner wants to stop 
as soon as possible.
+                        _logger.warn("Got InterruptedException.", e);
+                    }
+                }
+                // Else ignore, test timed out. Should log a fail here?
+            }
+        }
+    }
+
+    /**
+     * Groups the state that is kept for each correlation id in order to output test results: the timing controller
+     * of the thread sending the messages with that id, and the total number of messages expected for it.
+     */
+    private static class PerCorrelationId
+    {
+        /** The timing controller to register results for this correlation id with. */
+        public TimingController _tc;
+
+        /** The total number of messages expected for this correlation id. */
+        public int _expectedCount;
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
 Thu Aug 23 05:10:42 2007
@@ -57,7 +57,7 @@
         try
         {
             // Create a ping producer overriding its defaults with all options 
passed on the command line.
-            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}));
+            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}), System.getProperties());
             PingSendOnlyClient pingProducer = new PingSendOnlyClient(options);
 
             // Create a shutdown hook to terminate the ping-pong producer.

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=568952&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
 Thu Aug 23 05:10:42 2007
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
+ * simultaneously to simulate many clients/producers/connections.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
+{
+    private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+    /** Thread local to hold the per-thread test setup fields. */
+    ThreadLocal<PerThreadSetup> threadSetup = new 
ThreadLocal<PerThreadSetup>();
+
+    /** Holds a property reader to extract the test parameters from. */
+    protected ParsedProperties testParameters =
+        TestContextProperties.getInstance(PingPongProducer.defaults 
/*System.getProperties()*/);
+
+    /**
+     * Creates a new ping performance test with the specified name, and logs the test parameters at debug level.
+     *
+     * @param name The test name.
+     */
+    public PingTestPerf(String name)
+    {
+        super(name);
+
+        String parametersDescription = "testParameters = " + testParameters;
+        _logger.debug(parametersDescription);
+    }
+
+    /**
+     * Compiles all the tests into a test suite.
+     *
+     * @return A suite named "Ping Performance Tests" holding the single test method testPingOk.
+     */
+    public static Test suite()
+    {
+        TestSuite pingSuite = new TestSuite("Ping Performance Tests");
+
+        // The only test in this suite is the ping test method.
+        Test pingTest = new PingTestPerf("testPingOk");
+        pingSuite.addTest(pingTest);
+
+        return pingSuite;
+    }
+
+    /**
+     * Sends the requested number of pings through the ping client of the calling thread, and fails the test unless
+     * the number of replies received equals the number that the ping client expects for that many pings.
+     *
+     * @param numPings The number of pings to send. The test fails if this is zero.
+     *
+     * @throws Exception Any exception raised whilst creating or sending the pings is allowed to fall through.
+     */
+    public void testPingOk(int numPings) throws Exception
+    {
+        if (numPings == 0)
+        {
+            Assert.fail("Number of pings requested was zero.");
+        }
+
+        // The test runs through the fixture that threadSetUp attached to this thread.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+
+        if (perThreadSetup == null)
+        {
+            Assert.fail("Could not get per thread test setup, it was null.");
+        }
+
+        PingClient pingClient = perThreadSetup._pingClient;
+
+        // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+        Message msg =
+            pingClient.getTestMessage(pingClient.getReplyDestinations().get(0),
+                testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+        // Send the pings and wait for the replies, for no longer than the configured timeout.
+        long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+        int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
+
+        // Fail the test if the expected number of replies did not arrive.
+        if (numReplies != pingClient.getExpectedNumPings(numPings))
+        {
+            Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+                + numReplies);
+        }
+    }
+
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     * Any exception raised during the set up is logged as a warning and is not rethrown, in which case no fixture is
+     * attached to the thread.
+     */
+    public void threadSetUp()
+    {
+        _logger.debug("public void threadSetUp(): called");
+
+        try
+        {
+            PerThreadSetup fixture = new PerThreadSetup();
+            PingClient client;
+
+            // This is synchronized because there is a race condition, which causes one connection to sleep if
+            // all threads try to create connection concurrently.
+            synchronized (this)
+            {
+                // Establish a client to ping a Destination and listen for the reply back from the same Destination.
+                client = new PingClient(testParameters);
+                fixture._pingClient = client;
+                client.establishConnection(true, true);
+            }
+
+            // Start the client connection.
+            client.start();
+
+            // Attach the per-thread fixture to the thread.
+            threadSetup.set(fixture);
+        }
+        catch (Exception setUpFailure)
+        {
+            _logger.warn("There was an exception during per thread setup.", setUpFailure);
+        }
+    }
+
+    /**
+     * Performs test fixture clean up on a per thread basis. The ping client attached to the calling thread, if there
+     * is one, is closed, and the per thread fixture is always removed from the thread afterwards. A JMSException
+     * raised by the close is logged as a warning, together with its cause, and is not rethrown.
+     */
+    public void threadTearDown()
+    {
+        _logger.debug("public void threadTearDown(): called");
+
+        try
+        {
+            // Get the per thread test fixture.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Close the pingers so that it cleans up its connection cleanly.
+            synchronized (this)
+            {
+                if ((perThreadSetup != null) && (perThreadSetup._pingClient != null))
+                {
+                    perThreadSetup._pingClient.close();
+                }
+            }
+        }
+        catch (JMSException e)
+        {
+            // Pass the exception to the logger so that the reason for the failed close is not lost.
+            _logger.warn("There was an exception during per thread tear down.", e);
+        }
+        finally
+        {
+            // Ensure the per thread fixture is reclaimed.
+            threadSetup.remove();
+        }
+    }
+
+    /** Groups the test fixture fields that are held separately for each test thread. */
+    protected static class PerThreadSetup
+    {
+        /** Holds the test ping client. */
+        protected PingClient _pingClient;
+
+        /** Holds a correlation id. It is not read or written by this class itself. */
+        protected String _correlationId;
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
 Thu Aug 23 05:10:42 2007
@@ -92,10 +92,10 @@
     /** The producer for sending replies with. */
     private MessageProducer _replyProducer;
 
-    /** The consumer session. */
+    /** The consumer controlSession. */
     private Session _consumerSession;
 
-    /** The producer session. */
+    /** The producer controlSession. */
     private Session _producerSession;
 
     /** Holds the connection to the broker. */
@@ -149,7 +149,7 @@
         // Set up the failover notifier.
         getConnection().setConnectionListener(new FailoverNotifier());
 
-        // Create a session to listen for messages on and one to send replies 
on, transactional depending on the
+        // Create a controlSession to listen for messages on and one to send 
replies on, transactional depending on the
         // command line option.
         _consumerSession = (Session) getConnection().createSession(transacted, 
Session.AUTO_ACKNOWLEDGE);
         _producerSession = (Session) getConnection().createSession(transacted, 
Session.AUTO_ACKNOWLEDGE);
@@ -323,8 +323,8 @@
     }
 
     /**
-     * Convenience method to commit the transaction on the specified session. 
If the session to commit on is not
-     * a transactional session, this method does nothing.
+     * Convenience method to commit the transaction on the specified 
controlSession. If the controlSession to commit on is not
+     * a transactional controlSession, this method does nothing.
      *
      * <p/>If the [EMAIL PROTECTED] #_failBeforeCommit} flag is set, this will 
prompt the user to kill the broker before the
      * commit is applied. If the [EMAIL PROTECTED] #_failAfterCommit} flag is 
set, this will prompt the user to kill the broker


Reply via email to