Modified: 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=568924&r1=568923&r2=568924&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Thu Aug 23 03:37:59 2007
@@ -20,15 +20,6 @@
  */
 package org.apache.qpid.ping;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -40,6 +31,15 @@
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * PingAsyncTestPerf is a performance test that outputs multiple timings from 
its test method, using the timing controller
  * interface supplied by the test runner from a seperate listener thread. It 
differs from the [EMAIL PROTECTED] PingTestPerf} test
@@ -239,7 +239,7 @@
          *
          * @throws JMSException Any underlying JMSException is allowed to fall 
through.
          */
-        public void onMessage(Message message, int remainingCount) throws 
JMSException
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
         {
             // Check if a batch boundary has been crossed.
             if ((remainingCount % _batchSize) == 0)

Modified: 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=568924&r1=568923&r2=568924&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 (original)
+++ 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
 Thu Aug 23 03:37:59 2007
@@ -95,13 +95,13 @@
         {
             log.debug("1 consumer per destination.");
 
-            return 1;
+            return _noOfConsumers;
         }
         else
         {
             log.debug(_pingClientCount + " consumers per destination.");
 
-            return _pingClientCount;
+            return _pingClientCount * _noOfConsumers;
         }
     }
 }

Modified: 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=568924&r1=568923&r2=568924&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 (original)
+++ 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 Thu Aug 23 03:37:59 2007
@@ -360,7 +360,7 @@
         }
 
         // Ensure messages received are committed.
-        if (_transacted)
+        if (_consTransacted)
         {
             try
             {

Modified: 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=568924&r1=568923&r2=568924&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 (original)
+++ 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
 Thu Aug 23 03:37:59 2007
@@ -20,15 +20,6 @@
  */
 package org.apache.qpid.ping;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
 import junit.framework.Test;
 import junit.framework.TestSuite;
 
@@ -43,6 +34,15 @@
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * PingLatencyTestPerf is a performance test that outputs multiple timings 
from its test method, using the timing
  * controller interface supplied by the test runner from a seperate listener 
thread. It outputs round trip timings for
@@ -261,7 +261,7 @@
          *
          * @throws javax.jms.JMSException Any underlying JMSException is 
allowed to fall through.
          */
-        public void onMessage(Message message, int remainingCount) throws 
JMSException
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException
         {
             _logger.debug("public void onMessage(Message message, int 
remainingCount = " + remainingCount + "): called");
 
@@ -280,26 +280,6 @@
                     TimingController tc = perCorrelationId._tc;
                     int expected = perCorrelationId._expectedCount;
 
-                    // Extract the send time from the message and work out 
from the current time, what the ping latency was.
-                    // The ping producer time stamps messages in nanoseconds.
-                    long startTime;
-
-                    if (_strictAMQP)
-                    {
-                        Long value =
-                            ((AMQMessage) message).getTimestampProperty(new 
AMQShortString(
-                                    
PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME));
-
-                        startTime = ((value == null) ? 0L : value);
-                    }
-                    else
-                    {
-                        startTime = 
message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
-                    }
-
-                    long now = System.nanoTime();
-                    long pingTime = now - startTime;
-
                     // Calculate how many messages were actually received in 
the last batch. This will be the batch size
                     // except where the number expected is not a multiple of 
the batch size and this is the first remaining
                     // count to cross a batch size boundary, in which case it 
will be the number expected modulo the batch
@@ -309,8 +289,7 @@
                     // Register a test result for the correlation id.
                     try
                     {
-
-                        tc.completeTest(true, receivedInBatch, pingTime);
+                        tc.completeTest(true, receivedInBatch, latency);
                     }
                     catch (InterruptedException e)
                     {

Modified: 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=568924&r1=568923&r2=568924&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Thu Aug 23 03:37:59 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.requestreply;
 
 import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
@@ -97,6 +98,10 @@
  *                                               3 - DUPS_OK_ACKNOWLEDGE
  *                                               257 - NO_ACKNOWLEDGE
  *                                               258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted   <td> false    <td> Whether or not consumers use 
transactions. Defaults to the same value
+ *                                              as the 'transacted' option if 
not seperately defined.
+ * <tr><td> consAckMode      <td> AUTO_ACK <td> The message acknowledgement 
mode for consumers. Defaults to the same
+ *                                              value as 'ackMode' if not 
seperately defined.
  * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of 
messages sent but not yet received.
  *                                              Limits the volume of messages 
currently buffered on the client
  *                                              or broker. Can help scale test 
clients by limiting amount of buffered
@@ -132,6 +137,7 @@
  */
 public class PingPongProducer implements Runnable /*, MessageListener*/, 
ExceptionListener
 {
+    /** Used for debugging. */
     private static final Logger log = Logger.getLogger(PingPongProducer.class);
 
     /** Holds the name of the property to get the test message size from. */
@@ -158,6 +164,9 @@
     /** Holds the transactional mode to use for the test. */
     public static final boolean TRANSACTED_DEFAULT = false;
 
+    public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+    public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
+
     /** Holds the name of the property to get the test broker url from. */
     public static final String BROKER_PROPNAME = "broker";
 
@@ -275,6 +284,9 @@
     /** Defines the default message acknowledgement mode. */
     public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
 
+    public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+    public static final int CONSUMER_ACK_MODE_DEFAULT = 
Session.AUTO_ACKNOWLEDGE;
+
     public static final String MAX_PENDING_PROPNAME = "maxPending";
     public static final int MAX_PENDING_DEFAULT = 0;
 
@@ -302,8 +314,10 @@
         defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, 
PING_QUEUE_NAME_DEFAULT);
         defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
         defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, 
CONSUMER_TRANSACTED_DEFAULT);
         defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, 
PERSISTENT_MODE_DEFAULT);
         defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, 
CONSUMER_ACK_MODE_DEFAULT);
         defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, 
MESSAGE_SIZE_DEAFULT);
         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -329,12 +343,15 @@
     protected String _destinationName;
     protected String _selector;
     protected boolean _transacted;
