Author: rupertlssmith
Date: Fri Oct 12 03:55:02 2007
New Revision: 584125

URL: http://svn.apache.org/viewvc?rev=584125&view=rev
Log:
Merged revisions 584124 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r584124 | rupertlssmith | 2007-10-12 11:52:52 +0100 (Fri, 12 Oct 2007) | 1 
line
  
  Implemented fair scheduling of producers in tests to prevent starvation and 
test timeout.
........

Modified:
    incubator/qpid/branches/M2/   (props changed)
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Oct 12 03:55:02 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-584113
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-584113,584124

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=584125&r1=584124&r2=584125&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Fri Oct 12 03:55:02 2007
@@ -46,6 +46,7 @@
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -493,6 +494,17 @@
     private static AtomicInteger numSent = new AtomicInteger();
 
     /**
+     * Holds a monitor which is used to synchronize sender and receivers 
threads, where the sender has elected
+     * to wait until the number of unreceived message is reduced before 
continuing to send. This monitor is a
+     * fair SynchronousQueue becuase that provides fair scheduling, to ensure 
that all producer threads get an
+     * equal chance to produce messages.
+     */
+    static final SynchronousQueue _sendPauseMonitor = new 
SynchronousQueue(true);
+
+    /** Keeps a count of the number of message currently sent but not 
received. */
+    static AtomicInteger _unreceived = new AtomicInteger(0);
+
+    /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See the class level comments
      * for details. This constructor creates a connection to the broker and 
creates producer and consumer sessions on
      * it, to send and recieve its pings and replies on.
@@ -889,7 +901,7 @@
                     if ((_maxPendingSize > 0))
                     {
                         // Decrement the count of sent but not yet received 
messages.
-                        int unreceived = 
perCorrelationId._unreceived.decrementAndGet();
+                        int unreceived = _unreceived.decrementAndGet();
                         int unreceivedSize =
                             (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize))
                             / (_isPubSub ? getConsumersPerDestination() : 1);
@@ -897,13 +909,13 @@
                         // log.debug("unreceived = " + unreceived);
                         // log.debug("unreceivedSize = " + unreceivedSize);
 
-                        synchronized (perCorrelationId._sendPauseMonitor)
+                        // synchronized (_sendPauseMonitor)
+                        // {
+                        if (unreceivedSize < _maxPendingSize)
                         {
-                            if (unreceivedSize < _maxPendingSize)
-                            {
-                                perCorrelationId._sendPauseMonitor.notify();
-                            }
+                            _sendPauseMonitor.poll();
                         }
+                        // }
                     }
 
                     // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
@@ -1167,7 +1179,7 @@
             // If necessary, wait until the max pending message size comes 
within its limit.
             if (_maxPendingSize > 0)
             {
-                synchronized (perCorrelationId._sendPauseMonitor)
+                synchronized (_sendPauseMonitor)
                 {
                     // Used to keep track of the number of times that send has 
to wait.
                     int numWaits = 0;
@@ -1179,7 +1191,7 @@
                     while (true)
                     {
                         // Get the size estimate of sent but not yet received 
messages.
-                        int unreceived = perCorrelationId._unreceived.get();
+                        int unreceived = _unreceived.get();
                         int unreceivedSize =
                             (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize))
                             / (_isPubSub ? getConsumersPerDestination() : 1);
@@ -1207,7 +1219,8 @@
                             try
                             {
                                 long start = System.nanoTime();
-                                perCorrelationId._sendPauseMonitor.wait(10000);
+                                // _sendPauseMonitor.wait(10000);
+                                _sendPauseMonitor.offer(new Object(), 10000, 
TimeUnit.MILLISECONDS);
                                 long end = System.nanoTime();
 
                                 // Count the wait only if it was for > 99% of 
the requested wait time.
@@ -1250,8 +1263,7 @@
             // in pub/sub mode.
             if (_maxPendingSize > 0)
             {
-                int newUnreceivedCount =
-                    perCorrelationId._unreceived.addAndGet(_isPubSub ? 
getConsumersPerDestination() : 1);
+                int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? 
getConsumersPerDestination() : 1);
                 // log.debug("newUnreceivedCount = " + newUnreceivedCount);
             }
 
@@ -1672,14 +1684,5 @@
 
         /** Holds the last timestamp that the timeout was reset to. */
         Long timeOutStart;
-
-        /**
-         * Holds a monitor which is used to synchronize sender and receivers 
threads, where the sender has elected
-         * to wait until the number of unreceived message is reduced before 
continuing to send.
-         */
-        final Object _sendPauseMonitor = new Object();
-
-        /** Keeps a count of the number of message currently sent but not 
received. */
-        AtomicInteger _unreceived = new AtomicInteger(0);
     }
 }


Reply via email to