Author: arnaudsimon
Date: Thu Aug 23 05:10:42 2007
New Revision: 568952
URL: http://svn.apache.org/viewvc?rev=568952&view=rev
Log:
updated from M2 branch
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
(with props)
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
(with props)
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
(with props)
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
(with props)
Removed:
incubator/qpid/trunk/qpid/java/perftests/src/test/
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
Thu Aug 23 05:10:42 2007
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.message;
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=568952&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
(added)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Thu Aug 23 05:10:42 2007
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PingAsyncTestPerf is a performance test that outputs multiple timings from
its test method, using the timing controller
+ * interface supplied by the test runner from a seperate listener thread. It
differs from the [EMAIL PROTECTED] PingTestPerf} test
+ * that it extends because it can output timings as replies are received,
rather than waiting until all expected replies
+ * are received. This is less 'blocky' than the tests in [EMAIL PROTECTED]
PingTestPerf}, and provides a truer simulation of sending
+ * and recieving clients working asynchronously.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings asynchronously on
batches received.
+ * </table>
+ */
+public class PingAsyncTestPerf extends PingTestPerf implements
TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch
size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /** Holds test specifics by correlation id. This consists of the expected
number of messages and the timing controler. */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch
boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified
name.
+ *
+ * @param name The test name.
+ */
+ public PingAsyncTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ * @return The test suite to run. Should only contain testAsyncPingOk
method.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple
timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on
every batch boundary, and waits until
+ * all replies have been received or a time out occurs before exiting this
method.
+ *
+ * @param numPings The number of pings to send.
+ * @throws Exception pass all errors out to the test harness
+ */
+ public void testAsyncPingOk(int numPings) throws Exception
+ {
+ // _logger.debug("public void testAsyncPingOk(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ fail("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique
for this run.
+ perThreadSetup._correlationId =
Long.toString(corellationIdGenerator.incrementAndGet());
+ // String messageCorrelationId = perThreadSetup._correlationId;
+ // _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation
id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc =
getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount =
pingClient.getExpectedNumPings(numPings);
+ perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
+
+ // Send the requested number of messages, and wait until they have all
been received.
+ long timeout =
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings,
timeout, perThreadSetup._correlationId);
+
+ // Check that all the replies were received and log a fail if they
were not.
+ if (numReplies < perCorrelationId._expectedCount)
+ {
+ perCorrelationId._tc.completeTest(false, numPings -
perCorrelationId._expectedCount);
+ }
+
+ // Remove the expected count and timing controller for the message
correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(perThreadSetup._correlationId);
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be
called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a
PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already
been created. This is set up with the
+ // batch size property, to tell it what batch size to output
results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize =
Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new
BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its
asynchronous test timings from.
+
perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a [EMAIL PROTECTED]
PingPongProducer.ChainedMessageListener} that can be attached to the
+ * pinger, in order to receive notifications about every message received
and the number remaining to be
+ * received. Whenever the number remaining crosses a batch size boundary
this results listener outputs
+ * a test timing for the actual number of messages received in the current
batch.
+ */
+ private class BatchedResultsListener implements
PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this
test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test
thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be
received with a particular correlation id.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall
through.
+ */
+ public void onMessage(Message message, int remainingCount, long
latency) throws JMSException
+ {
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ /*_logger.debug("public void onMessage(Message message, int
remainingCount = " + remainingCount
+ + "): called on batch boundary for message id: " +
correlationId + " with thread id: "
+ + Thread.currentThread().getId());*/
+
+ // Get the details for the correlation id and check that they
are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId =
perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this
correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Calculate how many messages were actually received in
the last batch. This will be the batch size
+ // except where the number expected is not a multiple of
the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it
will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) <
_batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+ tc.completeTest(true, receivedInBatch);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop
as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test
results. This consists of the count of
+ * the total expected number of messages, and the timing controller for
the thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
Propchange:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Thu Aug 23 05:10:42 2007
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.ping;
-import java.util.List;
-import java.util.Properties;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
import javax.jms.Destination;
-import org.apache.qpid.requestreply.PingPongProducer;
+import java.util.List;
+import java.util.Properties;
/**
* PingClient is a [EMAIL PROTECTED] PingPongProducer} that does not need a
[EMAIL PROTECTED] org.apache.qpid.requestreply.PingPongBouncer}
@@ -36,7 +38,7 @@
* are created they will all be run in parallel and be active in sending and
consuming pings at the same time.
* If the unique destinations flag is not set and a pub/sub ping cycle is
being run, this means that they will all hear
* pings sent by each other. The expected number of pings received will
therefore be multiplied up by the number of
- * active ping clients. The [EMAIL PROTECTED] #getConsumersPerTopic()} method
is used to supply this multiplier under these
+ * active ping clients. The [EMAIL PROTECTED] #getConsumersPerDestination()}
method is used to supply this multiplier under these
* conditions.
*
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -47,6 +49,9 @@
*/
public class PingClient extends PingPongProducer
{
+ /** Used for debugging. */
+ private final Logger log = Logger.getLogger(PingClient.class);
+
/** Used to count the number of ping clients created. */
private static int _pingClientCount;
@@ -82,15 +87,21 @@
*
* @return The scaling up of the number of expected pub/sub pings.
*/
- public int getConsumersPerTopic()
+ public int getConsumersPerDestination()
{
+ log.debug("public int getConsumersPerDestination(): called");
+
if (_isUnique)
{
- return 1;
+ log.debug("1 consumer per destination.");
+
+ return _noOfConsumers;
}
else
{
- return _pingClientCount;
+ log.debug(_pingClientCount + " consumers per destination.");
+
+ return _pingClientCount * _noOfConsumers;
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
Thu Aug 23 05:10:42 2007
@@ -20,18 +20,6 @@
*/
package org.apache.qpid.ping;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
@@ -40,6 +28,15 @@
import uk.co.thebadgerset.junit.extensions.util.MathUtils;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.*;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* PingDurableClient is a variation of the [EMAIL PROTECTED] PingPongProducer}
ping tool. Instead of sending its pings and
* receiving replies to them at the same time, this tool sends pings until it
is signalled by some 'event' to stop
@@ -167,7 +164,8 @@
try
{
// Create a ping producer overriding its defaults with all options
passed on the command line.
- Properties options = CommandLineParser.processCommandLine(args,
new CommandLineParser(new String[][] {}));
+ Properties options =
+ CommandLineParser.processCommandLine(args, new
CommandLineParser(new String[][] {}), System.getProperties());
PingDurableClient pingProducer = new PingDurableClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
@@ -219,7 +217,7 @@
// Establish the connection and the message producer.
establishConnection(true, false);
- getConnection().start();
+ _connection.start();
Message message = getTestMessage(getReplyDestinations().get(0),
_messageSize, _persistent);
@@ -329,8 +327,8 @@
_queueSharedID = new AtomicInteger();
establishConnection(false, true);
- _consumer.setMessageListener(null);
- _connection.start();
+ _consumer[0].setMessageListener(null);
+ _consumerConnection[0].start();
// Try to receive all of the pings that were successfully sent.
int messagesReceived = 0;
@@ -339,7 +337,7 @@
while (!endCondition)
{
// Message received = _consumer.receiveNoWait();
- Message received = _consumer.receive(TIME_OUT);
+ Message received = _consumer[0].receive(TIME_OUT);
log.debug("received = " + received);
if (received != null)
@@ -362,11 +360,11 @@
}
// Ensure messages received are committed.
- if (_transacted)
+ if (_consTransacted)
{
try
{
- _consumerSession.commit();
+ _consumerSession[0].commit();
System.out.println("Committed for all messages received.");
}
catch (JMSException e)
@@ -375,7 +373,7 @@
System.out.println("Error during commit.");
try
{
- _consumerSession.rollback();
+ _consumerSession[0].rollback();
System.out.println("Rolled back on all messages
received.");
}
catch (JMSException e2)
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=568952&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
(added)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
Thu Aug 23 05:10:42 2007
@@ -0,0 +1,314 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings
from its test method, using the timing
+ * controller interface supplied by the test runner from a seperate listener
thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of
messages took to process. It also differs from
+ * the [EMAIL PROTECTED] PingTestPerf} test that it extends because it can
output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as
when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead
samples ping latency occasionally. The
+ * frequency of ping sampling is set using the [EMAIL PROTECTED]
#TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
+ * default of every [EMAIL PROTECTED] #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size
of the batch of messages that the
+ * individual timed ping was taken from, rather than 1 for a single message.
This is so that the total throughput
+ * (messages / time) can be calculated in order to examine the relationship
between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities
<th> Collaborations <tr><td> Send many ping
+ * messages and output timings for sampled individual pings. </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements
TimingControllerAware
+{
+ private static Logger _logger =
Logger.getLogger(PingLatencyTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch
size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /**
+ * Holds test specifics by correlation id. This consists of the expected
number of messages and the timing
+ * controler.
+ */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch
boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified
name.
+ *
+ * @param name The test name.
+ */
+ public PingLatencyTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+ }
+
+ /** Compile all the tests into a test suite. */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Latency Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingLatencyTestPerf("testPingLatency"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple
timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on
every batch boundary, and waits until all
+ * replies have been received or a time out occurs before exiting this
method.
+ *
+ * @param numPings The number of pings to send.
+ */
+ public void testPingLatency(int numPings) throws Exception
+ {
+ _logger.debug("public void testPingLatency(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique
for this run.
+ String messageCorrelationId =
Long.toString(corellationIdGenerator.incrementAndGet());
+ _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation
id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc =
getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount = numPings;
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Attach the chained message listener to the ping producer to listen
asynchronously for the replies to these
+ // messages.
+ pingClient.setChainedMessageListener(batchedResultsListener);
+
+ // Generate a sample message of the specified size.
+ Message msg =
+
pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+
testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+
testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the requested number of messages, and wait until they have all
been received.
+ long timeout =
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings,
timeout, null);
+
+ // Check that all the replies were received and log a fail if they
were not.
+ if (numReplies < numPings)
+ {
+ tc.completeTest(false, 0);
+ }
+
+ // Remove the chained message listener from the ping producer.
+ pingClient.removeChainedMessageListener();
+
+ // Remove the expected count and timing controller for the message
correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(messageCorrelationId);
+ }
+
+ /** Performs test fixture creation on a per thread basis. This will only
be called once for each test thread. */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a
PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already
been created. This is set up with the
+ // batch size property, to tell it what batch size to output
results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize =
Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new
BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its
asynchronous test timings from.
+
perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a [EMAIL PROTECTED]
org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+ * be attached to the pinger, in order to receive notifications about
every message received and the number
+ * remaining to be received. Whenever the number remaining crosses a batch
size boundary this results listener
+ * outputs a test timing for the actual number of messages received in the
current batch.
+ */
+ private class BatchedResultsListener implements
PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+ private boolean _strictAMQP;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ _strictAMQP =
+
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
+ AMQSession.STRICT_AMQP_DEFAULT));
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this
test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test
thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be
received with a particular correlation id.
+ *
+ * @throws javax.jms.JMSException Any underlying JMSException is
allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount, long
latency) throws JMSException
+ {
+ _logger.debug("public void onMessage(Message message, int
remainingCount = " + remainingCount + "): called");
+
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ // Get the details for the correlation id and check that they
are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId =
perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this
correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Calculate how many messages were actually received in
the last batch. This will be the batch size
+ // except where the number expected is not a multiple of
the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it
will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) <
_batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+ tc.completeTest(true, receivedInBatch, latency);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop
as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test
results. This consists of the count of the
+ * total expected number of messages, and the timing controller for the
thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
Propchange:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
Thu Aug 23 05:10:42 2007
@@ -57,7 +57,7 @@
try
{
// Create a ping producer overriding its defaults with all options
passed on the command line.
- Properties options = CommandLineParser.processCommandLine(args,
new CommandLineParser(new String[][] {}));
+ Properties options = CommandLineParser.processCommandLine(args,
new CommandLineParser(new String[][] {}), System.getProperties());
PingSendOnlyClient pingProducer = new PingSendOnlyClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=568952&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
(added)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
Thu Aug 23 05:10:42 2007
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingTestPerf is a ping test, that has been written with the intention of
being scaled up to run many times
+ * simultaneously to simluate many clients/producers/connections.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will
result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit
test runner.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets
up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all
the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish
the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue,
creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original
queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not,
persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
+{
+ private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new
ThreadLocal<PerThreadSetup>();
+
+ /** Holds a property reader to extract the test parameters from. */
+ protected ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults
/*System.getProperties()*/);
+
+ public PingTestPerf(String name)
+ {
+ super(name);
+
+ _logger.debug("testParameters = " + testParameters);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ * @return The test method testPingOk.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingTestPerf("testPingOk"));
+
+ return suite;
+ }
+
+ public void testPingOk(int numPings) throws Exception
+ {
+ if (numPings == 0)
+ {
+ Assert.fail("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ if (perThreadSetup == null)
+ {
+ Assert.fail("Could not get per thread test setup, it was null.");
+ }
+
+ // Generate a sample message. This message is already time stamped and
has its reply-to destination set.
+ Message msg =
+
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+
testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+
testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // start the test
+ long timeout =
Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg,
numPings, timeout, null);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies !=
perThreadSetup._pingClient.getExpectedNumPings(numPings))
+ {
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages
Sent = " + numPings + ", MessagesReceived = "
+ + numReplies);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be
called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // This is synchronized because there is a race condition, which
causes one connection to sleep if
+ // all threads try to create connection concurrently.
+ synchronized (this)
+ {
+ // Establish a client to ping a Destination and listen the
reply back from same Destination
+ perThreadSetup._pingClient = new PingClient(testParameters);
+ perThreadSetup._pingClient.establishConnection(true, true);
+ }
+ // Start the client connection
+ perThreadSetup._pingClient.start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ if ((perThreadSetup != null) && (perThreadSetup._pingClient !=
null))
+ {
+ perThreadSetup._pingClient.close();
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear
down.");
+ }
+ finally
+ {
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping client.
+ */
+ protected PingClient _pingClient;
+ protected String _correlationId;
+ }
+}
Propchange:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
Thu Aug 23 05:10:42 2007
@@ -92,10 +92,10 @@
/** The producer for sending replies with. */
private MessageProducer _replyProducer;
- /** The consumer session. */
+ /** The consumer controlSession. */
private Session _consumerSession;
- /** The producer session. */
+ /** The producer controlSession. */
private Session _producerSession;
/** Holds the connection to the broker. */
@@ -149,7 +149,7 @@
// Set up the failover notifier.
getConnection().setConnectionListener(new FailoverNotifier());
- // Create a session to listen for messages on and one to send replies
on, transactional depending on the
+ // Create a controlSession to listen for messages on and one to send
replies on, transactional depending on the
// command line option.
_consumerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
_producerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
@@ -323,8 +323,8 @@
}
/**
- * Convenience method to commit the transaction on the specified session.
If the session to commit on is not
- * a transactional session, this method does nothing.
+ * Convenience method to commit the transaction on the specified
controlSession. If the controlSession to commit on is not
+ * a transactional controlSession, this method does nothing.
*
* <p/>If the [EMAIL PROTECTED] #_failBeforeCommit} flag is set, this will
prompt the user to kill the broker before the
* commit is applied. If the [EMAIL PROTECTED] #_failAfterCommit} flag is
set, this will prompt the user to kill the broker