Author: rupertlssmith
Date: Wed Jul  4 04:11:04 2007
New Revision: 553172

URL: http://svn.apache.org/viewvc?view=rev&rev=553172
Log:
Messages moved by management console now commited on the message store.

Modified:
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=553172&r1=553171&r2=553172
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Wed Jul  4 04:11:04 2007
@@ -1,25 +1,16 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+/* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */
 package org.apache.qpid.server.queue;
 
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
@@ -31,19 +22,10 @@
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import javax.management.JMException;
-
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any 
other abstraction like that. It is described
  * fully in RFC 006.
@@ -156,26 +138,26 @@
 
     public int compareTo(Object o)
     {
-        return _name.compareTo(((AMQQueue) o).getName());
+        return _name.compareTo(((AMQQueue)o).getName());
     }
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString 
owner, boolean autoDelete, VirtualHost virtualHost)
-        throws AMQException
+             throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, 
AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
-            new SubscriptionSet(), new SubscriptionImpl.Factory());
+             new SubscriptionSet(), new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString 
owner, boolean autoDelete,
-        VirtualHost virtualHost, SubscriptionSet subscribers) throws 
AMQException
+                       VirtualHost virtualHost, SubscriptionSet subscribers) 
throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, 
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
-            new SubscriptionImpl.Factory());
+             new SubscriptionImpl.Factory());
     }
 
     protected AMQQueue(AMQShortString name, boolean durable, AMQShortString 
owner, boolean autoDelete,
-        VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet 
subscribers,
-        SubscriptionFactory subscriptionFactory) throws AMQException
+                       VirtualHost virtualHost, Executor asyncDelivery, 
SubscriptionSet subscribers,
+                       SubscriptionFactory subscriptionFactory) throws 
AMQException
     {
         if (name == null)
         {
@@ -252,7 +234,7 @@
     }
 
     /**
-     * Returns messages within the given range of message Ids
+     * Returns messages within the given range of message Ids.
      *
      * @param fromMessageId
      * @param toMessageId
@@ -292,32 +274,86 @@
      * (enqueue in other queue) - Once sending to other Queue is successful, 
remove messages from this queue - remove
      * locks from both queues and start async delivery
      *
-     * @param fromMessageId
-     * @param toMessageId
-     * @param queueName
-     * @param storeContext
+     * @param fromMessageId The first message id to move.
+     * @param toMessageId   The last message id to move.
+     * @param queueName     The queue to move the messages to.
+     * @param storeContext  The context of the message store under which to 
perform the move. This is associated with
+     *                      the stores transactional context.
      */
     public synchronized void moveMessagesToAnotherQueue(long fromMessageId, 
long toMessageId, String queueName,
-        StoreContext storeContext)
+                                                        StoreContext 
storeContext)
     {
-        // prepare the delivery manager for moving messages by stopping the 
async delivery and creating a lock
-        AMQQueue anotherQueue = 
getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
+
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+        MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
         try
         {
+            // Obtain locks to prevent activity on the queues being moved 
between.
             startMovingMessages();
+            toQueue.startMovingMessages();
+
+            // Get the list of messages to move.
             List<AMQMessage> foundMessagesList = 
getMessagesOnTheQueue(fromMessageId, toMessageId);
 
-            // move messages to another queue
-            anotherQueue.startMovingMessages();
-            anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+            try
+            {
+                fromStore.beginTran(storeContext);
 
-            // moving is successful, now remove from original queue
-            _deliveryMgr.removeMovedMessages(foundMessagesList);
+                if (toStore != fromStore)
+                {
+                    toStore.beginTran(storeContext);
+                }
+
+                // Move the messages in on the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, 
message.getMessageId());
+                    toStore.enqueueMessage(storeContext, toQueue._name, 
message.getMessageId());
+                }
+
+                // Commit and flush the move transcations.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+
+                    if (toStore != fromStore)
+                    {
+                        toStore.commitTran(storeContext);
+                    }
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction 
whilst moving messages on message store.", e);
+                }
+
+                // Move the messages on the in-memory queues.
+                toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the move transactions on move failures.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+
+                    if (toStore != fromStore)
+                    {
+                        toStore.abortTran(storeContext);
+                    }
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction 
whilst moving messages on message store.", ae);
+                }
+            }
         }
+        // Release locks to allow activity on the queues being moved between 
to continue.
         finally
         {
-            // remove the lock and start the async delivery
-            anotherQueue.stopMovingMessages();
+            toQueue.stopMovingMessages();
             stopMovingMessages();
         }
     }
@@ -432,7 +468,7 @@
     }
 
     public void registerProtocolSession(AMQProtocolSession ps, int channel, 
AMQShortString consumerTag, boolean acks,
-        FieldTable filters, boolean noLocal, boolean exclusive) throws 
AMQException
+                                        FieldTable filters, boolean noLocal, 
boolean exclusive) throws AMQException
     {
         if (incrementSubscriberCount() > 1)
         {
@@ -455,9 +491,8 @@
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug(MessageFormat.format(
-                    "Registering protocol session {0} with channel {1} and " + 
"consumer tag {2} with {3}", ps, channel,
-                    consumerTag, this));
+            _logger.debug(MessageFormat.format("Registering protocol session 
{0} with channel {1} and "
+                                               + "consumer tag {2} with {3}", 
ps, channel, consumerTag, this));
         }
 
         Subscription subscription =
@@ -499,17 +534,17 @@
         if (_logger.isDebugEnabled())
         {
             _logger.debug(MessageFormat.format(
-                    "Unregistering protocol session {0} with channel {1} and 
consumer tag {2} from {3}", ps, channel,
-                    consumerTag, this));
+                              "Unregistering protocol session {0} with channel 
{1} and consumer tag {2} from {3}",
+                              ps, channel, consumerTag, this));
         }
 
         Subscription removedSubscription;
-        if ((removedSubscription =
-                        
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, 
ps, consumerTag)))
+        if ((removedSubscription = 
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, 
ps,
+                                                                               
                          consumerTag)))
                 == null)
         {
             throw new AMQException("Protocol session with channel " + channel 
+ " and consumer tag " + consumerTag
-                + " and protocol session key " + ps.getKey() + " not 
registered with queue " + this);
+                                   + " and protocol session key " + 
ps.getKey() + " not registered with queue " + this);
         }
 
         removedSubscription.close();
@@ -688,7 +723,7 @@
             return false;
         }
 
-        final AMQQueue amqQueue = (AMQQueue) o;
+        final AMQQueue amqQueue = (AMQQueue)o;
 
         return (_name.equals(amqQueue._name));
     }

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=553172&r1=553171&r2=553172
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Wed Jul  4 04:11:04 2007
@@ -18,30 +18,23 @@
  * under the License.
  *
  */
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
 package org.apache.qpid.server.queue;
 
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -60,30 +53,25 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
 
 /**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
+ * AMQQueueMBean is the management bean for an [EMAIL PROTECTED] AMQQueue}.
+ *
+ * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
  */
 @MBeanDescription("Management Interface for AMQQueue")
 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, 
QueueNotificationListener
 {
+    /** Used for debugging purposes. */
     private static final Logger _logger = 
Logger.getLogger(AMQQueueMBean.class);
+
     private static final SimpleDateFormat _dateFormat = new 
SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
 
     /**


Reply via email to