Author: rupertlssmith
Date: Tue Jul 10 07:40:04 2007
New Revision: 554964

URL: http://svn.apache.org/viewvc?view=rev&rev=554964
Log:
Added message copy method.

Modified:
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=554964&r1=554963&r2=554964
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Tue Jul 10 07:40:04 2007
@@ -268,11 +268,8 @@
     }
 
     /**
-     * moves messages from this queue to another queue. to do this the 
approach is following- - setup the queue for
-     * moving messages (stop the async delivery) - get all the messages 
available in the given message id range - setup
-     * the other queue for moving messages (stop the async delivery) - send 
these available messages to the other queue
-     * (enqueue in other queue) - Once sending to other Queue is successful, 
remove messages from this queue - remove
-     * locks from both queues and start async delivery
+     * Moves messages from this queue to another queue, and also commits the 
move on the message store. Delivery activity
+     * on the queues being moved between is suspended during the move.
      *
      * @param fromMessageId The first message id to move.
      * @param toMessageId   The last message id to move.
@@ -288,6 +285,11 @@
         MessageStore fromStore = getVirtualHost().getMessageStore();
         MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
 
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues 
on the same message store.");
+        }
+
         try
         {
             // Obtain locks to prevent activity on the queues being moved 
between.
@@ -301,11 +303,6 @@
             {
                 fromStore.beginTran(storeContext);
 
-                if (toStore != fromStore)
-                {
-                    toStore.beginTran(storeContext);
-                }
-
                 // Move the messages in on the message store.
                 for (AMQMessage message : foundMessagesList)
                 {
@@ -317,11 +314,6 @@
                 try
                 {
                     fromStore.commitTran(storeContext);
-
-                    if (toStore != fromStore)
-                    {
-                        toStore.commitTran(storeContext);
-                    }
                 }
                 catch (AMQException e)
                 {
@@ -338,11 +330,83 @@
                 try
                 {
                     fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction 
whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Release locks to allow activity on the queues being moved between 
to continue.
+        finally
+        {
+            toQueue.stopMovingMessages();
+            stopMovingMessages();
+        }
+    }
+
+    /**
+     * Copies messages on this queue to another queue, and also commits the 
copy on the message store. Delivery activity
+     * on the queues being copied between is suspended during the copy.
+     *
+     * @param fromMessageId The first message id to copy.
+     * @param toMessageId   The last message id to copy.
+     * @param queueName     The queue to copy the messages to.
+     * @param storeContext  The context of the message store under which to 
perform the copy. This is associated with
+     *                      the store's transactional context.
+     */
+    public synchronized void copyMessagesToAnotherQueue(long fromMessageId, 
long toMessageId, String queueName,
+                                                        StoreContext 
storeContext)
+    {
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+        if (toStore != fromStore)
+        {
+            throw new RuntimeException("Can only move messages between queues 
on the same message store.");
+        }
+
+        try
+        {
+            // Obtain locks to prevent activity on the queues being moved 
between.
+            startMovingMessages();
+            toQueue.startMovingMessages();
 
-                    if (toStore != fromStore)
-                    {
-                        toStore.abortTran(storeContext);
-                    }
+            // Get the list of messages to move.
+            List<AMQMessage> foundMessagesList = 
getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Enqueue the copied messages on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    toStore.enqueueMessage(storeContext, toQueue._name, 
message.getMessageId());
+                    message.takeReference();
+                }
+
+                // Commit and flush the copy transaction.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction 
whilst moving messages on message store.", e);
+                }
+
+                // Enqueue the copied messages on the in-memory destination queue.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            }
+            // Abort the move transactions on move failures.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
                 }
                 catch (AMQException ae)
                 {


Reply via email to