Author: bhupendrab
Date: Fri Feb 9 04:29:14 2007
New Revision: 505268
URL: http://svn.apache.org/viewvc?view=rev&rev=505268
Log:
QPID-170
Moved messages will also be removed from the predelivery queues. Messages will be
moved to another queue, and the predelivery queues of the subscribers of that queue
will also be populated.
The features removeMessageFromTop and clearQueue are also modified to use
getNextMessage.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Fri Feb 9 04:29:14 2007
@@ -41,6 +41,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any
other abstraction like
@@ -157,7 +158,7 @@
/**
* total messages received by the queue since startup.
*/
- public long _totalMessagesReceived = 0;
+ public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
{
@@ -291,59 +292,77 @@
}
/**
- * @see ManagedQueue#moveMessages
+ * moves messages from this queue to another queue. to do this the
approach is following-
+ * - setup the queue for moving messages (hold the lock and stop the async
delivery)
+ * - get all the messages available in the given message id range
+ * - setup the other queue for moving messages (hold the lock and stop the
async delivery)
+ * - send these available messages to the other queue (enqueue in other
queue)
+ * - Once sending to other Queue is successful, remove messages from this
queue
+ * - remove locks from both queues and start async delivery
* @param fromMessageId
* @param toMessageId
* @param queueName
* @param storeContext
- * @throws AMQException
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId,
long toMessageId, String queueName,
- StoreContext
storeContext) throws AMQException
+ StoreContext
storeContext)
{
+ // prepare the delivery manager for moving messages by stopping the
async delivery and creating a lock
AMQQueue anotherQueue =
getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- List<AMQMessage> list = getMessagesOnTheQueue();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
- for (AMQMessage message : list)
+ try
{
- long msgId = message.getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
- {
- foundMessagesList.add(message);
- }
- // break the loop as soon as messages to be removed are found
- if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ startMovingMessages();
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId +
1);
+
+ // Run this loop till you find all the messages or the list has no
more messages
+ for (AMQMessage message : list)
{
- break;
+ long msgId = message.getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(message);
+ }
+ // break the loop as soon as messages to be removed are found
+ if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ {
+ break;
+ }
}
- }
- // move messages to another queue
- for (AMQMessage message : foundMessagesList)
+ // move messages to another queue
+ anotherQueue.startMovingMessages();
+ anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+
+ // moving is successful, now remove from original queue
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ finally
{
- try
- {
- anotherQueue.process(storeContext, message);
- }
- catch(AMQException ex)
- {
- foundMessagesList.subList(foundMessagesList.indexOf(message),
foundMessagesList.size()).clear();
- // Exception occured, so rollback the changes
- anotherQueue.removeMessages(foundMessagesList);
- throw ex;
- }
+ // remove the lock and start the async delivery
+ anotherQueue.stopMovingMessages();
+ stopMovingMessages();
}
+ }
- // moving is successful, now remove from original queue
- removeMessages(foundMessagesList);
+ public void startMovingMessages()
+ {
+ _deliveryMgr.startMovingMessages();
}
- public synchronized void removeMessages(List<AMQMessage> messageList)
+ private void enqueueMovedMessages(StoreContext storeContext,
List<AMQMessage> messageList)
{
- _deliveryMgr.removeMessages(messageList);
+ _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
+ _totalMessagesReceived.addAndGet(messageList.size());
}
+ public void stopMovingMessages()
+ {
+ _deliveryMgr.stopMovingMessages();
+ _deliveryMgr.processAsync(_asyncDelivery);
+ }
+
/**
* @return MBean object associated with this Queue
*/
@@ -374,7 +393,7 @@
public long getReceivedMessageCount()
{
- return _totalMessagesReceived;
+ return _totalMessagesReceived.get();
}
public int getMaximumMessageCount()
@@ -407,7 +426,7 @@
/**
* Removes the AMQMessage from the top of the queue.
*/
- public void deleteMessageFromTop(StoreContext storeContext) throws
AMQException
+ public synchronized void deleteMessageFromTop(StoreContext storeContext)
throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
@@ -415,7 +434,7 @@
/**
* removes all the messages from the queue.
*/
- public long clearQueue(StoreContext storeContext) throws AMQException
+ public synchronized long clearQueue(StoreContext storeContext) throws
AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
}
@@ -633,7 +652,7 @@
protected void updateReceivedMessageCount(AMQMessage msg) throws
AMQException
{
- _totalMessagesReceived++;
+ _totalMessagesReceived.incrementAndGet();
try
{
_managedObject.checkForNotification(msg);
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Fri Feb 9 04:29:14 2007
@@ -395,24 +395,9 @@
{
throw new OperationsException("\"From MessageId\" should be
greater then 0 and less then \"To MessageId\"");
}
-
- try
- {
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId,
toQueueName, _storeContext);
- }
- catch(AMQException amqex)
- {
- throw new JMException("Error moving messages to " + toQueueName +
": " + amqex);
- }
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId,
toQueueName, _storeContext);
}
-//
-// public ObjectName getObjectName() throws MalformedObjectNameException
-// {
-// String objNameString = super.getObjectName().toString();
-//
-// return new ObjectName(objNameString);
-// }
/**
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Fri Feb 9 04:29:14 2007
@@ -75,7 +75,13 @@
*/
private final AMQQueue _queue;
-
+ /**
+ * Flag used while moving messages from this queue to another. For moving
messages the async delivery
+ * should also stop. This flag should be set to true to stop async
delivery and set to false to enable
+ * async delivery again.
+ */
+ private AtomicBoolean _movingMessages = new AtomicBoolean();
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the
start of the queueing process is forced
* to wait till the first message is added to the queue. This will ensure
that the _queue has messages to be delivered
@@ -167,9 +173,12 @@
}
- public synchronized List<AMQMessage> getMessages()
+ public List<AMQMessage> getMessages()
{
- return new ArrayList<AMQMessage>(_messages);
+ _lock.lock();
+ ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+ _lock.unlock();
+ return list;
}
public void populatePreDeliveryQueue(Subscription subscription)
@@ -242,8 +251,52 @@
}
}
- public synchronized void removeMessages(List<AMQMessage> messageList)
+ /**
+ * For feature of moving messages, this method is used. It sets the lock
and sets the movingMessages flag,
+ * so that the async delivery is also stopped.
+ */
+ public void startMovingMessages()
+ {
+ _lock.lock();
+ _movingMessages.set(true);
+ }
+
+ /**
+ * Once moving messages to another queue is done or aborted, remove lock
and unset the movingMessages flag,
+ * so that the async delivery can start again.
+ */
+ public void stopMovingMessages()
+ {
+ _movingMessages.set(false);
+ if (_lock.isHeldByCurrentThread())
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Messages will be removed from this queue and all preDeliveryQueues
+ * @param messageList
+ */
+ public void removeMovedMessages(List<AMQMessage> messageList)
{
+ // Remove from the predelivery queues of subscriptions that are not suspended and have filters
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+ if (hasSubscribers)
+ {
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended() && sub.hasFilters())
+ {
+ Queue<AMQMessage> preDeliveryQueue =
sub.getPreDeliveryQueue();
+ for (AMQMessage msg : messageList)
+ {
+ preDeliveryQueue.remove(msg);
+ }
+ }
+ }
+ }
+
for (AMQMessage msg : messageList)
{
if (_messages.remove(msg))
@@ -253,29 +306,42 @@
}
}
- public synchronized void removeAMessageFromTop(StoreContext storeContext)
throws AMQException
+ /**
+ * Now with implementation of predelivery queues, this method will mark
the message on the top as taken.
+ * @param storeContext
+ * @throws AMQException
+ */
+ public void removeAMessageFromTop(StoreContext storeContext) throws
AMQException
{
- AMQMessage msg = poll();
+ _lock.lock();
+ AMQMessage msg = getNextMessage();
if (msg != null)
{
- msg.dequeue(storeContext, _queue);
- _totalMessageSize.getAndAdd(-msg.getSize());
- }
+ // mark this message as taken and get it removed
+ msg.taken();
+ _queue.dequeue(storeContext, msg);
+ getNextMessage();
+ }
+
+ _lock.unlock();
}
- public synchronized long clearAllMessages(StoreContext storeContext)
throws AMQException
+ public long clearAllMessages(StoreContext storeContext) throws AMQException
{
long count = 0;
- AMQMessage msg = poll();
+ _lock.lock();
+
+ AMQMessage msg = getNextMessage();
while (msg != null)
{
- msg.dequeue(storeContext, _queue);
+ //mark this message as taken and get it removed
+ msg.taken();
+ _queue.dequeue(storeContext, msg);
+ msg = getNextMessage();
count++;
- _totalMessageSize.set(0L);
- msg = poll();
-
}
+ _lock.unlock();
return count;
}
@@ -298,6 +364,7 @@
{
//remove the already taken message
messages.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
// try the next message
message = messages.peek();
}
@@ -335,6 +402,34 @@
}
/**
+ * enqueues the messages in the list on the queue and all required
predelivery queues
+ * @param storeContext
+ * @param movedMessageList
+ */
+ public void enqueueMovedMessages(StoreContext storeContext,
List<AMQMessage> movedMessageList)
+ {
+ _lock.lock();
+ for (AMQMessage msg : movedMessageList)
+ {
+ addMessageToQueue(msg);
+ }
+
+ // enqueue on the pre delivery queues
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ for (AMQMessage msg : movedMessageList)
+ {
+ // Only give the message to those that want them.
+ if (sub.hasInterest(msg))
+ {
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ _lock.unlock();
+ }
+
+ /**
* Only one thread should ever execute this method concurrently, but
* it can do so while other threads invoke deliver().
*/
@@ -343,7 +438,7 @@
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- while (hasSubscribers && hasQueuedMessages())
+ while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get())
{
hasSubscribers = false;
@@ -378,11 +473,6 @@
}
}
- private AMQMessage poll()
- {
- return _messages.poll();
- }
-
public void deliver(StoreContext context, AMQShortString name, AMQMessage
msg) throws AMQException
{
if (_log.isDebugEnabled())
@@ -482,7 +572,7 @@
public void run()
{
boolean running = true;
- while (running)
+ while (running && !_movingMessages.get())
{
processQueue();
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Fri Feb 9 04:29:14 2007
@@ -76,7 +76,13 @@
long clearAllMessages(StoreContext storeContext) throws AMQException;
- void removeMessages(List<AMQMessage> messageListToRemove);
+ void startMovingMessages();
+
+ void enqueueMovedMessages(StoreContext context, List<AMQMessage>
messageList);
+
+ void stopMovingMessages();
+
+ void removeMovedMessages(List<AMQMessage> messageListToRemove);
List<AMQMessage> getMessages();