+    protected boolean _consTransacted;
 
     /** Determines whether this producer sends persistent messages. */
     protected boolean _persistent;
 
     /** Holds the acknowledgement mode used for sending and receiving 
messages. */
-    private int _ackMode;
+    protected int _ackMode;
+
+    protected int _consAckMode;
 
     /** Determines what size of messages this producer sends. */
     protected int _messageSize;
@@ -451,16 +468,13 @@
     /** Holds the message consumer to receive the ping replies through. */
     protected MessageConsumer[] _consumer;
 
-    /**
-     * Holds the number of consumers that will be attached to each destination 
in the test. Each pings will result in
-     * a message being received by each of these clients in a pub/sub tests, 
and by only one at a time in a p2p test.
-     */
-    static int _consumersPerDestination = 1;
-
     /** The prompt to display when asking the user to kill the broker for 
failover testing. */
     private static final String KILL_BROKER_PROMPT = "Kill broker now, then 
press Return.";
     private String _clientID;
 
+    /** Keeps count of the total messages sent purely for debugging purposes. 
*/
+    private static AtomicInteger numSent = new AtomicInteger();
+
     /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See the class level comments
      * for details. This constructor creates a connection to the broker and 
creates producer and consumer sessions on
@@ -472,7 +486,7 @@
      */
     public PingPongProducer(Properties overrides) throws Exception
     {
-        log.debug("public PingPongProducer(Properties overrides = " + 
overrides + "): called");
+        // log.debug("public PingPongProducer(Properties overrides = " + 
overrides + "): called");
 
         // Create a set of parsed properties from the defaults overriden by 
the passed in values.
         ParsedProperties properties = new ParsedProperties(defaults);
@@ -486,6 +500,7 @@
         _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
         _selector = properties.getProperty(SELECTOR_PROPNAME);
         _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+        _consTransacted = 
properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
         _persistent = 
properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
         _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
         _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -502,6 +517,7 @@
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
         _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+        _consAckMode = 
properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
         _maxPendingSize = 
properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
 
         // Check that one or more destinations were specified.
@@ -532,7 +548,7 @@
      */
     public void establishConnection(boolean producer, boolean consumer) throws 
