Author: rupertlssmith
Date: Fri Aug 17 08:10:01 2007
New Revision: 567062

URL: http://svn.apache.org/viewvc?view=rev&rev=567062
Log:
Calculating commit batch size boundaries correctly for multi-consumer tests. 
Ignoring redelivered messages.

Modified:
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=567062&r1=567061&r2=567062
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Fri Aug 17 08:10:01 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.requestreply;
 
 import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
@@ -456,6 +457,9 @@
     private static final String KILL_BROKER_PROMPT = "Kill broker now, then 
press Return.";
     private String _clientID;
 
+    /** Keeps count of the total messages sent purely for debugging purposes. 
*/
+    private static AtomicInteger numSent = new AtomicInteger();
+
     /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See the class level comments
      * for details. This constructor creates a connection to the broker and 
creates producer and consumer sessions on
@@ -574,8 +578,14 @@
      */
     protected void createConnection(String clientID) throws AMQException, 
URLSyntaxException
     {
+        log.debug("protected void createConnection(String clientID = " + 
clientID + "): called");
+
+        log.debug("Creating a connection for the message producer.");
+
         _connection = new AMQConnection(_brokerDetails, _username, _password, 
clientID, _virtualpath);
 
+        log.debug("Creating " + _noOfConsumers + " connections for the 
consumers.");
+
         _consumerConnection = new Connection[_noOfConsumers];
 
         for (int i = 0; i < _noOfConsumers; i++)
@@ -765,7 +775,9 @@
         log.debug("public void createReplyConsumers(Collection<Destination> 
destinations = " + destinations
             + ", String selector = " + selector + "): called");
 
-        log.debug("Creating " + destinations.size() + " reply consumers.");
+        log.debug("There are " + destinations.size() + " destinations.");
+        log.debug("Creating " + _noOfConsumers + " consumers on each 
destination.");
+        log.debug("Total number of consumers is: " + (destinations.size() * 
_noOfConsumers));
 
         for (Destination destination : destinations)
         {
@@ -788,7 +800,7 @@
                         }
                     });
 
-                log.debug("Set this to listen to replies sent to destination: 
" + destination);
+                log.debug("Set consumer " + i + " to listen to replies sent to 
destination: " + destination);
             }
         }
     }
