Author: rgreig
Date: Wed May 9 03:36:06 2007
New Revision: 536483
URL: http://svn.apache.org/viewvc?view=rev&rev=536483
Log:
Improvements made to max pending message limit code.
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=536483&r1=536482&r2=536483
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Wed May 9 03:36:06 2007
@@ -98,7 +98,7 @@
* 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
- * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of
messages send but not yet received.
+ * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of
messages sent but not yet received.
* Limits the volume of messages
currently buffered on the client
* or broker. Can help scale test
clients by limiting amount of buffered
* data to avoid out of memory
errors.
@@ -373,10 +373,10 @@
protected int _maxPendingSize;
/**
- * Holds a cyclic barrier which is used to synchronize sender and receiver
threads, where the sender has elected
+ * Holds a monitor which is used to synchronize sender and receiver
threads, where the sender has elected
* to wait until the number of unreceived message is reduced before
continuing to send.
*/
- protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2);
+ protected Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not
received. */
protected AtomicInteger _unreceived = new AtomicInteger(0);
@@ -801,23 +801,27 @@
int unreceivedSize = (unreceived * ((_messageSize == 0) ?
1 : _messageSize));
// Release a waiting sender if there is one.
- if ((_maxPendingSize > 0) && (unreceivedSize <
_maxPendingSize)
- && (_sendPauseBarrier.getNumberWaiting() == 1))
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate under limit = " +
unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be
re-established.
- try
- {
- _sendPauseBarrier.await();
- }
- catch (InterruptedException e)
+ if ((_maxPendingSize > 0) && (unreceivedSize <
_maxPendingSize))
+ // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
+ log.debug("unreceived size estimate under limit =
" + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to
be re-established.
+ /*try
+ {*/
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.notify();
+ /*}
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
}
}
@@ -1052,26 +1056,40 @@
waitForUser(KILL_BROKER_PROMPT);
}
- // Increase the count of sent but not yet received messages.
- int unreceived = _unreceived.getAndIncrement();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 :
_messageSize));
-
- if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize))
+ // If necessary, wait until the max pending message size comes within
its limit.
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate over limit = " +
unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be
re-established.
- try
- {
- _sendPauseBarrier.await();
- }
- catch (InterruptedException e)
+ while ((_maxPendingSize > 0))
{
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
+ // Get the size estimate of sent but not yet received messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 :
_messageSize));
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ log.debug("unreceived size estimate over limit = " +
unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be
re-established.
+ try
+ {
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ /*catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
+ }
+ else
+ {
+ break;
+ }
}
}
@@ -1085,6 +1103,9 @@
_producer.send(destination, message);
}
+ // Increase the unreceived size, this may actually happen aftern the
message is recevied.
+ _unreceived.getAndIncrement();
+
// Apply message rate throttling if a rate limit has been set up.
if (_rateLimiter != null)
{
@@ -1300,6 +1321,7 @@
* @return <tt>true</tt> if the session was committed, <tt>false</tt> if
it was not.
*
* @throws javax.jms.JMSException If the commit fails and then the
rollback fails.
+ *
* @todo Consider moving the fail after send logic into the send method.
It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail
after send applied to transactional and
* non-transactional alike.