Exception
     {
-        log.debug("public void establishConnection(): called");
+        // log.debug("public void establishConnection(): called");
 
         // Generate a unique identifying name for this client, based on it ip 
address and the current time.
         InetAddress address = InetAddress.getLocalHost();
@@ -548,7 +564,7 @@
 
         for (int i = 0; i < _noOfConsumers; i++)
         {
-            _consumerSession[i] = (Session) 
_consumerConnection[i].createSession(_transacted, _ackMode);
+            _consumerSession[i] = (Session) 
_consumerConnection[i].createSession(_consTransacted, _consAckMode);
         }
 
         // Create the destinations to send pings to and receive replies from.
@@ -579,8 +595,14 @@
      */
     protected void createConnection(String clientID) throws AMQException, 
URLSyntaxException
     {
+        // log.debug("protected void createConnection(String clientID = " + 
clientID + "): called");
+
+        // log.debug("Creating a connection for the message producer.");
+
         _connection = new AMQConnection(_brokerDetails, _username, _password, 
clientID, _virtualpath);
 
+        // log.debug("Creating " + _noOfConsumers + " connections for the 
consumers.");
+
         _consumerConnection = new Connection[_noOfConsumers];
 
         for (int i = 0; i < _noOfConsumers; i++)
@@ -654,12 +676,12 @@
      */
     public List<Destination> getReplyDestinations()
     {
-        log.debug("public List<Destination> getReplyDestinations(): called");
+        // log.debug("public List<Destination> getReplyDestinations(): 
called");
 
         List<Destination> replyDestinations = new ArrayList<Destination>();
         replyDestinations.add(_replyDestination);
 
-        log.debug("replyDestinations = " + replyDestinations);
+        // log.debug("replyDestinations = " + replyDestinations);
 
         return replyDestinations;
     }
@@ -672,12 +694,12 @@
      */
     public void createProducer() throws JMSException
     {
-        log.debug("public void createProducer(): called");
+        // log.debug("public void createProducer(): called");
 
         _producer = (MessageProducer) _producerSession.createProducer(null);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
 
-        log.debug("Created producer for " + (_persistent ? "persistent" : 
"non-persistent") + " messages.");
+        // log.debug("Created producer for " + (_persistent ? "persistent" : 
"non-persistent") + " messages.");
     }
 
     /**
@@ -695,14 +717,14 @@
     public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique,
         boolean durable) throws JMSException, AMQException
     {
-        log.debug("public void createPingDestinations(int noOfDestinations = " 
+ noOfDestinations + ", String selector = "
+        /*log.debug("public void createPingDestinations(int noOfDestinations = 
" + noOfDestinations + ", String selector = "
             + selector + ", String rootName = " + rootName + ", boolean unique 
= " + unique + ", boolean durable = "
-            + durable + "): called");
+            + durable + "): called");*/
 
         _pingDestinations = new ArrayList<Destination>();
 
         // Create the desired number of ping destinations and consumers for 
them.
-        log.debug("Creating " + noOfDestinations + " destinations to ping.");
+        // log.debug("Creating " + noOfDestinations + " destinations to 
ping.");
 
         for (int i = 0; i < noOfDestinations; i++)
         {
@@ -713,12 +735,12 @@
             // Generate an id, unique within this pinger or to the whole JVM 
depending on the unique flag.
             if (unique)
             {
-                log.debug("Creating unique destinations.");
+                // log.debug("Creating unique destinations.");
                 id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + 
_connection.getClientID();
             }
             else
             {
-                log.debug("Creating shared destinations.");
+                // log.debug("Creating shared destinations.");
                 id = "_" + _queueSharedID.incrementAndGet();
             }
 
@@ -728,14 +750,14 @@
                 if (!durable)
                 {
                     destination = new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                    log.debug("Created non-durable topic " + destination);
+                    // log.debug("Created non-durable topic " + destination);
                 }
                 else
                 {
                     destination =
                         AMQTopic.createDurableTopic(new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
                             _clientID, (AMQConnection) _connection);
-                    log.debug("Created durable topic " + destination);
+                    // log.debug("Created durable topic " + destination);
                 }
             }
             // Otherwise this is a p2p pinger, in which case create queues.
@@ -749,7 +771,7 @@
                 ((AMQSession) _producerSession).bindQueue(destinationName, 
destinationName, null,
                     ExchangeDefaults.DIRECT_EXCHANGE_NAME);
 
-                log.debug("Created queue " + destination);
+                // log.debug("Created queue " + destination);
             }
 
             // Keep the destination.
