Author: rgreig
Date: Wed Dec 6 04:41:25 2006
New Revision: 483058
URL: http://svn.apache.org/viewvc?view=rev&rev=483058
Log:
Improvements to support message recovery on broker startup
Added:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
(with props)
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Dec 6 04:41:25 2006
@@ -54,23 +54,8 @@
private AMQMessageHandle _messageHandle;
- /**
- * Stored temporarily until the header has been received at which point it
is used when
- * constructing the handle
- */
- private BasicPublishBody _publishBody;
-
- /**
- * Also stored temporarily.
- */
- private ContentHeaderBody _contentHeaderBody;
-
- /**
- * Keeps a track of how many bytes we have received in body frames
- */
- private long _bodyLengthReceived = 0;
-
- private final TransactionalContext _txnContext;
+ // TODO: ideally this should be able to go into the transient message date
- check this! (RG)
+ private TransactionalContext _txnContext;
/**
* Flag to indicate whether message has been delivered to a
@@ -79,12 +64,7 @@
*/
private boolean _deliveredToConsumer;
- /**
- * This is stored during routing, to know the queues to which this message
should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any
persistent record of destinations is done
- * by the message handle.
- */
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private TransientMessageData _transientMessageData = new
TransientMessageData();
/**
* Used to iterate through all the body frames associated with this
message. Will not
@@ -160,7 +140,7 @@
{
_messageId = messageId;
_txnContext = txnContext;
- _publishBody = publishBody;
+ _transientMessageData.setPublishBody(publishBody);
if (_log.isDebugEnabled())
{
_log.debug("Message created with id " + messageId);
@@ -168,6 +148,22 @@
}
/**
+ * Used when recovering, i.e. when the message store is creating
references to messages.
+ * In that case, the normal enqueue/routingComplete is not done since the
recovery process
+ * is responsible for routing the messages to queues.
+ * @param messageId
+ * @param store
+ * @param factory
+ * @throws AMQException
+ */
+ public AMQMessage(long messageId, MessageStore store, MessageHandleFactory
factory) throws AMQException
+ {
+ _messageId = messageId;
+ _messageHandle = factory.createMessageHandle(messageId, store, true);
+ _transientMessageData = null;
+ }
+
+ /**
* Used in testing only. This allows the passing of the content header
immediately
* on construction.
* @param messageId
@@ -200,7 +196,7 @@
MessageHandleFactory messageHandleFactory) throws
AMQException
{
this(messageId, publishBody, txnContext, contentHeader);
- _destinationQueues = destinationQueues;
+ _transientMessageData.setDestinationQueues(destinationQueues);
routingComplete(messageStore, messageHandleFactory);
for (ContentBody cb : contentBodies)
{
@@ -214,6 +210,7 @@
_messageHandle = msg._messageHandle;
_txnContext = msg._txnContext;
_deliveredToConsumer = msg._deliveredToConsumer;
+ _transientMessageData = msg._transientMessageData;
}
public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
@@ -228,9 +225,9 @@
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- if (_contentHeaderBody != null)
+ if (_transientMessageData != null)
{
- return _contentHeaderBody;
+ return _transientMessageData.getContentHeaderBody();
}
else
{
@@ -241,7 +238,7 @@
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
throws AMQException
{
- _contentHeaderBody = contentHeaderBody;
+ _transientMessageData.setContentHeaderBody(contentHeaderBody);
}
public void routingComplete(MessageStore store, MessageHandleFactory
factory) throws AMQException
@@ -255,12 +252,12 @@
// enqueuing the messages ensure that if required the destinations are
recorded to a
// persistent store
- for (AMQQueue q : _destinationQueues)
+ for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
_messageHandle.enqueue(_messageId, q);
}
- if (_contentHeaderBody.bodySize == 0)
+ if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
deliver();
}
@@ -268,7 +265,7 @@
public boolean addContentBodyFrame(ContentBody contentBody) throws
AMQException
{
- _bodyLengthReceived += contentBody.getSize();
+ _transientMessageData.addBodyLength(contentBody.getSize());
_messageHandle.addContentBodyFrame(_messageId, contentBody);
if (isAllContentReceived())
{
@@ -283,7 +280,7 @@
public boolean isAllContentReceived() throws AMQException
{
- return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ return _transientMessageData.isAllContentReceived();
}
public long getMessageId()
@@ -384,7 +381,7 @@
*/
public void enqueue(AMQQueue queue) throws AMQException
{
- _destinationQueues.add(queue);
+ _transientMessageData.addDestinationQueue(queue);
}
public void dequeue(AMQQueue queue) throws AMQException
@@ -394,11 +391,9 @@
public boolean isPersistent() throws AMQException
{
- if (_contentHeaderBody != null)
+ if (_transientMessageData != null)
{
- //todo remove literal values to a constant file such as
AMQConstants in common
- return _contentHeaderBody.properties instanceof
BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties)
_contentHeaderBody.properties).getDeliveryMode() == 2;
+ return _transientMessageData.isPersistent();
}
else
{
@@ -425,9 +420,9 @@
public BasicPublishBody getPublishBody() throws AMQException
{
BasicPublishBody pb;
- if (_publishBody != null)
+ if (_transientMessageData != null)
{
- pb = _publishBody;
+ pb = _transientMessageData.getPublishBody();
}
else
{
@@ -452,26 +447,30 @@
private void deliver() throws AMQException
{
- // first we allow the handle to know that the message has been fully
received. This is useful if it is
- // maintaining any calculated values based on content chunks
+ // we get a reference to the destination queues now so that we can
clear the
+ // transient message data as quickly as possible
+ List<AMQQueue> destinationQueues =
_transientMessageData.getDestinationQueues();
try
{
- _messageHandle.setPublishAndContentHeaderBody(_messageId,
_publishBody, _contentHeaderBody);
- _publishBody = null;
- _contentHeaderBody = null;
+ // first we allow the handle to know that the message has been
fully received. This is useful if it is
+ // maintaining any calculated values based on content chunks
+ _messageHandle.setPublishAndContentHeaderBody(_messageId,
_transientMessageData.getPublishBody(),
+
_transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with
the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- for (AMQQueue q : _destinationQueues)
+
+ _transientMessageData = null;
+
+ for (AMQQueue q : destinationQueues)
{
_txnContext.deliver(this, q);
}
}
finally
{
- _destinationQueues.clear();
- _destinationQueues = null;
+ destinationQueues.clear();
decrementReference();
}
}
Added:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=auto&rev=483058
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
Wed Dec 6 04:41:25 2006
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Contains data that is only used in AMQMessage transiently, e.g. while the
content
+ * body fragments are arriving.
+ *
+ * Having this data stored in a separate class means that the AMQMessage class
avoids
+ * the small overhead of numerous guaranteed-null references.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+ /**
+ * Stored temporarily until the header has been received at which point it
is used when
+ * constructing the handle
+ */
+ private BasicPublishBody _publishBody;
+
+ /**
+ * Also stored temporarily.
+ */
+ private ContentHeaderBody _contentHeaderBody;
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ /**
+ * This is stored during routing, to know the queues to which this message
should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any
persistent record of destinations is done
+ * by the message handle.
+ */
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+
+ public List<AMQQueue> getDestinationQueues()
+ {
+ return _destinationQueues;
+ }
+
+ public void setDestinationQueues(List<AMQQueue> destinationQueues)
+ {
+ _destinationQueues = destinationQueues;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public long getBodyLengthReceived()
+ {
+ return _bodyLengthReceived;
+ }
+
+ public void addBodyLength(int value)
+ {
+ _bodyLengthReceived += value;
+ }
+
+ public boolean isAllContentReceived() throws AMQException
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public void addDestinationQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ }
+
+ public boolean isPersistent()
+ {
+ //todo remove literal values to a constant file such as AMQConstants
in common
+ return _contentHeaderBody.properties instanceof
BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties)
_contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Wed Dec 6 04:41:25 2006
@@ -41,7 +41,7 @@
public ContentHeaderBody getContentHeaderBody(long messageId) throws
AMQException
{
- ContentHeaderBody chb = _contentHeaderBody.get();
+ ContentHeaderBody chb = (_contentHeaderBody !=
null?_contentHeaderBody.get():null);
if (chb == null)
{
MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
@@ -87,7 +87,7 @@
public BasicPublishBody getPublishBody(long messageId) throws AMQException
{
- BasicPublishBody bpb = _publishBody.get();
+ BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
if (bpb == null)
{
MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
Modified:
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
---
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
(original)
+++
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
Wed Dec 6 04:41:25 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.failover.FailoverException;
import javax.jms.JMSException;
@@ -56,7 +55,7 @@
}
catch (FailoverException e)
{
- _log.info("Failover exception caught during operation");
+ _log.info("Failover exception caught during operation: " +
e, e);
}
}
}