Author: bhupendrab
Date: Fri Feb  9 04:29:14 2007
New Revision: 505268

URL: http://svn.apache.org/viewvc?view=rev&rev=505268
Log:
QPID-170
predelivery queues will also be cleared with moved messages. Messages will be 
moved to another queue and predelivery queues of subsribers of another queue 
will also be populated.
the features - removeMmessageFromTop and clearQueue is also modified by using 
the getNextMessage

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Fri Feb  9 04:29:14 2007
@@ -41,6 +41,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any 
other abstraction like
@@ -157,7 +158,7 @@
     /**
      * total messages received by the queue since startup.
      */
-    public long _totalMessagesReceived = 0;
+    public AtomicLong _totalMessagesReceived = new AtomicLong();
 
     public int compareTo(Object o)
     {
@@ -291,59 +292,77 @@
     }
 
     /**
-     * @see ManagedQueue#moveMessages
+     * moves messages from this queue to another queue. to do this the 
approach is following-
+     * - setup the queue for moving messages (hold the lock and stop the async 
delivery)
+     * - get all the messages available in the given message id range
+     * - setup the other queue for moving messages (hold the lock and stop the 
async delivery)
+     * - send these available messages to the other queue (enqueue in other 
queue)
+     * - Once sending to other Queue is successful, remove messages from this 
queue
+     * - remove locks from both queues and start async delivery
      * @param fromMessageId
      * @param toMessageId
      * @param queueName
      * @param storeContext
-     * @throws AMQException
      */
     public synchronized void moveMessagesToAnotherQueue(long fromMessageId, 
long toMessageId, String queueName,
-                                                        StoreContext 
storeContext) throws AMQException
+                                                        StoreContext 
storeContext)
     {
+        // prepare the delivery manager for moving messages by stopping the 
async delivery and creating a lock
         AMQQueue anotherQueue = 
getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        List<AMQMessage> list = getMessagesOnTheQueue();
-        List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
-        int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
-        for (AMQMessage message : list)
+        try
         {
-            long msgId = message.getMessageId();
-            if (msgId >= fromMessageId && msgId <= toMessageId)
-            {
-                foundMessagesList.add(message);
-            }
-            // break the loop as soon as messages to be removed are found
-            if (foundMessagesList.size() == maxMessageCountToBeMoved)
+            startMovingMessages();
+            List<AMQMessage> list = getMessagesOnTheQueue();
+            List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+            int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 
1);
+
+            // Run this loop till you find all the messages or the list has no 
more messages
+            for (AMQMessage message : list)
             {
-                break;
+                long msgId = message.getMessageId();
+                if (msgId >= fromMessageId && msgId <= toMessageId)
+                {
+                    foundMessagesList.add(message);
+                }
+                // break the loop as soon as messages to be removed are found
+                if (foundMessagesList.size() == maxMessageCountToBeMoved)
+                {
+                    break;
+                }
             }
-        }
 
-        // move messages to another queue
-        for (AMQMessage message : foundMessagesList)
+            // move messages to another queue
+            anotherQueue.startMovingMessages();
+            anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+
+            // moving is successful, now remove from original queue
+            _deliveryMgr.removeMovedMessages(foundMessagesList);
+        }
+        finally
         {
-            try
-            {
-                anotherQueue.process(storeContext, message);
-            }
-            catch(AMQException ex)
-            {
-                foundMessagesList.subList(foundMessagesList.indexOf(message), 
foundMessagesList.size()).clear();
-                // Exception occured, so rollback the changes
-                anotherQueue.removeMessages(foundMessagesList);
-                throw ex;
-            }
+            // remove the lock and start the async delivery
+            anotherQueue.stopMovingMessages();
+            stopMovingMessages();   
         }
+    }
 
-        // moving is successful, now remove from original queue
-        removeMessages(foundMessagesList);
+    public void startMovingMessages()
+    {
+        _deliveryMgr.startMovingMessages();
     }
 
-    public synchronized void removeMessages(List<AMQMessage> messageList)
+    private void enqueueMovedMessages(StoreContext storeContext, 
List<AMQMessage> messageList)
     {
-        _deliveryMgr.removeMessages(messageList);
+        _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
+        _totalMessagesReceived.addAndGet(messageList.size());
     }
 