@@ -767,10 +789,12 @@
      */
     public void createReplyConsumers(Collection<Destination> destinations, 
String selector) throws JMSException
     {
-        log.debug("public void createReplyConsumers(Collection<Destination> 
destinations = " + destinations
-            + ", String selector = " + selector + "): called");
+        /*log.debug("public void createReplyConsumers(Collection<Destination> 
destinations = " + destinations
+            + ", String selector = " + selector + "): called");*/
 
-        log.debug("Creating " + destinations.size() + " reply consumers.");
+        // log.debug("There are " + destinations.size() + " destinations.");
+        // log.debug("Creating " + _noOfConsumers + " consumers on each 
destination.");
+        // log.debug("Total number of consumers is: " + (destinations.size() * 
_noOfConsumers));
 
         for (Destination destination : destinations)
         {
@@ -793,7 +817,7 @@
                         }
                     });
 
-                log.debug("Set this to listen to replies sent to destination: 
" + destination);
+                // log.debug("Set consumer " + i + " to listen to replies sent 
to destination: " + destination);
             }
         }
     }
@@ -807,95 +831,121 @@
      */
     public void onMessageWithConsumerNo(Message message, int consumerNo)
     {
-        // log.debug("public void onMessage(Message message): called");
-
+        // log.debug("public void onMessageWithConsumerNo(Message message, int 
consumerNo = " + consumerNo + "): called");
         try
         {
+            long now = System.nanoTime();
+            long timestamp = getTimestamp(message);
+            long pingTime = now - timestamp;
+
+            // NDC.push("cons" + consumerNo);
+
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
             // log.debug("correlationID = " + correlationID);
 
-            // Countdown on the traffic light if there is one for the matching 
correlation id.
-            PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationID);
+            int num = message.getIntProperty("MSG_NUM");
+            // log.info("Message " + num + " received.");
+
+            boolean isRedelivered = message.getJMSRedelivered();
+            // log.debug("isRedelivered = " + isRedelivered);
 
