Author: bhupendrab
Date: Tue Jan 23 06:41:33 2007
New Revision: 499036
URL: http://svn.apache.org/viewvc?view=rev&rev=499036
Log:
Updated the ping performance tests so that they can be run with multiple threads.
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
Tue Jan 23 06:41:33 2007
@@ -36,9 +36,10 @@
private static final Logger _logger =
Logger.getLogger(TestPingItself.class);
/**
- * This creates a client for pinging to a Queue. There will be one
producer and one consumer instance. Consumer
- * listening to the same Queue, producer is sending to
- *
+ * If queueCount is <= 1 : There will be one Queue and one consumer
instance for the test
+ * If queueCount is > 1 : This creats a client for tests with multiple
queues. Creates as many consumer instances
+ * as there are queues, each listening to a Queue. A producer is created
which picks up a queue from
+ * the list of queues to send message
* @param brokerDetails
* @param username
* @param password
@@ -53,57 +54,31 @@
* @param beforeCommit
* @param afterSend
* @param beforeSend
- * @param batchSize
- * @throws Exception
- */
- public TestPingItself(String brokerDetails, String username, String
password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean
persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean
afterSend, boolean beforeSend, boolean failOnce,
- int batchSize)
- throws Exception
- {
- super(brokerDetails, username, password, virtualpath, queueName,
selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend, beforeSend,
failOnce, batchSize, 0);
- }
-
- /**
- * This creats a client for tests with multiple queues. Creates as many
consumer instances as there are queues,
- * each listening to a Queue. A producer is created which picks up a queue
from the list of queues to send message.
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param virtualpath
- * @param selector
- * @param transacted
- * @param persistent
- * @param messageSize
- * @param verbose
- * @param afterCommit
- * @param beforeCommit
- * @param afterSend
- * @param beforeSend
+ * @param failOnce
* @param batchSize
* @param queueCount
* @throws Exception
*/
- public TestPingItself(String brokerDetails, String username, String
password, String virtualpath,
+ public TestPingItself(String brokerDetails, String username, String
password, String virtualpath, String queueName,
String selector, boolean transacted, boolean
persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean
afterSend, boolean beforeSend, boolean failOnce,
int batchSize, int queueCount)
throws Exception
{
- super(brokerDetails, username, password, virtualpath, null, null,
transacted, persistent, messageSize,
+ super(brokerDetails, username, password, virtualpath, queueName,
selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend,
failOnce, batchSize, queueCount);
- createQueues(queueCount);
+ if (queueCount > 1)
+ {
+ createQueues(queueCount);
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
- createConsumers(selector);
- createProducer();
+ createConsumers(selector);
+ createProducer();
+ }
}
/**
@@ -136,7 +111,7 @@
boolean persistent = config.usePersistentMessages();
int messageSize = config.getPayload() != 0 ? config.getPayload() :
DEFAULT_MESSAGE_SIZE;
int messageCount = config.getMessages();
- int queueCount = config.getQueueCount();
+ int queueCount = config.getQueueCount() != 0 ? config.getQueueCount()
: 1;
int batchSize = config.getBatchSize() != 0 ? config.getBatchSize() :
BATCH_SIZE;
String queue = "ping_" + System.currentTimeMillis();
@@ -182,22 +157,11 @@
}
}
- TestPingItself pingItself = null;
// Create a ping producer to handle the request/wait/reply cycle.
- if (queueCount > 1)
- {
- pingItself = new TestPingItself(brokerDetails, "guest", "guest",
virtualpath, null,
- transacted, persistent,
messageSize, verbose,
- afterCommit, beforeCommit,
afterSend, beforeSend, failOnce,
- batchSize, queueCount);
- }
- else
- {
- pingItself = new TestPingItself(brokerDetails, "guest", "guest",
virtualpath, queue, null,
- transacted, persistent,
messageSize, verbose,
- afterCommit, beforeCommit,
afterSend, beforeSend, failOnce,
- batchSize);
- }
+ TestPingItself pingItself = new TestPingItself(brokerDetails, "guest",
"guest", virtualpath, queue, null,
+ transacted, persistent,
messageSize, verbose,
+ afterCommit, beforeCommit,
afterSend, beforeSend, failOnce,
+ batchSize, queueCount);
pingItself.getConnection().start();
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Tue Jan 23 06:41:33 2007
@@ -93,10 +93,6 @@
*/
protected static final int BATCH_SIZE = 100;
- /**
- * Keeps track of the ping producer instance used in the run loop.
- */
- private static PingPongProducer _pingProducer;
protected static final int PREFETCH = 100;
protected static final boolean NO_LOCAL = true;
protected static final boolean EXCLUSIVE = false;
@@ -109,7 +105,7 @@
/**
* A source for providing sequential unique correlation ids.
*/
- private AtomicLong idGenerator = new AtomicLong(0L);
+ private static AtomicLong idGenerator = new AtomicLong(0L);
/**
* Holds the queue to send the ping replies to.
@@ -134,7 +130,7 @@
/**
* Holds a map from message ids to latches on which threads wait for
replies.
*/
- private Map<String, CountDownLatch> trafficLights = new HashMap<String,
CountDownLatch>();
+ private static Map<String, CountDownLatch> trafficLights = new
HashMap<String, CountDownLatch>();
/**
* Used to indicate that the ping loop should print out whenever it pings.
@@ -192,21 +188,21 @@
this(brokerDetails, username, password, virtualpath, transacted,
persistent, messageSize, verbose,
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
batchSize);
- if (queueName != null)
- {
- _pingQueue = new AMQQueue(queueName);
- // Create producer and the consumer
- createProducer();
- createConsumer(selector);
- }
- else if (queueCount > 0)
- {
- _queueCount = queueCount;
- }
- else
+ _queueCount = queueCount;
+ if (queueCount <= 1)
{
- _logger.error("Queue Count is zero and no queueName specified. One
must be set.");
- throw new IllegalArgumentException("Queue Count is zero and no
queueName specified. One must be set.");
+ if (queueName != null)
+ {
+ _pingQueue = new AMQQueue(queueName);
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
+ }
+ else
+ {
+ _logger.error("Queue Name is not specified");
+ throw new IllegalArgumentException("Queue Name is not
specified");
+ }
}
}
@@ -351,23 +347,23 @@
}
// Create a ping producer to handle the request/wait/reply cycle.
- _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest",
virtualpath, PING_QUEUE_NAME, null, transacted,
+ PingPongProducer pingProducer = new PingPongProducer(brokerDetails,
"guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
persistent, messageSize, verbose,
afterCommit, beforeCommit,
afterSend, beforeSend, failOnce,
batchSize, 0);
- _pingProducer.getConnection().start();
+ pingProducer.getConnection().start();
// Run a few priming pings to remove warm up time from test results.
- _pingProducer.prime(PRIMING_LOOPS);
+ pingProducer.prime(PRIMING_LOOPS);
// Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
// Ensure that the ping pong producer is registered to listen for
exceptions on the connection too.
- _pingProducer.getConnection().setExceptionListener(_pingProducer);
+ pingProducer.getConnection().setExceptionListener(pingProducer);
// Create the ping loop thread and run it until it is terminated by
the shutdown hook or exception.
- Thread pingThread = new Thread(_pingProducer);
+ Thread pingThread = new Thread(pingProducer);
pingThread.run();
pingThread.join();
}
@@ -502,7 +498,7 @@
if ((numReplies < numPings) && _verbose)
{
- _logger.info("Timed out before all replies received on id, " +
messageCorrelationId);
+ _logger.info("Timed out (" + timeout + " ms) before all replies
received on id, " + messageCorrelationId);
}
else if (_verbose)
{
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499036&r1=499035&r2=499036
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Tue Jan 23 06:41:33 2007
@@ -79,6 +79,8 @@
*/
private static final String TIMEOUT_PROPNAME = "timeout";
+ private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+
/**
* Holds the size of message body to attach to the ping messages.
*/
@@ -158,6 +160,7 @@
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME,
Long.toString(TIMEOUT_DEFAULT));
setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME,
Integer.toString(1));
+ setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME,
Boolean.toString(false));
}
/**
@@ -192,7 +195,6 @@
{
// Get the per thread test setup to run the test through.
PerThreadSetup perThreadSetup = threadSetup.get();
-
if (numPings == 0)
{
_logger.error("Number of pings requested was zero.");
@@ -210,12 +212,10 @@
long timeout =
Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
int numReplies =
perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout);
- _logger.info("replies = " + numReplies);
-
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
{
- Assert.fail("The ping timed out. Messages Sent = " + numPings + ",
MessagesReceived = " + numReplies);
+ Assert.fail("The ping timed out after "+ timeout + " ms. Messages
Sent = " + numPings + ", MessagesReceived = " + numReplies);
}
}
@@ -226,6 +226,7 @@
//NDC.push(getName());
// Create the test setups on a per thread basis, only if they have not
already been created.
+
if (threadSetup.get() == null)
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
@@ -240,7 +241,7 @@
boolean persistent =
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted =
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
String selector = null;
- boolean verbose = false;
+ boolean verbose =
Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
int messageSize =
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
boolean afterCommit =
Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
@@ -251,26 +252,17 @@
int batchSize =
Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
- // Establish a client to ping a Queue and listen the reply back
from same Queue
- if (queueCount > 1)
- {
- // test client with multiple queues
- perThreadSetup._pingItselfClient = new
TestPingItself(brokerDetails, username, password, virtualpath,
-
selector, transacted, persistent,
-
messageSize, verbose,
-
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-
batchSize, queueCount);
- }
- else
+ // This is synchronized because there is a race condition, which
causes one connection to sleep if
+ // all threads try to create connection concurrently
+ synchronized(this)
{
// Establish a client to ping a Queue and listen the reply
back from same Queue
perThreadSetup._pingItselfClient = new
TestPingItself(brokerDetails, username, password, virtualpath,
queueName, selector, transacted, persistent,
messageSize, verbose,
afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-
batchSize);
+
batchSize, queueCount);
}
-
// Start the client connection
perThreadSetup._pingItselfClient.getConnection().start();