+    public void stopMovingMessages()
+    {
+        _deliveryMgr.stopMovingMessages();
+        _deliveryMgr.processAsync(_asyncDelivery);
+    }
+    
     /**
      * @return MBean object associated with this Queue
      */
@@ -374,7 +393,7 @@
 
     public long getReceivedMessageCount()
     {
-        return _totalMessagesReceived;
+        return _totalMessagesReceived.get();
     }
 
     public int getMaximumMessageCount()
@@ -407,7 +426,7 @@
     /**
      * Removes the AMQMessage from the top of the queue.
      */
-    public void deleteMessageFromTop(StoreContext storeContext) throws 
AMQException
+    public synchronized void deleteMessageFromTop(StoreContext storeContext) 
throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop(storeContext);
     }
@@ -415,7 +434,7 @@
     /**
      * removes all the messages from the queue.
      */
-    public long clearQueue(StoreContext storeContext) throws AMQException
+    public synchronized long clearQueue(StoreContext storeContext) throws 
AMQException
     {
         return _deliveryMgr.clearAllMessages(storeContext);
     }
@@ -633,7 +652,7 @@
 
     protected void updateReceivedMessageCount(AMQMessage msg) throws 
AMQException
     {
-        _totalMessagesReceived++;
+        _totalMessagesReceived.incrementAndGet();
         try
         {
             _managedObject.checkForNotification(msg);

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Fri Feb  9 04:29:14 2007
@@ -395,24 +395,9 @@
         {
             throw new OperationsException("\"From MessageId\" should be 
greater then 0 and less then \"To MessageId\"");            
         }
-        
-        try
-        {
-            _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, 
toQueueName, _storeContext);
-        }
-        catch(AMQException amqex)
-        {
-            throw new JMException("Error moving messages to "  + toQueueName + 
": " + amqex);
-        }
 
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, 
toQueueName, _storeContext);
     }
-//
-//    public ObjectName getObjectName() throws MalformedObjectNameException
-//    {
-//        String objNameString = super.getObjectName().toString();
-//
-//        return new ObjectName(objNameString);
-//    }
 
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Fri Feb  9 04:29:14 2007
@@ -75,7 +75,13 @@
      */
     private final AMQQueue _queue;
 
-
+    /**
+     * Flag used while moving messages from this queue to another. For moving 
messages the async delivery
+     * should also stop. This flat should be set to true to stop async 
delivery and set to false to enable
+     * async delivery again.
+     */
+    private AtomicBoolean _movingMessages = new AtomicBoolean();
+    
     /**
      * Lock used to ensure that an channel that becomes unsuspended during the 
start of the queueing process is forced
      * to wait till the first message is added to the queue. This will ensure 
that the _queue has messages to be delivered
@@ -167,9 +173,12 @@
     }
 
 
-    public synchronized List<AMQMessage> getMessages()
+    public List<AMQMessage> getMessages()
     {
-        return new ArrayList<AMQMessage>(_messages);
+        _lock.lock();
+        ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+        _lock.unlock();
+        return list;
     }
 
     public void populatePreDeliveryQueue(Subscription subscription)
@@ -242,8 +251,52 @@
         }
     }
 
-    public synchronized void removeMessages(List<AMQMessage> messageList)
+    /**
+     * For feature of moving messages, this method is used. It sets the lock 
and sets the movingMessages flag,
+     * so that the asyn delivery is also stopped.
+     */
+    public void startMovingMessages()
+    {
+        _lock.lock();
+        _movingMessages.set(true);
+    }
+
+    /**
+     * Once moving messages to another queue is done or aborted, remove lock 
and unset the movingMessages flag,
+     * so that the async delivery can start again.
+     */
+    public void stopMovingMessages()
+    {
+        _movingMessages.set(false);
+        if (_lock.isHeldByCurrentThread())
+        {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Messages will be removed from this queue and all preDeliveryQueues
+     * @param messageList
+     */
+    public void removeMovedMessages(List<AMQMessage> messageList)
     {
+        // Remove from the
+        boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+        if (hasSubscribers)
+        {
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                if (!sub.isSuspended() && sub.hasFilters())
+                {
+                    Queue<AMQMessage> preDeliveryQueue = 
sub.getPreDeliveryQueue();
+                    for (AMQMessage msg : messageList)
+                    {
+                        preDeliveryQueue.remove(msg);
+                    }
+                }
+            }
+        }
+
         for (AMQMessage msg : messageList)
         {
             if (_messages.remove(msg))
@@ -253,29 +306,42 @@
         }
     }
 
-    public synchronized void removeAMessageFromTop(StoreContext storeContext) 
throws AMQException
+    /**
+     * Now with implementation of predelivery queues, this method will mark 
the message on the top as taken.
+     * @param storeContext
+     * @throws AMQException
+     */
+    public void removeAMessageFromTop(StoreContext storeContext) throws 
AMQException
     {
-        AMQMessage msg = poll();
+        _lock.lock();
+        AMQMessage msg = getNextMessage();
         if (msg != null)
         {
-            msg.dequeue(storeContext, _queue);
-            _totalMessageSize.getAndAdd(-msg.getSize());
-        }        
+            // mark this message as taken and get it removed
+            msg.taken();
+            _queue.dequeue(storeContext, msg);
+            getNextMessage();
+        }
+        
+        _lock.unlock();
     }
 