-            if (perCorrelationId != null)
+            if (!isRedelivered)
             {
-                CountDownLatch trafficLight = perCorrelationId.trafficLight;
+                // Countdown on the traffic light if there is one for the 
matching correlation id.
+                PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationID);
+
+                if (perCorrelationId != null)
+                {
+                    CountDownLatch trafficLight = 
perCorrelationId.trafficLight;
 
-                // Restart the timeout timer on every message.
-                perCorrelationId.timeOutStart = System.nanoTime();
+                    // Restart the timeout timer on every message.
+                    perCorrelationId.timeOutStart = System.nanoTime();
 
-                // log.debug("Reply was expected, decrementing the latch for 
the id, " + correlationID);
+                    // log.debug("Reply was expected, decrementing the latch 
for the id, " + correlationID);
 
-                // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
-                // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
-                // ensures that each thread will get a unique value for the 
remaining messages.
-                long trueCount = -1;
-                long remainingCount = -1;
+                    // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
+                    // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
+                    // ensures that each thread will get a unique value for 
the remaining messages.
+                    long trueCount = -1;
+                    long remainingCount = -1;
 
-                synchronized (trafficLight)
-                {
-                    trafficLight.countDown();
+                    synchronized (trafficLight)
+                    {
+                        trafficLight.countDown();
 
-                    trueCount = trafficLight.getCount();
-                    remainingCount = trueCount - 1;
+                        trueCount = trafficLight.getCount();
+                        remainingCount = trueCount - 1;
 
-                    // Decrement the count of sent but not yet received 
messages.
-                    int unreceived = _unreceived.decrementAndGet();
-                    int unreceivedSize = (unreceived * ((_messageSize == 0) ? 
1 : _messageSize));
+                        // Decrement the count of sent but not yet received 
messages.
+                        int unreceived = _unreceived.decrementAndGet();
+                        int unreceivedSize = (unreceived * ((_messageSize == 
0) ? 1 : _messageSize));
 
-                    // Release a waiting sender if there is one.
-                    synchronized (_sendPauseMonitor)
-                    {
-                        if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
-                        // && (_sendPauseBarrier.getNumberWaiting() == 1))
+                        // Release a waiting sender if there is one.
+                        synchronized (_sendPauseMonitor)
                         {
-                            log.debug("unreceived size estimate under limit = 
" + unreceivedSize);
-
-                            // Wait on the send pause barrier for the limit to 
be re-established.
-                            /*try
-                            {*/
-                            // _sendPauseBarrier.await();
-                            _sendPauseMonitor.notify();
-                            /*}
-                            catch (InterruptedException e)
+                            if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
+                            // && (_sendPauseBarrier.getNumberWaiting() == 1))
                             {
-                                throw new RuntimeException(e);
+                                // log.debug("unreceived size estimate under 
limit = " + unreceivedSize);
+
+                                // Wait on the send pause barrier for the 
limit to be re-established.
+                                /*try
+                                {*/
+                                // _sendPauseBarrier.await();
+                                _sendPauseMonitor.notify();
+                                /*}
+                                catch (InterruptedException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }
+                                catch (BrokenBarrierException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }*/
                             }
-                            catch (BrokenBarrierException e)
-                            {
-                                throw new RuntimeException(e);
-                            }*/
                         }
-                    }
 
-                    // log.debug("remainingCount = " + remainingCount);
-                    // log.debug("trueCount = " + trueCount);
+                        // NDC.push("/rem" + remainingCount);
 
-                    // Commit on transaction batch size boundaries. At this 
point in time the waiting producer remains
-                    // blocked, even on the last message.
-                    if ((remainingCount % _txBatchSize) == 0)
-                    {
-                        commitTx(_consumerSession[consumerNo]);
-                    }
+                        // log.debug("remainingCount = " + remainingCount);
+                        // log.debug("trueCount = " + trueCount);
 
-                    // Forward the message and remaining count to any 
interested chained message listener.
-                    if (_chainedMessageListener != null)
-                    {
-                        _chainedMessageListener.onMessage(message, (int) 
remainingCount);
-                    }
+                        // Commit on transaction batch size boundaries. At 
this point in time the waiting producer remains
+                        // blocked, even on the last message.
+                        // Commit count is divided by noOfConsumers in p2p 
mode, so that each consumer only commits on
+                        // each batch boundary. For pub/sub each consumer gets 
every message so no division is done.
+                        long commitCount = _isPubSub ? remainingCount : 
(remainingCount / _noOfConsumers);
+                        // log.debug("commitCount = " + commitCount);
 
-                    // Check if this is the last message, in which case 
release any waiting producers. This is done
-                    // after the transaction has been committed and any 
listeners notified.
-                    if (trueCount == 1)
-                    {
-                        trafficLight.countDown();
+                        if ((commitCount % _txBatchSize) == 0)
+                        {
+                            // log.debug("Trying commit for consumer " + 
consumerNo + ".");
+                            commitTx(_consumerSession[consumerNo]);
+                        }
+
+                        // Forward the message and remaining count to any 
interested chained message listener.
+                        if (_chainedMessageListener != null)
+                        {
+                            _chainedMessageListener.onMessage(message, (int) 
remainingCount, pingTime);
+                        }
+
+                        // Check if this is the last message, in which case 
release any waiting producers. This is done
+                        // after the transaction has been committed and any 
listeners notified.
+                        if (trueCount == 1)
+                        {
+                            trafficLight.countDown();
+                        }
                     }
                 }
+                else
+                {
+                    log.warn("Got unexpected message with correlationId: " + 
correlationID);
+                }
             }
             else
             {
-                log.warn("Got unexpected message with correlationId: " + 
correlationID);
+                log.warn("Got redelivered message, ignoring.");
             }
 
             // Print out ping times for every message in verbose mode only.
@@ -914,8 +964,11 @@
         {
             log.warn("There was a JMSException: " + e.getMessage(), e);
         }
-
-        // log.debug("public void onMessage(Message message): ending");
+        finally
+        {
+            // log.debug("public void onMessageWithConsumerNo(Message message, 
int consumerNo): ending");
+            // NDC.clear();
+        }
     }
 
     /**
@@ -937,8 +990,8 @@
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout, String messageCorrelationId)
         throws JMSException, InterruptedException
     {
-        log.debug("public int pingAndWaitForReply(Message message, int 
numPings = " + numPings + ", long timeout = "
-            + timeout + ", String messageCorrelationId = " + 
messageCorrelationId + "): called");
+        /*log.debug("public int pingAndWaitForReply(Message message, int 
numPings = " + numPings + ", long timeout = "
+            + timeout + ", String messageCorrelationId = " + 
messageCorrelationId + "): called");*/
 
         // Generate a unique correlation id to put on the messages before 
