Author: rgreig
Date: Wed Dec  6 04:41:25 2006
New Revision: 483058

URL: http://svn.apache.org/viewvc?view=rev&rev=483058
Log:
Improvements to support message recovery on broker startup

Added:
    
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
   (with props)
Modified:
    
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Dec  6 04:41:25 2006
@@ -54,23 +54,8 @@
 
     private AMQMessageHandle _messageHandle;
 
-    /**
-     * Stored temporarily until the header has been received at which point it 
is used when
-     * constructing the handle
-     */
-    private BasicPublishBody _publishBody;
-
-    /**
-     * Also stored temporarily.
-     */
-    private ContentHeaderBody _contentHeaderBody;
-
-    /**
-     * Keeps a track of how many bytes we have received in body frames
-     */
-    private long _bodyLengthReceived = 0;
-
-    private final TransactionalContext _txnContext;
+    // TODO: ideally this should be able to go into the transient message data - check this! (RG)
+    private TransactionalContext _txnContext;
 
     /**
      * Flag to indicate whether message has been delivered to a
@@ -79,12 +64,7 @@
      */
     private boolean _deliveredToConsumer;
 
-    /**
-     * This is stored during routing, to know the queues to which this message 
should immediately be
-     * delivered. It is <b>cleared after delivery has been attempted</b>. Any 
persistent record of destinations is done
-     * by the message handle.
-     */
-    private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+    private TransientMessageData _transientMessageData = new 
TransientMessageData();
 
     /**
      * Used to iterate through all the body frames associated with this 
message. Will not
@@ -160,7 +140,7 @@
     {
         _messageId = messageId;
         _txnContext = txnContext;
-        _publishBody = publishBody;
+        _transientMessageData.setPublishBody(publishBody);
         if (_log.isDebugEnabled())
         {
             _log.debug("Message created with id " + messageId);
@@ -168,6 +148,22 @@
     }
 
     /**
+     * Used when recovering, i.e. when the message store is creating 
references to messages.
+     * In that case, the normal enqueue/routingComplete is not done since the 
recovery process
+     * is responsible for routing the messages to queues.
+     * @param messageId
+     * @param store
+     * @param factory
+     * @throws AMQException
+     */
+    public AMQMessage(long messageId, MessageStore store, MessageHandleFactory 
factory) throws AMQException
+    {
+        _messageId = messageId;
+        _messageHandle = factory.createMessageHandle(messageId, store, true);
+        _transientMessageData = null;
+    }
+
+    /**
      * Used in testing only. This allows the passing of the content header 
immediately
      * on construction.
      * @param messageId
@@ -200,7 +196,7 @@
                       MessageHandleFactory messageHandleFactory) throws 
AMQException
     {
         this(messageId, publishBody, txnContext, contentHeader);
-        _destinationQueues = destinationQueues;
+        _transientMessageData.setDestinationQueues(destinationQueues);
         routingComplete(messageStore, messageHandleFactory);
         for (ContentBody cb : contentBodies)
         {
@@ -214,6 +210,7 @@
         _messageHandle = msg._messageHandle;
         _txnContext = msg._txnContext;
         _deliveredToConsumer = msg._deliveredToConsumer;
+        _transientMessageData = msg._transientMessageData;
     }
 
     public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
@@ -228,9 +225,9 @@
 
     public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
-        if (_contentHeaderBody != null)
+        if (_transientMessageData != null)
         {
-            return _contentHeaderBody;
+            return _transientMessageData.getContentHeaderBody();
         }
         else
         {
@@ -241,7 +238,7 @@
     public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
-        _contentHeaderBody = contentHeaderBody;
+        _transientMessageData.setContentHeaderBody(contentHeaderBody);
     }
 
     public void routingComplete(MessageStore store, MessageHandleFactory 
factory) throws AMQException
@@ -255,12 +252,12 @@
 
         // enqueuing the messages ensure that if required the destinations are 
recorded to a
         // persistent store
-        for (AMQQueue q : _destinationQueues)
+        for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
             _messageHandle.enqueue(_messageId, q);
         }
 
-        if (_contentHeaderBody.bodySize == 0)
+        if (_transientMessageData.getContentHeaderBody().bodySize == 0)
         {
             deliver();
         }
@@ -268,7 +265,7 @@
 
     public boolean addContentBodyFrame(ContentBody contentBody) throws 
AMQException
     {
-        _bodyLengthReceived += contentBody.getSize();
+        _transientMessageData.addBodyLength(contentBody.getSize());
         _messageHandle.addContentBodyFrame(_messageId, contentBody);
         if (isAllContentReceived())
         {
@@ -283,7 +280,7 @@
 
     public boolean isAllContentReceived() throws AMQException
     {
-        return _bodyLengthReceived == _contentHeaderBody.bodySize;
+        return _transientMessageData.isAllContentReceived();
     }
 
     public long getMessageId()
@@ -384,7 +381,7 @@
      */
     public void enqueue(AMQQueue queue) throws AMQException
     {
-        _destinationQueues.add(queue);
+        _transientMessageData.addDestinationQueue(queue);
     }
 
     public void dequeue(AMQQueue queue) throws AMQException
