Author: rgreig
Date: Wed May  9 03:36:06 2007
New Revision: 536483

URL: http://svn.apache.org/viewvc?view=rev&rev=536483
Log:
Improvements made to max pending message limit code.

Modified:
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=536483&r1=536482&r2=536483
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Wed May  9 03:36:06 2007
@@ -98,7 +98,7 @@
  *                                               3 - DUPS_OK_ACKNOWLEDGE
  *                                               257 - NO_ACKNOWLEDGE
  *                                               258 - PRE_ACKNOWLEDGE
- * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of 
messages send but not yet received.
+ * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of 
messages sent but not yet received.
  *                                              Limits the volume of messages 
currently buffered on the client
  *                                              or broker. Can help scale test 
clients by limiting amount of buffered
  *                                              data to avoid out of memory 
errors.
@@ -373,10 +373,10 @@
     protected int _maxPendingSize;
 
     /**
-     * Holds a cyclic barrier which is used to synchronize sender and receiver 
threads, where the sender has elected
+     * Holds a monitor which is used to synchronize sender and receiver 
threads, where the sender has elected
      * to wait until the number of unreceived message is reduced before 
continuing to send.
      */
-    protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2);
+    protected Object _sendPauseMonitor = new Object();
 
     /** Keeps a count of the number of message currently sent but not 
received. */
     protected AtomicInteger _unreceived = new AtomicInteger(0);
@@ -801,23 +801,27 @@
                     int unreceivedSize = (unreceived * ((_messageSize == 0) ? 
1 : _messageSize));
 
                     // Release a waiting sender if there is one.
-                    if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize)
-                            && (_sendPauseBarrier.getNumberWaiting() == 1))
+                    synchronized (_sendPauseMonitor)
                     {
-                        log.debug("unreceived size estimate under limit = " + 
unreceivedSize);
-
-                        // Wait on the send pause barrier for the limit to be 
re-established.
-                        try
-                        {
-                            _sendPauseBarrier.await();
-                        }
-                        catch (InterruptedException e)
+                        if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
+                        // && (_sendPauseBarrier.getNumberWaiting() == 1))
                         {
-                            throw new RuntimeException(e);
-                        }
-                        catch (BrokenBarrierException e)
-                        {
-                            throw new RuntimeException(e);
+                            log.debug("unreceived size estimate under limit = 
" + unreceivedSize);
+
+                            // Wait on the send pause barrier for the limit to 
be re-established.
+                            /*try
+                            {*/
+                            // _sendPauseBarrier.await();
+                            _sendPauseMonitor.notify();
+                            /*}
+                            catch (InterruptedException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                            catch (BrokenBarrierException e)
+                            {
+                                throw new RuntimeException(e);
+                            }*/
                         }
                     }
 
@@ -1052,26 +1056,40 @@
             waitForUser(KILL_BROKER_PROMPT);
         }
 
-        // Increase the count of sent but not yet received messages.
-        int unreceived = _unreceived.getAndIncrement();
-        int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize));
-
-        if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize))
+        // If necessary, wait until the max pending message size comes within 
its limit.
+        synchronized (_sendPauseMonitor)
         {
-            log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
-
-            // Wait on the send pause barrier for the limit to be 
re-established.
-            try
-            {
-                _sendPauseBarrier.await();
-            }
-            catch (InterruptedException e)
+            while ((_maxPendingSize > 0))
             {
-                throw new RuntimeException(e);
-            }
-            catch (BrokenBarrierException e)
-            {
-                throw new RuntimeException(e);
+                // Get the size estimate of sent but not yet received messages.
+                int unreceived = _unreceived.get();
+                int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize));
+
+                if (unreceivedSize > _maxPendingSize)
+                {
+                    log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
+
+                    // Wait on the send pause barrier for the limit to be 
re-established.
+                    try
+                    {
+                        // _sendPauseBarrier.await();
+                        _sendPauseMonitor.wait(1000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Restore the interrupted status
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    /*catch (BrokenBarrierException e)
+                    {
+                        throw new RuntimeException(e);
+                    }*/
+                }
+                else
+                {
+                    break;
+                }
             }
         }
 
@@ -1085,6 +1103,9 @@
             _producer.send(destination, message);
         }
 
+        // Increase the unreceived size, this may actually happen aftern the 
message is recevied.
+        _unreceived.getAndIncrement();
+
         // Apply message rate throttling if a rate limit has been set up.
         if (_rateLimiter != null)
         {
@@ -1300,6 +1321,7 @@
      * @return <tt>true</tt> if the session was committed, <tt>false</tt> if 
it was not.
      *
      * @throws javax.jms.JMSException If the commit fails and then the 
rollback fails.
+     *
      * @todo Consider moving the fail after send logic into the send method. 
It is confusing to have it in this commit
      * method, because commits only apply to transactional pingers, but fail 
after send applied to transactional and
      * non-transactional alike.


Reply via email to