sending them, if one was not specified.
         if (messageCorrelationId == null)
@@ -948,6 +1001,8 @@
 
         try
         {
+            // NDC.push("prod");
+
             // Create a count down latch to count the number of replies with. 
This is created before the messages are
             // sent so that the replies cannot be received before the count 
down is created.
             // One is added to this, so that the last reply becomes a special 
case. The special case is that the
@@ -979,16 +1034,16 @@
 
                 allMessagesReceived = numReplies == 
getExpectedNumPings(numPings);
 
-                log.debug("numReplies = " + numReplies);
-                log.debug("allMessagesReceived = " + allMessagesReceived);
+                // log.debug("numReplies = " + numReplies);
+                // log.debug("allMessagesReceived = " + allMessagesReceived);
 
                 // Recheck the timeout condition.
                 long now = System.nanoTime();
                 long lastMessageReceievedAt = perCorrelationId.timeOutStart;
                 timedOut = (now - lastMessageReceievedAt) > (timeout * 
1000000);
 
-                log.debug("now = " + now);
-                log.debug("lastMessageReceievedAt = " + 
lastMessageReceievedAt);
+                // log.debug("now = " + now);
+                // log.debug("lastMessageReceievedAt = " + 
lastMessageReceievedAt);
             }
             while (!timedOut && !allMessagesReceived);
 
@@ -1003,7 +1058,7 @@
 
             // commitTx(_consumerSession);
 
-            log.debug("public int pingAndWaitForReply(Message message, int 
numPings, long timeout): ending");
+            // log.debug("public int pingAndWaitForReply(Message message, int 
numPings, long timeout): ending");
 
             return numReplies;
         }
@@ -1011,6 +1066,7 @@
         // so will be a memory leak if this is not done.
         finally
         {
+            // NDC.pop();
             perCorrelationIds.remove(messageCorrelationId);
         }
     }
@@ -1026,8 +1082,8 @@
      */
     public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId) throws JMSException
     {
-        log.debug("public void pingNoWaitForReply(Message message, int 
numPings = " + numPings
-            + ", String messageCorrelationId = " + messageCorrelationId + "): 
called");
+        /*log.debug("public void pingNoWaitForReply(Message message, int 
numPings = " + numPings
+            + ", String messageCorrelationId = " + messageCorrelationId + "): 
called");*/
 
         if (message == null)
         {
@@ -1111,7 +1167,7 @@
 
                 if (unreceivedSize > _maxPendingSize)
                 {
-                    log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
+                    // log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
 
                     // Wait on the send pause barrier for the limit to be 
re-established.
                     try
@@ -1140,11 +1196,19 @@
         // Send the message either to its round robin destination, or its 
default destination.
         if (destination == null)
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
+            setTimestamp(message);
             _producer.send(message);
+            // log.info("Message " + num + " sent.");
         }
         else
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
+            setTimestamp(message);
             _producer.send(destination, message);
+            // log.info("Message " + num + " sent.");
         }
 
         // Increase the unreceived size, this may actually happen aftern the 
message is recevied.
@@ -1162,6 +1226,7 @@
         // Commit on every transaction batch size boundary. Here i + 1 is the 
count of actual messages sent.
         if (((i + 1) % _txBatchSize) == 0)
         {
+            // log.debug("Trying commit on producer session.");
             committed = commitTx(_producerSession);
         }
 
@@ -1179,7 +1244,7 @@
         {
             // Generate a sample message and time stamp it.
             Message msg = getTestMessage(_replyDestination, _messageSize, 
_persistent);
-            setTimestamp(msg);
+            // setTimestamp(msg);
 
             // Send the message and wait for a reply.
             pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, 
null);
@@ -1187,12 +1252,12 @@
         catch (JMSException e)
         {
             _publish = false;
-            log.debug("There was a JMSException: " + e.getMessage(), e);
+            // log.debug("There was a JMSException: " + e.getMessage(), e);
         }
         catch (InterruptedException e)
         {
             _publish = false;
-            log.debug("There was an interruption: " + e.getMessage(), e);
+            // log.debug("There was an interruption: " + e.getMessage(), e);
         }
     }
 
@@ -1230,7 +1295,7 @@
 
         // Timestamp the message in nanoseconds.
 
-        setTimestamp(msg);
+        // setTimestamp(msg);
 
         return msg;
     }
