Author: ritchiem
Date: Wed Jan 24 05:16:08 2007
New Revision: 499392
URL: http://svn.apache.org/viewvc?view=rev&rev=499392
Log:
Updated Async Test for destinations and for signalling completed runs when
there is only 1 queue.
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Wed Jan 24 05:16:08 2007
@@ -376,9 +376,7 @@
else
{
_producer.send(destination, message);
- }
-
- commitTx();
+ }
}
protected void doFailover(String broker)
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java Wed Jan 24 05:16:08 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.requestreply.PingPongProducer;
import org.apache.qpid.topic.Config;
+import org.apache.qpid.util.concurrent.BooleanLatch;
/**
* This class is used to test sending and receiving messages to (pingQueue)
and from a queue (replyQueue).
@@ -71,7 +72,7 @@
messageSize, verbose, afterCommit, beforeCommit, afterSend,
beforeSend, failOnce, batchSize,
noOfDestinations, rate, pubsub);
- if (noOfDestinations > 1)
+ if (noOfDestinations > 0)
{
createDestinations(noOfDestinations);
@@ -84,7 +85,7 @@
}
}
- /**
+ /**
* Sets the replyQueue to be the same as ping queue.
*/
@Override
@@ -95,11 +96,6 @@
MessageConsumer consumer =
getConsumerSession().createConsumer(getReplyDestination(),
PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
- }
-
- public void setMessageListener(MessageListener messageListener) throws
JMSException
- {
- getConsumerSession().setMessageListener(messageListener);
}
/**
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Wed Jan 24 05:16:08 2007
@@ -172,7 +172,7 @@
}
// Extract all command line parameters.
- String brokerDetails = args[0];
+ String brokerDetails = args[0];
String username = args[1];
String password = args[2];
String virtualpath = args[3];
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Wed Jan 24 05:16:08 2007
@@ -152,6 +152,8 @@
*/
int _throttleBatchSize;
+ private MessageListener _messageListener = null;
+
private PingPongProducer(String brokerDetails, String username, String
password, String virtualpath, boolean transacted,
boolean persistent, int messageSize, boolean
verbose, boolean afterCommit, boolean beforeCommit,
boolean afterSend, boolean beforeSend, boolean
failOnce, int batchSize, int rate)
@@ -225,8 +227,8 @@
_destinationCount = noOfDestinations;
setPubSub(pubsub);
-
- if (noOfDestinations <= 1)
+
+ if (noOfDestinations == 0)
{
if (destinationName != null)
{
@@ -237,8 +239,8 @@
}
else
{
- _logger.error("Queue Name is not specified");
- throw new IllegalArgumentException("Queue Name is not
specified");
+ _logger.error("Destination is not specified");
+ throw new IllegalArgumentException("Destination is not
specified");
}
}
}
@@ -258,6 +260,7 @@
/**
* Creates the producer to send the pings on. If the tests are with
nultiple-destinations, then producer
* is created with null destination, so that any destination can be
specified while sending
+ *
* @throws JMSException
*/
public void createProducer() throws JMSException
@@ -281,6 +284,7 @@
/**
* Creates the temporary destination to listen to the responses
+ *
* @param selector
* @throws JMSException
*/
@@ -303,7 +307,7 @@
/**
* Creates consumer instances for each destination. This is used when test
is being done with multiple destinations.
- *
+ *
* @param selector
* @throws JMSException
*/
@@ -312,7 +316,7 @@
for (int i = 0; i < getDestinationsCount(); i++)
{
MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i),
PREFETCH, false, EXCLUSIVE, selector);
+ getConsumerSession().createConsumer(getDestination(i),
PREFETCH, false, EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
}
@@ -353,8 +357,8 @@
if (args.length < 2)
{
System.err.println("Usage: TestPingPublisher <brokerDetails>
<virtual path> [verbose (true/false)] " +
- "[transacted (true/false)] [persistent (true/false)] [message
size in bytes] [batchsize]" +
- " [rate] [pubsub(true/false)]");
+ "[transacted (true/false)] [persistent
(true/false)] [message size in bytes] [batchsize]" +
+ " [rate] [pubsub(true/false)]");
System.exit(0);
}
@@ -366,7 +370,7 @@
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) :
DEFAULT_MESSAGE_SIZE;
int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
- boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8])
: false;
+ boolean ispubsub = (args.length >= 9) ? Boolean.parseBoolean(args[8])
: false;
boolean afterCommit = false;
boolean beforeCommit = false;
@@ -408,9 +412,9 @@
// Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer = new PingPongProducer(brokerDetails,
"guest", "guest", virtualpath,
- PING_DESTINATION_NAME, null,
transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit,
afterSend, beforeSend, failOnce, batchSize,
- 0, rate, ispubsub);
+
PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+ afterCommit,
beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ 0, rate,
ispubsub);
pingProducer.getConnection().start();
@@ -421,7 +425,7 @@
// Ensure that the ping pong producer is registered to listen for
exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
-
+
// Create the ping loop thread and run it until it is terminated by
the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
pingThread.run();
@@ -444,15 +448,19 @@
Message first = getTestMessage(_replyDestination, 0, false);
sendMessage(first);
+ commitTx();
+
try
{
Thread.sleep(100);
}
catch (InterruptedException ignore)
{
-
+
}
}
+
+
}
/**
@@ -482,6 +490,12 @@
{
_logger.debug("Reply was expected, decrementing the latch for
the id.");
trafficLight.countDown();
+
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
}
else
{
@@ -519,14 +533,52 @@
public int pingAndWaitForReply(Message message, int numPings, long
timeout) throws JMSException, InterruptedException
{
// Put a unique correlation id on the message before sending it.
- String messageCorrelationId =
Long.toString(idGenerator.incrementAndGet());
- message.setJMSCorrelationID(messageCorrelationId);
+ String messageCorrelationId = Long.toString(getNewID());
+
+
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received,
or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
+
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies
received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+ return numReplies;
+ }
+
+ public long getNewID()
+ {
+ return idGenerator.incrementAndGet();
+ }
+
+ /*
+ * Sends the specified ping message but does not wait for a correlating
reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall
through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String
messageCorrelationId) throws JMSException, InterruptedException
+ {
// Create a count down latch to count the number of replies with. This
is created before the message is sent
// so that the message is not received before the count down is
created.
CountDownLatch trafficLight = new CountDownLatch(numPings);
trafficLights.put(messageCorrelationId, trafficLight);
+ message.setJMSCorrelationID(messageCorrelationId);
+
// Set up a committed flag to detect uncommitted message at the end of
the send loop. This may occurr if the
// transaction batch size is not a factor of the number of pings. In
which case an extra commit at the end is
// needed.
@@ -579,43 +631,6 @@
_logger.info(timestampFormatter.format(new Date()) + ": Pinged at
with correlation id, " + messageCorrelationId);
}
- // Block the current thread until a reply to the message is received,
or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
-
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
-
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies
received on id, " + messageCorrelationId);
- }
- else if (_verbose)
- {
- _logger.info("Got all replies on id, " + messageCorrelationId);
- }
-
- return numReplies;
- }
-
- /*
- * Sends the specified ping message but does not wait for a correlating
reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall
through.
- */
- public void pingNoWaitForReply(Message message, int numPings) throws
JMSException, InterruptedException
- {
- for (int i = 0; i < numPings; i++)
- {
- sendMessage(message);
-
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged
at.");
- }
- }
}
/**
@@ -658,14 +673,24 @@
_replyDestination = destination;
}
+ public void setMessageListener(MessageListener messageListener)
+ {
+ _messageListener = messageListener;
+ }
+
+ public CountDownLatch getEndLock(String correlationID)
+ {
+ return trafficLights.get(correlationID);
+ }
+
/*
- * When the test is being performed with multiple queues, then this method
will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
- *
- * @param message
- * @param numPings
- * @throws JMSException
- */
+ * When the test is being performed with multiple queues, then this method
will be used, which has a loop to
+ * pick up the next queue from the queues list and sends message to it.
+ *
+ * @param message
+ * @param numPings
+ * @throws JMSException
+ */
/*private void pingMultipleQueues(Message message, int numPings) throws
JMSException
{
int queueIndex = 0;
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Wed Jan 24 05:16:08 2007
@@ -37,13 +37,11 @@
public class PingAsyncTestPerf extends PingTestPerf //implements
TimingControllerAware
{
- private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+// private static Logger _logger =
Logger.getLogger(PingAsyncTestPerf.class);
// private TimingController _timingController;
-//
-// private final CountDownLatch _completedLock = new CountDownLatch(1);
-//
-// private AsyncMessageListener _listener;
+
+// private AsyncMessageListener _listener;
private volatile boolean _done = false;
@@ -51,7 +49,7 @@
{
super(name);
}
-//
+
// /**
// * Compile all the tests into a test suite.
// */
@@ -79,13 +77,16 @@
// String username = "guest";
// String password = "guest";
// String virtualpath =
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-// int queueCount =
Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
-// String queueName =
testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+// int destinationscount =
Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+// String destinationname =
testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
// boolean persistent =
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
// boolean transacted =
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
// String selector = null;
// boolean verbose =
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
// int messageSize =
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+// int rate =
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+// boolean pubsub =
Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+//
//
// boolean afterCommit =
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
// boolean beforeCommit =
Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -96,18 +97,16 @@
// int batchSize =
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
// int commitbatchSize =
Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
//
-// int rate =
Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-//
// // This is synchronized because there is a race condition, which
causes one connection to sleep if
// // all threads try to create connection concurrently
// synchronized (this)
// {
// // Establish a client to ping a Queue and listen the reply
back from same Queue
// perThreadSetup._pingItselfClient = new
TestPingItself(brokerDetails, username, password, virtualpath,
-//
queueName, selector, transacted, persistent,
+//
destinationname, selector, transacted, persistent,
//
messageSize, verbose,
//
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-//
commitbatchSize, queueCount, rate);
+//
commitbatchSize, destinationscount, rate, pubsub);
//
//
// _listener = new AsyncMessageListener(batchSize);
@@ -154,9 +153,11 @@
// // start the test
// long timeout =
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
//
+// String correlationID =
Long.toString(perThreadSetup._pingItselfClient.getNewID());
+//
// try
// {
-// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg,
numPings);
+// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg,
numPings, correlationID);
// }
// catch (JMSException e)
// {
@@ -174,7 +175,7 @@
// {
// _logger.info("awating test finish");
//
-// _completedLock.await();
+//
perThreadSetup._pingItselfClient.getEndLock(correlationID).await();
// }
// catch (InterruptedException e)
// {
@@ -193,7 +194,7 @@
// Assert.fail("The ping timed out after " + timeout + " ms.
Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
// try
// {
-// _timingController.completeTest(false);
+// _timingController.completeTest(false, numPings - numReplies);
// }
// catch (InterruptedException e)
// {
@@ -202,7 +203,6 @@
// }
// }
//
-//
// public void setTimingController(TimingController timingController)
// {
// _timingController = timingController;
@@ -216,7 +216,7 @@
//
// private class AsyncMessageListener implements MessageListener
// {
-// private int _messageRecevied;
+// private int _messageReceived;
// private int _totalMessages;
// private int _batchSize;
//
@@ -224,14 +224,14 @@
// {
// _batchSize = batchSize;
// _totalMessages = totalMessages;
-// _messageRecevied = 0;
+// _messageReceived = 0;
// }
//
// public AsyncMessageListener(int batchSize)
// {
// _batchSize = batchSize;
// _totalMessages = -1;
-// _messageRecevied = 0;
+// _messageReceived = 0;
// }
//
// public void setTotalMessages(int newTotal)
@@ -242,14 +242,16 @@
// public void onMessage(Message message)
// {
// _logger.info("Message Recevied");
+//
+// _messageReceived++;
+//
// try
// {
-// _messageRecevied++;
-// if (_messageRecevied == _batchSize)
+// if (_messageReceived == _batchSize)
// {
// if (_timingController != null)
// {
-// _timingController.completeTest(true);
+// _timingController.completeTest(true, _batchSize);
// }
// }
// }
@@ -258,7 +260,7 @@
// doDone();
// }
//
-// if (_totalMessages == -1 || _messageRecevied == _totalMessages)
+// if (_totalMessages == -1 || _messageReceived == _totalMessages)
// {
// _logger.info("Test Completed.. signalling");
// doDone();
@@ -268,10 +270,9 @@
// private void doDone()
// {
// _done = true;
-// _completedLock.countDown();
// try
// {
-// _timingController.completeTest(true);
+// _timingController.completeTest(true, _totalMessages -
_messageReceived);
// }
// catch (InterruptedException e)
// {
@@ -281,8 +282,9 @@
//
// public int getReplyCount()
// {
-// return _messageRecevied;
+// return _messageReceived;
// }
+//
// }
}
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499392&r1=499391&r2=499392
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Wed Jan 24 05:16:08 2007
@@ -42,17 +42,17 @@
/**
* Holds the name of the property to get the test message size from.
*/
- private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
+ protected static final String MESSAGE_SIZE_PROPNAME = "messagesize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- private static final String PING_DESTINATION_NAME_PROPNAME =
"destinationname";
+ protected static final String PING_DESTINATION_NAME_PROPNAME =
"destinationname";
/**
* holds the queue count, if the test is being performed with multiple
queues
*/
- private static final String PING_DESTINATION_COUNT_PROPNAME =
"destinationscount";
+ protected static final String PING_DESTINATION_COUNT_PROPNAME =
"destinationscount";
/**
* Holds the name of the property to get the test delivery mode from.
@@ -85,7 +85,7 @@
protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
/** Holds the true or false depending on wether it is P2P test or PubSub */
- private static final String IS_PUBSUB_PROPNAME = "pubsub";
+ protected static final String IS_PUBSUB_PROPNAME = "pubsub";
/**
* Holds the size of message body to attach to the ping messages.
*/
@@ -168,7 +168,7 @@
setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME,
Long.toString(TIMEOUT_DEFAULT));
- setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME,
Integer.toString(1));
+ setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME,
Integer.toString(0));
setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME,
Boolean.toString(false));
setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));