@@ -394,11 +391,9 @@
 
     public boolean isPersistent() throws AMQException
     {
-        if (_contentHeaderBody != null)
+        if (_transientMessageData != null)
         {
-            //todo remove literal values to a constant file such as 
AMQConstants in common
-            return _contentHeaderBody.properties instanceof 
BasicContentHeaderProperties &&
-                 ((BasicContentHeaderProperties) 
_contentHeaderBody.properties).getDeliveryMode() == 2;
+            return _transientMessageData.isPersistent();
         }
         else
         {
@@ -425,9 +420,9 @@
     public BasicPublishBody getPublishBody() throws AMQException
     {
         BasicPublishBody pb;
-        if (_publishBody != null)
+        if (_transientMessageData != null)
         {
-            pb = _publishBody;
+            pb = _transientMessageData.getPublishBody();
         }
         else
         {
@@ -452,26 +447,30 @@
 
     private void deliver() throws AMQException
     {
-        // first we allow the handle to know that the message has been fully 
received. This is useful if it is
-        // maintaining any calculated values based on content chunks
+        // we get a reference to the destination queues now so that we can 
clear the
+        // transient message data as quickly as possible
+        List<AMQQueue> destinationQueues = 
_transientMessageData.getDestinationQueues();
         try
         {
-            _messageHandle.setPublishAndContentHeaderBody(_messageId, 
_publishBody, _contentHeaderBody);
-            _publishBody = null;
-            _contentHeaderBody = null;
+            // first we allow the handle to know that the message has been 
fully received. This is useful if it is
+            // maintaining any calculated values based on content chunks
+            _messageHandle.setPublishAndContentHeaderBody(_messageId, 
_transientMessageData.getPublishBody(),
+                                                          
_transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with 
the message content
             // now that it has all been received, before we attempt delivery
             _txnContext.messageFullyReceived(isPersistent());
-            for (AMQQueue q : _destinationQueues)
+
+            _transientMessageData = null;
+
+            for (AMQQueue q : destinationQueues)
             {
                 _txnContext.deliver(this, q);
             }
         }
         finally
         {
-            _destinationQueues.clear();
-            _destinationQueues = null;
+            destinationQueues.clear();
             decrementReference();
         }
     }

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=auto&rev=483058
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
 Wed Dec  6 04:41:25 2006
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Contains data that is only used in AMQMessage transiently, e.g. while the 
content
+ * body fragments are arriving.
+ *
+ * Having this data stored in a separate class means that the AMQMessage class 
avoids
+ * the small overhead of numerous guaranteed-null references after delivery.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+    /**
+     * Stored temporarily until the header has been received at which point it 
is used when
+     * constructing the handle
+     */
+    private BasicPublishBody _publishBody;
+
+    /**
+     * Content header; also stored temporarily, until it is passed to the message handle.
+     */
+    private ContentHeaderBody _contentHeaderBody;
+
+    /**
+     * Keeps track of how many bytes we have received in body frames so far
+     */
+    private long _bodyLengthReceived = 0;
+
+    /**
+     * This is stored during routing, to know the queues to which this message 
should immediately be
+     * delivered. It is <b>cleared after delivery has been attempted</b>. Any 
persistent record of destinations is done
+     * by the message handle.
+     */
+    private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public void setPublishBody(BasicPublishBody publishBody)
+    {
+        _publishBody = publishBody;
+    }
+
+    public List<AMQQueue> getDestinationQueues()
+    {
+        return _destinationQueues;
+    }
+
+    public void setDestinationQueues(List<AMQQueue> destinationQueues)
+    {
+        _destinationQueues = destinationQueues;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+    {
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    public long getBodyLengthReceived()
+    {
+        return _bodyLengthReceived;
+    }
+
+    public void addBodyLength(int value)
+    {
+        _bodyLengthReceived += value;
+    }
+
+    public boolean isAllContentReceived() throws AMQException
+    {
+        return _bodyLengthReceived == _contentHeaderBody.bodySize;
+    }
+
+    public void addDestinationQueue(AMQQueue queue)
+    {
+        _destinationQueues.add(queue);
+    }
+
+    public boolean isPersistent()
+    {
+        //todo remove literal values to a constant file such as AMQConstants 
in common
+        return _contentHeaderBody.properties instanceof 
BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) 
_contentHeaderBody.properties).getDeliveryMode() == 2;
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Wed Dec  6 04:41:25 2006
@@ -41,7 +41,7 @@
 
     public ContentHeaderBody getContentHeaderBody(long messageId) throws 
AMQException
     {
-        ContentHeaderBody chb = _contentHeaderBody.get();
+        ContentHeaderBody chb = (_contentHeaderBody != 
null?_contentHeaderBody.get():null);
         if (chb == null)
         {
             MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
@@ -87,7 +87,7 @@
 
     public BasicPublishBody getPublishBody(long messageId) throws AMQException
     {
-        BasicPublishBody bpb = _publishBody.get();
+        BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
         if (bpb == null)
         {
             MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);

Modified: 
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java?view=diff&rev=483058&r1=483057&r2=483058
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
 Wed Dec  6 04:41:25 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,6 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.failover.FailoverException;
 
 import javax.jms.JMSException;
 
@@ -56,7 +55,7 @@
                 }
                 catch (FailoverException e)
                 {
-                    _log.info("Failover exception caught during operation");
+                    _log.info("Failover exception caught during operation: " + 
e, e);
                 }
             }
         }


Reply via email to