Author: rupertlssmith
Date: Mon Aug 20 08:27:32 2007
New Revision: 567727

URL: http://svn.apache.org/viewvc?rev=567727&view=rev
Log:
Added options to set transacted or ack mode differently on consumers than 
producers for perf tests.

Modified:
    incubator/qpid/branches/M2/java/perftests/pom.xml
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Mon Aug 20 08:27:32 2007
@@ -174,7 +174,7 @@
                         <!-- Single pings. These can be scaled up by 
overriding the parameters when calling the test script. -->
                         <Ping-Once>-n Ping-Once -s[1] -r 1 -t testPingOk -o . 
org.apache.qpid.ping.PingTestPerf</Ping-Once>
                         <Ping-Once-Async>-n Ping-Once-Async -s[1] -r 1 -t 
testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf</Ping-Once-Async>
-                        <Ping-Latency>-n Ping-Latency -s[1000] -d10S -t 
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf 
rate=100</Ping-Latency>
+                        <Ping-Latency>-n Ping-Latency -s[1000] -d10S -t 
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf rate=100 
batchSize=100</Ping-Latency>
 
                         <!-- More example Tests. These are examples to 
exercise all the features of the test harness. Can scale up with option 
overrides. -->
                         <Ping-Tx>-n Ping-Tx -s[100] -o . -t testAsyncPingOk 
org.apache.qpid.ping.PingAsyncTestPerf transacted=true</Ping-Tx>

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Mon Aug 20 08:27:32 2007
@@ -20,15 +20,6 @@
  */
 package org.apache.qpid.ping;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -40,6 +31,15 @@
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * PingAsyncTestPerf is a performance test that outputs multiple timings from 
its test method, using the timing controller
  * interface supplied by the test runner from a separate listener thread. It 
differs from the {@link PingTestPerf} test
@@ -239,7 +239,7 @@
          *
          * @throws JMSException Any underlying JMSException is allowed to fall 
through.
          */
-        public void onMessage(Message message, int remainingCount) throws 
JMSException
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
         {
             // Check if a batch boundary has been crossed.
             if ((remainingCount % _batchSize) == 0)

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 Mon Aug 20 08:27:32 2007
@@ -360,7 +360,7 @@
         }
 
         // Ensure messages received are committed.
-        if (_transacted)
+        if (_consTransacted)
         {
             try
             {

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 Mon Aug 20 08:27:32 2007
@@ -20,15 +20,6 @@
  */
 package org.apache.qpid.ping;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -43,6 +34,15 @@
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * PingLatencyTestPerf is a performance test that outputs multiple timings 
from its test method, using the timing
  * controller interface supplied by the test runner from a separate listener 
thread. It outputs round trip timings for
@@ -261,7 +261,7 @@
          *
          * @throws javax.jms.JMSException Any underlying JMSException is 
allowed to fall through.
          */
-        public void onMessage(Message message, int remainingCount) throws 
JMSException
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
         {
             _logger.debug("public void onMessage(Message message, int 
remainingCount = " + remainingCount + "): called");
 
@@ -280,26 +280,6 @@
                     TimingController tc = perCorrelationId._tc;
                     int expected = perCorrelationId._expectedCount;
 
-                    // Extract the send time from the message and work out 
from the current time, what the ping latency was.
-                    // The ping producer time stamps messages in nanoseconds.
-                    long startTime;
-
-                    if (_strictAMQP)
-                    {
-                        Long value =
-                            ((AMQMessage) message).getTimestampProperty(new 
AMQShortString(
-                                    
PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME));
-
-                        startTime = ((value == null) ? 0L : value);
-                    }
-                    else
-                    {
-                        startTime = 
message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
-                    }
-
-                    long now = System.nanoTime();
-                    long pingTime = now - startTime;
-
                     // Calculate how many messages were actually received in 
the last batch. This will be the batch size
                     // except where the number expected is not a multiple of 
the batch size and this is the first remaining
                     // count to cross a batch size boundary, in which case it 
will be the number expected modulo the batch
@@ -309,8 +289,7 @@
                     // Register a test result for the correlation id.
                     try
                     {
-
-                        tc.completeTest(true, receivedInBatch, pingTime);
+                        tc.completeTest(true, receivedInBatch, latency);
                     }
                     catch (InterruptedException e)
                     {

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=567727&r1=567726&r2=567727&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Mon Aug 20 08:27:32 2007
@@ -98,6 +98,10 @@
  *                                               3 - DUPS_OK_ACKNOWLEDGE
  *                                               257 - NO_ACKNOWLEDGE
  *                                               258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted   <td> transacted <td> Whether or not consumers use 
transactions. Defaults to the same value
+ *                                                as the 'transacted' option 
if not separately defined.
+ * <tr><td> consAckMode      <td> ackMode  <td> The message acknowledgement 
mode for consumers. Defaults to the same
+ *                                              value as 'ackMode' if not 
separately defined.
  * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of 
messages sent but not yet received.
  *                                              Limits the volume of messages 
currently buffered on the client
  *                                              or broker. Can help scale test 
clients by limiting amount of buffered
@@ -160,6 +164,8 @@
     /** Holds the transactional mode to use for the test. */
     public static final boolean TRANSACTED_DEFAULT = false;
 
+    public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+
     /** Holds the name of the property to get the test broker url from. */
     public static final String BROKER_PROPNAME = "broker";
 
@@ -277,6 +283,8 @@
     /** Defines the default message acknowledgement mode. */
     public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
 
+    public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+
     public static final String MAX_PENDING_PROPNAME = "maxPending";
     public static final int MAX_PENDING_DEFAULT = 0;
 
@@ -304,8 +312,10 @@
         defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, 
PING_QUEUE_NAME_DEFAULT);
         defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
         defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, 
defaults.getProperty(TRANSACTED_PROPNAME));
         defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, 
PERSISTENT_MODE_DEFAULT);
         defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, 
defaults.getProperty(ACK_MODE_PROPNAME));
         defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, 
