Author: rupertlssmith
Date: Tue Jul 10 07:40:04 2007
New Revision: 554964
URL: http://svn.apache.org/viewvc?view=rev&rev=554964
Log:
Added message copy method.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=554964&r1=554963&r2=554964
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Jul 10 07:40:04 2007
@@ -268,11 +268,8 @@
}
/**
- * moves messages from this queue to another queue. to do this the
approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages
available in the given message id range - setup
- * the other queue for moving messages (stop the async delivery) - send
these available messages to the other queue
- * (enqueue in other queue) - Once sending to other Queue is successful,
remove messages from this queue - remove
- * locks from both queues and start async delivery
+ * Moves messages from this queue to another queue, and also commits the
move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
*
* @param fromMessageId The first message id to move.
* @param toMessageId The last message id to move.
@@ -288,6 +285,11 @@
MessageStore fromStore = getVirtualHost().getMessageStore();
MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues
on the same message store.");
+ }
+
try
{
// Obtain locks to prevent activity on the queues being moved
between.
@@ -301,11 +303,6 @@
{
fromStore.beginTran(storeContext);
- if (toStore != fromStore)
- {
- toStore.beginTran(storeContext);
- }
-
// Move the messages in on the message store.
for (AMQMessage message : foundMessagesList)
{
@@ -317,11 +314,6 @@
try
{
fromStore.commitTran(storeContext);
-
- if (toStore != fromStore)
- {
- toStore.commitTran(storeContext);
- }
}
catch (AMQException e)
{
@@ -338,11 +330,83 @@
try
{
fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction
whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between
to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Copies messages on this queue to another queue, and also commits the
move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to
perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void copyMessagesToAnotherQueue(long fromMessageId,
long toMessageId, String queueName,
+ StoreContext
storeContext)
+ {
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new
AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues
on the same message store.");
+ }
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved
between.
+ startMovingMessages();
+ toQueue.startMovingMessages();
- if (toStore != fromStore)
- {
- toStore.abortTran(storeContext);
- }
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList =
getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ toStore.enqueueMessage(storeContext, toQueue._name,
message.getMessageId());
+ message.takeReference();
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction
whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
}
catch (AMQException ae)
{