Author: rupertlssmith
Date: Mon Aug 20 08:27:32 2007
New Revision: 567727
URL: http://svn.apache.org/viewvc?rev=567727&view=rev
Log:
Added options to set transacted or ack mode differently on consumers than
producers for perf tests.
Modified:
incubator/qpid/branches/M2/java/perftests/pom.xml
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Mon Aug 20 08:27:32 2007
@@ -174,7 +174,7 @@
<!-- Single pings. These can be scaled up by
overriding the parameters when calling the test script. -->
<Ping-Once>-n Ping-Once -s[1] -r 1 -t testPingOk -o .
org.apache.qpid.ping.PingTestPerf</Ping-Once>
<Ping-Once-Async>-n Ping-Once-Async -s[1] -r 1 -t
testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf</Ping-Once-Async>
- <Ping-Latency>-n Ping-Latency -s[1000] -d10S -t
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf
rate=100</Ping-Latency>
+ <Ping-Latency>-n Ping-Latency -s[1000] -d10S -t
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf rate=100
batchSize=100</Ping-Latency>
<!-- More example Tests. These are examples to
exercise all the features of the test harness. Can scale up with option
overrides. -->
<Ping-Tx>-n Ping-Tx -s[100] -o . -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf transacted=true</Ping-Tx>
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Mon Aug 20 08:27:32 2007
@@ -20,15 +20,6 @@
*/
package org.apache.qpid.ping;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -40,6 +31,15 @@
import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* PingAsyncTestPerf is a performance test that outputs multiple timings from
its test method, using the timing controller
* interface supplied by the test runner from a seperate listener thread. It
differs from the {@link PingTestPerf} test
@@ -239,7 +239,7 @@
*
* @throws JMSException Any underlying JMSException is allowed to fall
through.
*/
- public void onMessage(Message message, int remainingCount) throws
JMSException
+ public void onMessage(Message message, int remainingCount, long
latency) throws JMSException
{
// Check if a batch boundary has been crossed.
if ((remainingCount % _batchSize) == 0)
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
Mon Aug 20 08:27:32 2007
@@ -360,7 +360,7 @@
}
// Ensure messages received are committed.
- if (_transacted)
+ if (_consTransacted)
{
try
{
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
Mon Aug 20 08:27:32 2007
@@ -20,15 +20,6 @@
*/
package org.apache.qpid.ping;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -43,6 +34,15 @@
import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* PingLatencyTestPerf is a performance test that outputs multiple timings
from its test method, using the timing
* controller interface supplied by the test runner from a seperate listener
thread. It outputs round trip timings for
@@ -261,7 +261,7 @@
*
* @throws javax.jms.JMSException Any underlying JMSException is
allowed to fall through.
*/
- public void onMessage(Message message, int remainingCount) throws
JMSException
+ public void onMessage(Message message, int remainingCount, long
latency) throws JMSException
{
_logger.debug("public void onMessage(Message message, int
remainingCount = " + remainingCount + "): called");
@@ -280,26 +280,6 @@
TimingController tc = perCorrelationId._tc;
int expected = perCorrelationId._expectedCount;
- // Extract the send time from the message and work out
from the current time, what the ping latency was.
- // The ping producer time stamps messages in nanoseconds.
- long startTime;
-
- if (_strictAMQP)
- {
- Long value =
- ((AMQMessage) message).getTimestampProperty(new
AMQShortString(
-
PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME));
-
- startTime = ((value == null) ? 0L : value);
- }
- else
- {
- startTime =
message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
- }
-
- long now = System.nanoTime();
- long pingTime = now - startTime;
-
// Calculate how many messages were actually received in
the last batch. This will be the batch size
// except where the number expected is not a multiple of
the batch size and this is the first remaining
// count to cross a batch size boundary, in which case it
will be the number expected modulo the batch
@@ -309,8 +289,7 @@
// Register a test result for the correlation id.
try
{
-
- tc.completeTest(true, receivedInBatch, pingTime);
+ tc.completeTest(true, receivedInBatch, latency);
}
catch (InterruptedException e)
{
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Mon Aug 20 08:27:32 2007
@@ -98,6 +98,10 @@
* 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted <td> transacted <td> Whether or not consumers use
transactions. Defaults to the same value
+ * as the 'transacted' option
if not seperately defined.
+ * <tr><td> consAckMode <td> ackMode <td> The message acknowledgement
mode for consumers. Defaults to the same
+ * value as 'ackMode' if not
seperately defined.
* <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of
messages sent but not yet received.
* Limits the volume of messages
currently buffered on the client
* or broker. Can help scale test
clients by limiting amount of buffered
@@ -160,6 +164,8 @@
/** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+
/** Holds the name of the property to get the test broker url from. */
public static final String BROKER_PROPNAME = "broker";
@@ -277,6 +283,8 @@
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+
public static final String MAX_PENDING_PROPNAME = "maxPending";
public static final int MAX_PENDING_DEFAULT = 0;
@@ -304,8 +312,10 @@
defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME,
PING_QUEUE_NAME_DEFAULT);
defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME,
defaults.getProperty(TRANSACTED_PROPNAME));
defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME,
PERSISTENT_MODE_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME,
defaults.getProperty(ACK_MODE_PROPNAME));
defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME,
MESSAGE_SIZE_DEAFULT);
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -331,12 +341,15 @@
protected String _destinationName;
protected String _selector;
protected boolean _transacted;
+ protected boolean _consTransacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
/** Holds the acknowledgement mode used for sending and receiving
messages. */
- private int _ackMode;
+ protected int _ackMode;
+
+ protected int _consAckMode;
/** Determines what size of messages this producer sends. */
protected int _messageSize;
@@ -485,6 +498,7 @@
_destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
_selector = properties.getProperty(SELECTOR_PROPNAME);
_transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _consTransacted =
properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
_persistent =
properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
_messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
_verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -501,6 +515,7 @@
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode =
properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize =
properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
@@ -547,7 +562,7 @@
for (int i = 0; i < _noOfConsumers; i++)
{
- _consumerSession[i] = (Session)
_consumerConnection[i].createSession(_transacted, _ackMode);
+ _consumerSession[i] = (Session)
_consumerConnection[i].createSession(_consTransacted, _consAckMode);
}
// Create the destinations to send pings to and receive replies from.
@@ -815,11 +830,14 @@
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
// log.debug("public void onMessageWithConsumerNo(Message message, int
consumerNo = " + consumerNo + "): called");
-
- NDC.push("cons" + consumerNo);
-
try
{
+ long now = System.nanoTime();
+ long timestamp = getTimestamp(message);
+ long pingTime = now - timestamp;
+
+ // NDC.push("cons" + consumerNo);
+
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
// log.debug("correlationID = " + correlationID);
@@ -886,7 +904,7 @@
}
}
- NDC.push("/rem" + remainingCount);
+ // NDC.push("/rem" + remainingCount);
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
@@ -907,7 +925,7 @@
// Forward the message and remaining count to any
interested chained message listener.
if (_chainedMessageListener != null)
{
- _chainedMessageListener.onMessage(message, (int)
remainingCount);
+ _chainedMessageListener.onMessage(message, (int)
remainingCount, pingTime);
}
// Check if this is the last message, in which case
release any waiting producers. This is done
@@ -947,7 +965,7 @@
finally
{
// log.debug("public void onMessageWithConsumerNo(Message message,
int consumerNo): ending");
- NDC.clear();
+ // NDC.clear();
}
}
@@ -981,7 +999,7 @@
try
{
- NDC.push("prod");
+ // NDC.push("prod");
// Create a count down latch to count the number of replies with.
This is created before the messages are
// sent so that the replies cannot be received before the count
down is created.
@@ -1046,7 +1064,7 @@
// so will be a memory leak if this is not done.
finally
{
- NDC.pop();
+ // NDC.pop();
perCorrelationIds.remove(messageCorrelationId);
}
}
@@ -1566,7 +1584,17 @@
*/
public static interface ChainedMessageListener
{
- public void onMessage(Message message, int remainingCount) throws
JMSException;
+ /**
+ * Notifies interested listeners about message arrival and important
test stats, the number of messages
+ * remaining in the test, and the messages send timestamp.
+ *
+ * @param message The newly arrived message.
+ * @param remainingCount The number of messages left to complete the
test.
+ * @param latency The nanosecond latency of the message.
+ *
+ * @throws JMSException Any JMS exceptions is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount, long
latency) throws JMSException;
}
/**