Author: ritchiem
Date: Tue Oct 31 01:08:42 2006
New Revision: 469422
URL: http://svn.apache.org/viewvc?view=rev&rev=469422
Log:
QPID-56
Also resolves a race condition where messages could be sent out of order.
This change needs to be benchmarked against the original DeliveryManager.java
with the race condition fixed.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=469422&r1=469421&r2=469422
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
Tue Oct 31 01:08:42 2006
@@ -19,10 +19,10 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
import java.util.ArrayList;
import java.util.Iterator;
@@ -31,6 +31,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Manages delivery of messages on behalf of a queue
*/
@@ -44,7 +45,7 @@
/**
* Holds any queued messages
*/
- private final Queue<AMQMessage> _messages = new
ConcurrentLinkedQueueNoSize<AMQMessage>();
+ private final Queue<AMQMessage> _messages = new
ConcurrentLinkedQueueAtomicSize<AMQMessage>();
//private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
@@ -63,8 +64,6 @@
private final AMQQueue _queue;
- private volatile int _queueSize = 0;
-
DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
//Set values from configuration
@@ -84,7 +83,7 @@
*/
private boolean queueing()
{
- return getMessageCount() != 0;
+ return hasQueuedMessages();
}
@@ -121,28 +120,19 @@
private boolean addMessageToQueue(AMQMessage msg)
{
- // Shrink the ContentBodies to their actual size to save memory.
- // synchronize to ensure this msg is the next one to get added.
+ // Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- synchronized(_messages)
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
{
- Iterator it = msg.getContentBodies().iterator();
- while (it.hasNext())
- {
- ContentBody cb = (ContentBody) it.next();
- cb.reduceBufferToFit();
- }
-
- _messages.offer(msg);
- _queueSize++;
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
}
}
- else
- {
- _messages.offer(msg);
- _queueSize++;
- }
+
+ _messages.offer(msg);
+
return true;
}
@@ -153,9 +143,9 @@
*
* @return true if there are queued messages
*/
- private boolean hasQueuedMessages()
+ public boolean hasQueuedMessages()
{
- return getMessageCount() != 0;
+ return !_messages.isEmpty();
}
public int getQueueMessageCount()
@@ -216,7 +206,8 @@
//We don't synchronize access to subscribers so need to
re-check
if (next != null)
{
- next.send(poll(), _queue);
+ next.send(peek(), _queue);
+ poll();
}
else
{
@@ -230,7 +221,7 @@
}
finally
{
- _log.debug("End of processQueue: (" + _queueSize + ")" + "
subscribers:" + _subscriptions.hasActiveSubscribers());
+ _log.debug("End of processQueue: (" + getQueueMessageCount() + ")"
+ " subscribers:" + _subscriptions.hasActiveSubscribers());
_processing.set(false);
}
}
@@ -242,7 +233,6 @@
private AMQMessage poll()
{
- _queueSize--;
return _messages.poll();
}
@@ -262,7 +252,7 @@
*/
void processAsync(Executor executor)
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" +
_queueSize + ")" +
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" +
getQueueMessageCount() + ")" +
" Active:" + _subscriptions.hasActiveSubscribers() +
" Processing:" + _processing.get());