Author: bhupendrab
Date: Mon Jan 22 08:41:23 2007
New Revision: 498687
URL: http://svn.apache.org/viewvc?view=rev&rev=498687
Log:
performance Ping tests modified for scalability test. Now tests with multiple
queues can be performed.
Modified:
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh?view=diff&rev=498687&r1=498686&r2=498687
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh
Mon Jan 22 08:41:23 2007
@@ -20,7 +20,8 @@
##LOGDIR=$QPID_HOME/logs
LOGDIR=../logs
-LOGFILE=$LOGDIR/perftest.log
+date=`date +"%y%m%d%H%M%S"`
+LOGFILE=$LOGDIR/perftest.log.$date
## create the log dir
if [ ! -d $LOGDIR ]; then
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=498687&r1=498686&r2=498687
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
Mon Jan 22 08:41:23 2007
@@ -1,14 +1,19 @@
package org.apache.qpid.ping;
import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.*;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.framing.AMQShortString;
/**
* This abstract class captures functionality that is common to all ping
producers. It provides functionality to
@@ -41,6 +46,12 @@
/** Holds the producer session, need to create test messages. */
private Session _producerSession;
+
+ /** Holds the number of queues the tests will be using to send messages. By
default it will be 1. */
+ private int _queueCount;
+ private static AtomicInteger _queueSequenceID = new AtomicInteger();
+ private List<Queue> _queues = new ArrayList<Queue>();
+
/**
* Convenience method for a short pause.
*
@@ -167,6 +178,37 @@
public void setProducerSession(Session session)
{
this._producerSession = session;
+ }
+
+ public int getQueueCount()
+ {
+ return _queueCount;
+ }
+
+ public void setQueueCount(int queueCount)
+ {
+ this._queueCount = queueCount;
+ }
+
+ /**
+ * Creates queues dynamically and adds them to the queues list. This is used when
the test is being done with
+ * multiple queues.
+ * @param queueCount
+ */
+ protected void createQueues(int queueCount)
+ {
+ for (int i = 0; i < queueCount; i++)
+ {
+ AMQShortString name = new AMQShortString("Queue_" +
_queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ AMQQueue queue = new AMQQueue(name, name, false, false, false);
+
+ _queues.add(queue);
+ }
+ }
+
+ protected Queue getQueue(int index)
+ {
+ return _queues.get(index);
}
/**
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=498687&r1=498686&r2=498687
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
Mon Jan 22 08:41:23 2007
@@ -31,6 +31,8 @@
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import java.net.InetAddress;
+import java.util.List;
+import java.util.ArrayList;
/**
* This class is used to test sending and receiving messages to (pingQueue)
and from a queue (replyQueue).
@@ -41,7 +43,7 @@
public class TestPingItself extends PingPongProducer
{
private static final Logger _logger =
Logger.getLogger(TestPingItself.class);
-
+
public TestPingItself(String brokerDetails, String username, String
password, String virtualpath, String queueName,
String selector, boolean transacted, boolean
persistent, int messageSize, boolean verbose)
throws Exception
@@ -49,7 +51,26 @@
super(brokerDetails, username, password, virtualpath, queueName,
selector, transacted, persistent, messageSize, verbose);
}
+ public TestPingItself(String brokerDetails, String username, String
password, String virtualpath, int queueCount,
+ String selector, boolean transacted, boolean
persistent, int messageSize, boolean verbose)
+ throws Exception
+ {
+ super(brokerDetails, username, password, virtualpath, transacted);
+ setQueueCount(queueCount);
+ createQueues(queueCount);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+
+ createConsumers(selector);
+ createProducer();
+ }
+
@Override
+ /**
+ * Sets the replyQueue to be the same as ping queue.
+ */
public void createConsumer(String selector) throws JMSException
{
// Create a message consumer to get the replies with and register this
to be called back by it.
@@ -57,8 +78,6 @@
MessageConsumer consumer =
getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, false,
EXCLUSIVE, selector);
consumer.setMessageListener(this);
}
-
-
/**
* Starts a ping-pong loop running from the command line. The bounce back
client {@link org.apache.qpid.requestreply.PingPongBouncer} also
needs
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=498687&r1=498686&r2=498687
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Mon Jan 22 08:41:23 2007
@@ -118,7 +118,22 @@
protected boolean _verbose = false;
protected Session _consumerSession;
-
+
+ protected PingPongProducer(String brokerDetails, String username, String
password, String virtualpath,
+ boolean transacted)
+ throws Exception
+ {
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password,
clientID, virtualpath));
+
+ // Create transactional or non-transactional sessions, based on the
command line arguments.
+ setProducerSession((Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE));
+ _consumerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
+ }
+
/**
* Creates a ping pong producer with the specified connection details and
type.
*
@@ -137,39 +152,39 @@
String selector, boolean transacted, boolean
persistent, int messageSize, boolean verbose)
throws Exception
{
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
-
- setConnection(new AMQConnection(brokerDetails, username, password,
clientID, virtualpath));
-
- // Create transactional or non-transactional sessions, based on the
command line arguments.
- setProducerSession((Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE));
- _consumerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
-
- // Create producer and the consumer
- createProducer(queueName, persistent);
- createConsumer(selector);
+ this(brokerDetails, username, password, virtualpath, transacted);
+ _pingQueue = new AMQQueue(queueName);
_persistent = persistent;
_messageSize = messageSize;
_verbose = verbose;
+
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
}
/**
- * Creates the queue and producer to send the pings on
- * @param queueName
- * @param persistent
+ * Creates the producer to send the pings on. If the tests are with
multiple queues, then the producer
+ * is created with a null destination, so that any destination can be
specified while sending.
* @throws JMSException
*/
- public void createProducer(String queueName, boolean persistent) throws
JMSException
+ public void createProducer() throws JMSException
{
- // Create a queue and producer to send the pings on.
- if (_pingQueue == null)
- _pingQueue = new AMQQueue(queueName);
- _producer = (MessageProducer)
getProducerSession().createProducer(_pingQueue);
+ if (getQueueCount() > 1)
+ {
+ // create producer with initial destination as null for test with
multiple queues
+ // In this case, a different destination will be used while
sending the message
+ _producer = (MessageProducer)
getProducerSession().createProducer(null);
+ }
+ else
+ {
+ // Create a queue and producer to send the pings on.
+ _producer = (MessageProducer)
getProducerSession().createProducer(_pingQueue);
+
+ }
_producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
}
/**
@@ -187,6 +202,20 @@
consumer.setMessageListener(this);
}
+ /**
+ * Creates consumer instances for each queue. This is used when test is
being done with multiple queues.
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumers(String selector) throws JMSException
+ {
+ for (int i = 0; i < getQueueCount(); i++)
+ {
+ MessageConsumer consumer =
getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE,
selector);
+ consumer.setMessageListener(this);
+ }
+ }
+
protected Session getConsumerSession()
{
return _consumerSession;
@@ -296,6 +325,7 @@
if (_verbose)
{
_logger.info(timestampFormatter.format(new Date()) + ": Got
reply with correlation id, " + correlationID);
+ //_logger.debug("Received from : " +
message.getJMSDestination());
}
// Turn the traffic light to green.
@@ -352,12 +382,20 @@
CountDownLatch trafficLight = new CountDownLatch(numPings);
trafficLights.put(messageCorrelationId, trafficLight);
- for (int i = 0; i < numPings; i++)
+ if (getQueueCount() > 1)
{
- // Re-timestamp the message.
- message.setLongProperty("timestamp", System.currentTimeMillis());
-
- _producer.send(message);
+ // If test is with multiple queues
+ pingMultipleQueues(message, numPings);
+ }
+ else
+ {
+ // If test is with one Queue only
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp",
System.currentTimeMillis());
+ _producer.send(message);
+ }
}
// Commit the transaction if running in transactional mode. This must
happen now, rather than at the end of
@@ -389,6 +427,30 @@
return numReplies;
}
+ /**
+ * Sends numPings copies of the message, cycling round-robin through the
queues in the queues list.
+ * This method is used when the test is being performed with multiple queues.
+ * @param message the ping message; its "timestamp" property is reset before each send
+ * @param numPings the number of pings to send
+ * @throws JMSException if setting the timestamp property or sending the message fails
+ */
+ private void pingMultipleQueues(Message message, int numPings) throws
JMSException
+ {
+ int queueIndex = 0;
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+ _producer.send(getQueue(queueIndex++), message);
+
+ // Wrap around only once the last queue (index getQueueCount() - 1) has been used.
+ if (queueIndex == getQueueCount())
+ {
+ queueIndex = 0;
+ }
+ }
+ }
+
/**
* Sends the specified ping message but does not wait for a correlating
reply.
*
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=498687&r1=498686&r2=498687
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Mon Jan 22 08:41:23 2007
@@ -46,6 +46,9 @@
/** Holds the name of the property to get the ping queue name from. */
private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ /** Holds the name of the property to get the queue count from, for tests
performed with multiple queues. */
+ private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+
/** Holds the name of the property to get the test delivery mode from. */
private static final String PERSISTENT_MODE_PROPNAME = "persistent";
@@ -92,6 +95,7 @@
setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME,
Long.toString(TIMEOUT_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME,
Integer.toString(1));
}
/** Holds the test ping client. */
@@ -130,7 +134,7 @@
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
{
- Assert.fail("The ping timed out. Messages Sent = " + numReplies +
", MessagesReceived = " + numPings);
+ Assert.fail("The ping timed out. Messages Sent = " + numPings + ",
MessagesReceived = " + numReplies);
}
}
@@ -147,6 +151,7 @@
String username = "guest";
String password = "guest";
String virtualpath =
testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ int queueCount =
Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
String queueName =
testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
boolean persistent =
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted =
Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
@@ -154,9 +159,18 @@
boolean verbose = false;
int messageSize =
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
- // Establish a client to ping a Queue and listen the reply back
from same Queue
- _pingItselfClient = new TestPingItself(brokerDetails, username,
password, virtualpath, queueName,
- selector, transacted,
persistent, messageSize, verbose);
+ if (queueCount > 1)
+ {
+ // test client with multiple queues
+ _pingItselfClient = new TestPingItself(brokerDetails,
username, password, virtualpath, queueCount,
+ selector, transacted,
persistent, messageSize, verbose);
+ }
+ else
+ {
+ // Establish a client to ping a Queue and listen the reply
back from same Queue
+ _pingItselfClient = new TestPingItself(brokerDetails,
username, password, virtualpath, queueName,
+ selector, transacted,
persistent, messageSize, verbose);
+ }
// Start the client connection
_pingItselfClient.getConnection().start();