MESSAGE_SIZE_DEAFULT);
         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -331,12 +341,15 @@
     protected String _destinationName;
     protected String _selector;
     protected boolean _transacted;
+    protected boolean _consTransacted;
 
     /** Determines whether this producer sends persistent messages. */
     protected boolean _persistent;
 
     /** Holds the acknowledgement mode used for sending and receiving 
messages. */
-    private int _ackMode;
+    protected int _ackMode;
+
+    protected int _consAckMode;
 
     /** Determines what size of messages this producer sends. */
     protected int _messageSize;
@@ -485,6 +498,7 @@
         _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
         _selector = properties.getProperty(SELECTOR_PROPNAME);
         _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+        _consTransacted = 
properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
         _persistent = 
properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
         _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
         _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -501,6 +515,7 @@
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
         _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+        _consAckMode = 
properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
         _maxPendingSize = 
properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
 
         // Check that one or more destinations were specified.
@@ -547,7 +562,7 @@
 
         for (int i = 0; i < _noOfConsumers; i++)
         {
-            _consumerSession[i] = (Session) 
_consumerConnection[i].createSession(_transacted, _ackMode);
+            _consumerSession[i] = (Session) 
_consumerConnection[i].createSession(_consTransacted, _consAckMode);
         }
 
         // Create the destinations to send pings to and receive replies from.
@@ -815,11 +830,14 @@
     public void onMessageWithConsumerNo(Message message, int consumerNo)
     {
         // log.debug("public void onMessageWithConsumerNo(Message message, int 
consumerNo = " + consumerNo + "): called");
-
-        NDC.push("cons" + consumerNo);
-
         try
         {
+            long now = System.nanoTime();
+            long timestamp = getTimestamp(message);
+            long pingTime = now - timestamp;
+
+            // NDC.push("cons" + consumerNo);
+
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
             // log.debug("correlationID = " + correlationID);
@@ -886,7 +904,7 @@
                             }
                         }
 
-                        NDC.push("/rem" + remainingCount);
+                        // NDC.push("/rem" + remainingCount);
 
                         // log.debug("remainingCount = " + remainingCount);
                         // log.debug("trueCount = " + trueCount);
@@ -907,7 +925,7 @@
                         // Forward the message and remaining count to any 
interested chained message listener.
                         if (_chainedMessageListener != null)
                         {
-                            _chainedMessageListener.onMessage(message, (int) 
remainingCount);
+                            _chainedMessageListener.onMessage(message, (int) 
remainingCount, pingTime);
                         }
 
                         // Check if this is the last message, in which case 
release any waiting producers. This is done
@@ -947,7 +965,7 @@
         finally
         {
             // log.debug("public void onMessageWithConsumerNo(Message message, 
int consumerNo): ending");
-            NDC.clear();
+            // NDC.clear();
         }
     }
 
@@ -981,7 +999,7 @@
 
         try
         {
-            NDC.push("prod");
+            // NDC.push("prod");
 
             // Create a count down latch to count the number of replies with. 
This is created before the messages are
             // sent so that the replies cannot be received before the count 
down is created.
@@ -1046,7 +1064,7 @@
         // so will be a memory leak if this is not done.
         finally
         {
-            NDC.pop();
+            // NDC.pop();
             perCorrelationIds.remove(messageCorrelationId);
         }
     }
@@ -1566,7 +1584,17 @@
      */
     public static interface ChainedMessageListener
     {
-        public void onMessage(Message message, int remainingCount) throws 
JMSException;
+        /**
+         * Notifies interested listeners about message arrival and important 
test stats, the number of messages
remaining in the test, and the message's latency.
+         *
+         * @param message        The newly arrived message.
+         * @param remainingCount The number of messages left to complete the 
test.
+         * @param latency        The nanosecond latency of the message.
+         *
+         * @throws JMSException Any JMS exception is allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException;
     }
 
     /**


Reply via email to