Author: rupertlssmith
Date: Wed Jul 4 04:11:04 2007
New Revision: 553172
URL: http://svn.apache.org/viewvc?view=rev&rev=553172
Log:
Messages moved by the management console are now committed on the message store.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=553172&r1=553171&r2=553172
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Wed Jul 4 04:11:04 2007
@@ -1,25 +1,16 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+/* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */
package org.apache.qpid.server.queue;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -31,19 +22,10 @@
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
-
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any
other abstraction like that. It is described
* fully in RFC 006.
@@ -156,26 +138,26 @@
public int compareTo(Object o)
{
- return _name.compareTo(((AMQQueue) o).getName());
+ return _name.compareTo(((AMQQueue)o).getName());
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString
owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost,
AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
+ new SubscriptionSet(), new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString
owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws
AMQException
+ VirtualHost virtualHost, SubscriptionSet subscribers)
throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost,
AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
+ new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString
owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet
subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
+ VirtualHost virtualHost, Executor asyncDelivery,
SubscriptionSet subscribers,
+ SubscriptionFactory subscriptionFactory) throws
AMQException
{
if (name == null)
{
@@ -252,7 +234,7 @@
}
/**
- * Returns messages within the given range of message Ids
+ * Returns messages within the given range of message Ids.
*
* @param fromMessageId
* @param toMessageId
@@ -292,32 +274,86 @@
* (enqueue in other queue) - Once sending to other Queue is successful,
remove messages from this queue - remove
* locks from both queues and start async delivery
*
- * @param fromMessageId
- * @param toMessageId
- * @param queueName
- * @param storeContext
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to
perform the move. This is associated with
+ * the store's transactional context.
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId,
long toMessageId, String queueName,
- StoreContext storeContext)
+ StoreContext
storeContext)
{
- // prepare the delivery manager for moving messages by stopping the
async delivery and creating a lock
- AMQQueue anotherQueue =
getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new
AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
try
{
+ // Obtain locks to prevent activity on the queues being moved
between.
startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
List<AMQMessage> foundMessagesList =
getMessagesOnTheQueue(fromMessageId, toMessageId);
- // move messages to another queue
- anotherQueue.startMovingMessages();
- anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ try
+ {
+ fromStore.beginTran(storeContext);
- // moving is successful, now remove from original queue
- _deliveryMgr.removeMovedMessages(foundMessagesList);
+ if (toStore != fromStore)
+ {
+ toStore.beginTran(storeContext);
+ }
+
+ // Move the messages on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name,
message.getMessageId());
+ toStore.enqueueMessage(storeContext, toQueue._name,
message.getMessageId());
+ }
+
+ // Commit and flush the move transactions.
+ try
+ {
+ fromStore.commitTran(storeContext);
+
+ if (toStore != fromStore)
+ {
+ toStore.commitTran(storeContext);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction
whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+
+ if (toStore != fromStore)
+ {
+ toStore.abortTran(storeContext);
+ }
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction
whilst moving messages on message store.", ae);
+ }
+ }
}
+ // Release locks to allow activity on the queues being moved between
to continue.
finally
{
- // remove the lock and start the async delivery
- anotherQueue.stopMovingMessages();
+ toQueue.stopMovingMessages();
stopMovingMessages();
}
}
@@ -432,7 +468,7 @@
}
public void registerProtocolSession(AMQProtocolSession ps, int channel,
AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws
AMQException
+ FieldTable filters, boolean noLocal,
boolean exclusive) throws AMQException
{
if (incrementSubscriberCount() > 1)
{
@@ -455,9 +491,8 @@
if (_logger.isDebugEnabled())
{
- _logger.debug(MessageFormat.format(
- "Registering protocol session {0} with channel {1} and " +
"consumer tag {2} with {3}", ps, channel,
- consumerTag, this));
+ _logger.debug(MessageFormat.format("Registering protocol session
{0} with channel {1} and "
+ + "consumer tag {2} with {3}",
ps, channel, consumerTag, this));
}
Subscription subscription =
@@ -499,17 +534,17 @@
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and
consumer tag {2} from {3}", ps, channel,
- consumerTag, this));
+ "Unregistering protocol session {0} with channel
{1} and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this));
}
Subscription removedSubscription;
- if ((removedSubscription =
-
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
ps, consumerTag)))
+ if ((removedSubscription =
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
ps,
+
consumerTag)))
== null)
{
throw new AMQException("Protocol session with channel " + channel
+ " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not
registered with queue " + this);
+ + " and protocol session key " +
ps.getKey() + " not registered with queue " + this);
}
removedSubscription.close();
@@ -688,7 +723,7 @@
return false;
}
- final AMQQueue amqQueue = (AMQQueue) o;
+ final AMQQueue amqQueue = (AMQQueue)o;
return (_name.equals(amqQueue._name));
}
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=553172&r1=553171&r2=553172
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Wed Jul 4 04:11:04 2007
@@ -18,30 +18,23 @@
* under the License.
*
*/
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
package org.apache.qpid.server.queue;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -60,30 +53,25 @@
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
/**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
+ * AMQQueueMBean is the management bean for an {@link AMQQueue}.
+ *
+ * <p/><table id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue,
QueueNotificationListener
{
+ /** Used for debugging purposes. */
private static final Logger _logger =
Logger.getLogger(AMQQueueMBean.class);
+
private static final SimpleDateFormat _dateFormat = new
SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**