@@ -802,95 +814,118 @@
      */
     public void onMessageWithConsumerNo(Message message, int consumerNo)
     {
-        // log.debug("public void onMessage(Message message): called");
+        log.debug("public void onMessageWithConsumerNo(Message message, int 
consumerNo = " + consumerNo + "): called");
+
+        NDC.push("cons" + consumerNo);
 
         try
         {
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
-            // log.debug("correlationID = " + correlationID);
+            log.debug("correlationID = " + correlationID);
 
-            // Countdown on the traffic light if there is one for the matching 
correlation id.
-            PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationID);
+            int num = message.getIntProperty("MSG_NUM");
+            log.info("Message " + num + " received.");
 
-            if (perCorrelationId != null)
+            boolean isRedelivered = message.getJMSRedelivered();
+            log.debug("isRedelivered = " + isRedelivered);
+
+            if (!isRedelivered)
             {
-                CountDownLatch trafficLight = perCorrelationId.trafficLight;
+                // Countdown on the traffic light if there is one for the 
matching correlation id.
+                PerCorrelationId perCorrelationId = 
perCorrelationIds.get(correlationID);
 
-                // Restart the timeout timer on every message.
-                perCorrelationId.timeOutStart = System.nanoTime();
+                if (perCorrelationId != null)
+                {
+                    CountDownLatch trafficLight = 
perCorrelationId.trafficLight;
 
-                // log.debug("Reply was expected, decrementing the latch for 
the id, " + correlationID);
+                    // Restart the timeout timer on every message.
+                    perCorrelationId.timeOutStart = System.nanoTime();
 
-                // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
-                // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
-                // ensures that each thread will get a unique value for the 
remaining messages.
-                long trueCount = -1;
-                long remainingCount = -1;
+                    log.debug("Reply was expected, decrementing the latch for 
the id, " + correlationID);
 
-                synchronized (trafficLight)
-                {
-                    trafficLight.countDown();
+                    // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
+                    // method simultaneously with the same correlation id. 
Decrementing the latch in a synchronized block
+                    // ensures that each thread will get a unique value for 
the remaining messages.
+                    long trueCount = -1;
+                    long remainingCount = -1;
+
+                    synchronized (trafficLight)
+                    {
+                        trafficLight.countDown();
 
-                    trueCount = trafficLight.getCount();
-                    remainingCount = trueCount - 1;
+                        trueCount = trafficLight.getCount();
+                        remainingCount = trueCount - 1;
 
-                    // Decrement the count of sent but not yet received 
messages.
-                    int unreceived = _unreceived.decrementAndGet();
-                    int unreceivedSize = (unreceived * ((_messageSize == 0) ? 
1 : _messageSize));
+                        // Decrement the count of sent but not yet received 
messages.
+                        int unreceived = _unreceived.decrementAndGet();
+                        int unreceivedSize = (unreceived * ((_messageSize == 
0) ? 1 : _messageSize));
 
-                    // Release a waiting sender if there is one.
-                    synchronized (_sendPauseMonitor)
-                    {
-                        if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
-                        // && (_sendPauseBarrier.getNumberWaiting() == 1))
+                        // Release a waiting sender if there is one.
+                        synchronized (_sendPauseMonitor)
                         {
-                            log.debug("unreceived size estimate under limit = 
" + unreceivedSize);
-
-                            // Wait on the send pause barrier for the limit to 
be re-established.
-                            /*try
-                            {*/
-                            // _sendPauseBarrier.await();
-                            _sendPauseMonitor.notify();
-                            /*}
-                            catch (InterruptedException e)
+                            if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
+                            // && (_sendPauseBarrier.getNumberWaiting() == 1))
                             {
-                                throw new RuntimeException(e);
+                                log.debug("unreceived size estimate under 
limit = " + unreceivedSize);
+
+                                // Wait on the send pause barrier for the 
limit to be re-established.
+                                /*try
+                                {*/
+                                // _sendPauseBarrier.await();
+                                _sendPauseMonitor.notify();
+                                /*}
+                                catch (InterruptedException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }
+                                catch (BrokenBarrierException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }*/
                             }
-                            catch (BrokenBarrierException e)
-                            {
-                                throw new RuntimeException(e);
-                            }*/
                         }
-                    }
 
-                    // log.debug("remainingCount = " + remainingCount);
-                    // log.debug("trueCount = " + trueCount);
+                        NDC.push("/rem" + remainingCount);
 
-                    // Commit on transaction batch size boundaries. At this 
point in time the waiting producer remains
-                    // blocked, even on the last message.
-                    if ((remainingCount % _txBatchSize) == 0)
-                    {
-                        commitTx(_consumerSession[consumerNo]);
-                    }
+                        log.debug("remainingCount = " + remainingCount);
+                        log.debug("trueCount = " + trueCount);
 
-                    // Forward the message and remaining count to any 
interested chained message listener.
-                    if (_chainedMessageListener != null)
-                    {
-                        _chainedMessageListener.onMessage(message, (int) 
remainingCount);
-                    }
+                        // Commit on transaction batch size boundaries. At 
this point in time the waiting producer remains
+                        // blocked, even on the last message.
+                        // Commit count is divided by noOfConsumers in p2p 
mode, so that each consumer only commits on
+                        // each batch boundary. For pub/sub each consumer gets 
every message so no division is done.
+                        long commitCount = _isPubSub ? remainingCount : 
(remainingCount / _noOfConsumers);
+                        log.debug("commitCount = " + commitCount);
 
-                    // Check if this is the last message, in which case 
release any waiting producers. This is done
-                    // after the transaction has been committed and any 
listeners notified.
-                    if (trueCount == 1)
-                    {
-                        trafficLight.countDown();
+                        if ((commitCount % _txBatchSize) == 0)
+                        {
+                            log.debug("Trying commit for consumer " + 
consumerNo + ".");
+                            commitTx(_consumerSession[consumerNo]);
+                        }
+
+                        // Forward the message and remaining count to any 
interested chained message listener.
+                        if (_chainedMessageListener != null)
+                        {
+                            _chainedMessageListener.onMessage(message, (int) 
remainingCount);
+                        }
+
+                        // Check if this is the last message, in which case 
release any waiting producers. This is done
+                        // after the transaction has been committed and any 
listeners notified.
+                        if (trueCount == 1)
+                        {
+                            trafficLight.countDown();
+                        }
                     }
                 }
+                else
+                {
+                    log.warn("Got unexpected message with correlationId: " + 
correlationID);
+                }
             }
             else
             {
-                log.warn("Got unexpected message with correlationId: " + 
correlationID);
+                log.debug("Got redelivered message, ignoring.");
             }
 
             // Print out ping times for every message in verbose mode only.
@@ -909,8 +944,11 @@
         {
             log.warn("There was a JMSException: " + e.getMessage(), e);
         }
-
-        // log.debug("public void onMessage(Message message): ending");
+        finally
+        {
+            log.debug("public void onMessageWithConsumerNo(Message message, 
int consumerNo): ending");
+            NDC.clear();
+        }
     }
 
     /**
@@ -943,6 +981,8 @@
 
         try
         {
+            NDC.push("prod");
+
             // Create a count down latch to count the number of replies with. 
This is created before the messages are
             // sent so that the replies cannot be received before the count 
down is created.
             // One is added to this, so that the last reply becomes a special 
case. The special case is that the
@@ -1006,6 +1046,7 @@
         // so will be a memory leak if this is not done.
         finally
         {
+            NDC.pop();
             perCorrelationIds.remove(messageCorrelationId);
         }
     }
@@ -1135,11 +1176,17 @@
         // Send the message either to its round robin destination, or its 
default destination.
         if (destination == null)
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
             _producer.send(message);
+            log.info("Message " + num + " sent.");
         }
         else
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
             _producer.send(destination, message);
+            log.info("Message " + num + " sent.");
         }
 
         // Increase the unreceived size, this may actually happen aftern the 
message is recevied.
@@ -1157,6 +1204,7 @@
         // Commit on every transaction batch size boundary. Here i + 1 is the 
count of actual messages sent.
         if (((i + 1) % _txBatchSize) == 0)
         {
+            log.debug("Trying commit on producer session.");
             committed = commitTx(_producerSession);
         }
 
@@ -1377,7 +1425,7 @@
      */
     protected boolean commitTx(Session session) throws JMSException
     {
-        // log.debug("protected void commitTx(Session controlSession): 
called");
+        log.debug("protected void commitTx(Session session): called");
 
         boolean committed = false;
 
@@ -1396,6 +1444,8 @@
 
         if (session.getTransacted())
         {
+            log.debug("Session is transacted.");
+
             try
             {
                 if (_failBeforeCommit)
@@ -1409,10 +1459,10 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // long l = System.nanoTime();
+                long start = System.nanoTime();
                 session.commit();
                 committed = true;
-                // log.debug("Time taken to commit :" + ((System.nanoTime() - 
l) / 1000000f) + " ms");
+                log.debug("Time taken to commit :" + ((System.nanoTime() - 
start) / 1000000f) + " ms");
 
                 if (_failAfterCommit)
                 {
@@ -1425,7 +1475,7 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // log.trace("Session Commited.");
+                log.debug("Session Commited.");
             }
             catch (JMSException e)
             {
@@ -1495,6 +1545,8 @@
      */
     public int getExpectedNumPings(int numpings)
     {
+        log.debug("public int getExpectedNumPings(int numpings = " + numpings 
+ "): called");
+
         log.debug("Each ping will be received by " + (_isPubSub ? 
getConsumersPerDestination() : 1) + " consumers.");
 
         return numpings * (_isPubSub ? getConsumersPerDestination() : 1);


Reply via email to