-    public synchronized long clearAllMessages(StoreContext storeContext) 
throws AMQException
+    public long clearAllMessages(StoreContext storeContext) throws AMQException
     {
         long count = 0;
-        AMQMessage msg = poll();
+        _lock.lock();
+
+        AMQMessage msg = getNextMessage();
         while (msg != null)
         {
-            msg.dequeue(storeContext, _queue);
+            //mark this message as taken and get it removed
+            msg.taken();
+            _queue.dequeue(storeContext, msg);
+            msg = getNextMessage();
             count++;
-            _totalMessageSize.set(0L);
-            msg = poll();
-
         }
 
+        _lock.unlock();
         return count;
     }
 
@@ -298,6 +364,7 @@
         {
             //remove the already taken message
             messages.poll();
+            _totalMessageSize.addAndGet(-message.getSize());
             // try the next message
             message = messages.peek();
         }
@@ -335,6 +402,34 @@
     }
 
     /**
+     * enqueues the messages in the list on the queue and all required 
predelivery queues
+     * @param storeContext
+     * @param movedMessageList
+     */
+    public void enqueueMovedMessages(StoreContext storeContext, 
List<AMQMessage> movedMessageList)
+    {
+        _lock.lock();
+        for (AMQMessage msg : movedMessageList)
+        {
+            addMessageToQueue(msg);
+        }
+
+        // enqueue on the pre delivery queues
+        for (Subscription sub : _subscriptions.getSubscriptions())
+        {
+            for (AMQMessage msg : movedMessageList)
+            {
+                // Only give the message to those that want them.
+                if (sub.hasInterest(msg))
+                {
+                    sub.enqueueForPreDelivery(msg);
+                }
+            }
+        }
+        _lock.unlock();
+    }
+
+    /**
      * Only one thread should ever execute this method concurrently, but
      * it can do so while other threads invoke deliver().
      */
@@ -343,7 +438,7 @@
         // Continue to process delivery while we haveSubscribers and messages
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
 
-        while (hasSubscribers && hasQueuedMessages())
+        while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get())
         {
             hasSubscribers = false;
 
@@ -378,11 +473,6 @@
         }
     }
 
-    private AMQMessage poll()
-    {
-        return _messages.poll();
-    }
-
     public void deliver(StoreContext context, AMQShortString name, AMQMessage 
msg) throws AMQException
     {
         if (_log.isDebugEnabled())
@@ -482,7 +572,7 @@
         public void run()
         {
             boolean running = true;
-            while (running)
+            while (running && !_movingMessages.get())
             {
                 processQueue();
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 Fri Feb  9 04:29:14 2007
@@ -76,7 +76,13 @@
 
     long clearAllMessages(StoreContext storeContext) throws AMQException;
 
-    void removeMessages(List<AMQMessage> messageListToRemove);
+    void startMovingMessages();
+
+    void enqueueMovedMessages(StoreContext context, List<AMQMessage> 
messageList);
+
+    void stopMovingMessages();
+
+    void removeMovedMessages(List<AMQMessage> messageListToRemove);
 
     List<AMQMessage> getMessages();
 


Reply via email to