Author: rupertlssmith
Date: Wed Oct 10 08:45:56 2007
New Revision: 583518

URL: http://svn.apache.org/viewvc?rev=583518&view=rev
Log:
Changed maxPending to be by message correlation id.

Modified:
    
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=583518&r1=583517&r2=583518&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Wed Oct 10 08:45:56 2007
@@ -419,15 +419,6 @@
      */
     protected int _maxPendingSize;
 
-    /**
-     * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
-     * to wait until the number of unreceived message is reduced before continuing to send.
-     */
-    protected static final Object _sendPauseMonitor = new Object();
-
-    /** Keeps a count of the number of message currently sent but not received. */
-    protected static AtomicInteger _unreceived = new AtomicInteger(0);
-
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
@@ -898,7 +889,7 @@
                     if ((_maxPendingSize > 0))
                     {
                         // Decrement the count of sent but not yet received messages.
-                        int unreceived = _unreceived.decrementAndGet();
+                        int unreceived = perCorrelationId._unreceived.decrementAndGet();
                         int unreceivedSize =
                             (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                             / (_isPubSub ? getConsumersPerDestination() : 1);
@@ -906,11 +897,11 @@
                         // log.debug("unreceived = " + unreceived);
                         // log.debug("unreceivedSize = " + unreceivedSize);
 
-                        synchronized (_sendPauseMonitor)
+                        synchronized (perCorrelationId._sendPauseMonitor)
                         {
                             if (unreceivedSize < _maxPendingSize)
                             {
-                                _sendPauseMonitor.notifyAll();
+                                perCorrelationId._sendPauseMonitor.notify();
                             }
                         }
                     }
@@ -1169,10 +1160,14 @@
             // Prompt the user to kill the broker when doing failover testing.
             _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
 
+            // Get the test setup for the correlation id.
+            String correlationID = message.getJMSCorrelationID();
+            PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+
             // If necessary, wait until the max pending message size comes within its limit.
             if (_maxPendingSize > 0)
             {
-                synchronized (_sendPauseMonitor)
+                synchronized (perCorrelationId._sendPauseMonitor)
                 {
                     // Used to keep track of the number of times that send has to wait.
                     int numWaits = 0;
@@ -1184,7 +1179,7 @@
                     while (true)
                     {
                         // Get the size estimate of sent but not yet received messages.
-                        int unreceived = _unreceived.get();
+                        int unreceived = perCorrelationId._unreceived.get();
                         int unreceivedSize =
                             (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                             / (_isPubSub ? getConsumersPerDestination() : 1);
@@ -1212,7 +1207,7 @@
                             try
                             {
                                 long start = System.nanoTime();
-                                _sendPauseMonitor.wait(10000);
+                                perCorrelationId._sendPauseMonitor.wait(10000);
                                 long end = System.nanoTime();
 
                                 // Count the wait only if it was for > 99% of the requested wait time.
@@ -1255,7 +1250,8 @@
             // in pub/sub mode.
             if (_maxPendingSize > 0)
             {
-                int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+                int newUnreceivedCount =
+                    perCorrelationId._unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
                 // log.debug("newUnreceivedCount = " + newUnreceivedCount);
             }
 
@@ -1676,5 +1672,14 @@
 
         /** Holds the last timestamp that the timeout was reset to. */
         Long timeOutStart;
+
+        /**
+         * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
+         * to wait until the number of unreceived message is reduced before continuing to send.
+         */
+        final Object _sendPauseMonitor = new Object();
+
+        /** Keeps a count of the number of message currently sent but not received. */
+        AtomicInteger _unreceived = new AtomicInteger(0);
     }
 }


Reply via email to