@@ -1299,7 +1364,7 @@
      */
     public void onException(JMSException e)
     {
-        log.debug("public void onException(JMSException e = " + e + "): 
called", e);
+        // log.debug("public void onException(JMSException e = " + e + "): 
called", e);
         _publish = false;
     }
 
@@ -1327,14 +1392,14 @@
      */
     public void close() throws JMSException
     {
-        log.debug("public void close(): called");
+        // log.debug("public void close(): called");
 
         try
         {
             if (_connection != null)
             {
                 _connection.close();
-                log.debug("Close connection.");
+                // log.debug("Close connection.");
             }
 
             for (int i = 0; i < _noOfConsumers; i++)
@@ -1342,7 +1407,7 @@
                 if (_consumerConnection[i] != null)
                 {
                     _consumerConnection[i].close();
-                    log.debug("Closed consumer connection.");
+                    // log.debug("Closed consumer connection.");
                 }
             }
         }
@@ -1382,7 +1447,7 @@
      */
     protected boolean commitTx(Session session) throws JMSException
     {
-        // log.debug("protected void commitTx(Session controlSession): 
called");
+        // log.debug("protected void commitTx(Session session): called");
 
         boolean committed = false;
 
@@ -1401,6 +1466,8 @@
 
         if (session.getTransacted())
         {
+            // log.debug("Session is transacted.");
+
             try
             {
                 if (_failBeforeCommit)
@@ -1414,10 +1481,10 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // long l = System.nanoTime();
+                long start = System.nanoTime();
                 session.commit();
                 committed = true;
-                // log.debug("Time taken to commit :" + ((System.nanoTime() - 
l) / 1000000f) + " ms");
+                // log.debug("Time taken to commit :" + ((System.nanoTime() - 
start) / 1000000f) + " ms");
 
                 if (_failAfterCommit)
                 {
@@ -1430,26 +1497,26 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // log.trace("Session Commited.");
+                // log.debug("Session Commited.");
             }
             catch (JMSException e)
             {
-                log.debug("JMSException on commit:" + e.getMessage(), e);
+                // log.debug("JMSException on commit:" + e.getMessage(), e);
 
                 // Warn that the bounce back client is not available.
                 if (e.getLinkedException() instanceof AMQNoConsumersException)
                 {
-                    log.debug("No consumers on queue.");
+                    // log.debug("No consumers on queue.");
                 }
 
                 try
                 {
                     session.rollback();
-                    log.debug("Message rolled back.");
+                    // log.debug("Message rolled back.");
                 }
                 catch (JMSException jmse)
                 {
-                    log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
+                    // log.debug("JMSE on rollback:" + jmse.getMessage(), 
jmse);
 
                     // Both commit and rollback failed. Throw the rollback 
exception.
                     throw jmse;
@@ -1488,7 +1555,7 @@
      */
     public int getConsumersPerDestination()
     {
-        return _consumersPerDestination;
+        return _noOfConsumers;
     }
 
     /**
@@ -1500,7 +1567,9 @@
      */
     public int getExpectedNumPings(int numpings)
     {
-        log.debug("Each ping will be received by " + (_isPubSub ? 
getConsumersPerDestination() : 1) + " consumers.");
+        // log.debug("public int getExpectedNumPings(int numpings = " + 
numpings + "): called");
+
+        // log.debug("Each ping will be received by " + (_isPubSub ? 
getConsumersPerDestination() : 1) + " consumers.");
 
         return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
     }
@@ -1517,7 +1586,17 @@
      */
     public static interface ChainedMessageListener
     {
-        public void onMessage(Message message, int remainingCount) throws 
JMSException;
+        /**
+         * Notifies interested listeners about message arrival and important 
test stats, the number of messages
+         * remaining in the test, and the messages send timestamp.
+         *
+         * @param message        The newly arrived message.
+         * @param remainingCount The number of messages left to complete the 
test.
+         * @param latency        The nanosecond latency of the message.
+         *
+         * @throws JMSException Any JMS exceptions is allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount, long 
latency) throws JMSException;
     }
 
     /**


Reply via email to