Author: rgreig
Date: Sun Nov 19 12:31:57 2006
New Revision: 476911

URL: http://svn.apache.org/viewvc?view=rev&rev=476911
Log:
QPID-32: sync of changes.

Modified:
    incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java

Modified: incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/config.xml?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/config.xml 
(original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/config.xml Sun Nov 
19 12:31:57 2006
@@ -82,7 +82,8 @@
         <auto_register>true</auto_register>
     </queue>
     <store>
-        <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+        <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+        <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
     </store>
     <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
 </broker>

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 Sun Nov 19 12:31:57 2006
@@ -23,6 +23,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.log4j.Logger;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +33,8 @@
  */
 public class AMQMessage
 {
+    private static final Logger _log = Logger.getLogger(AMQMessage.class);
+    
     /**
      * Used in clustering
      */
@@ -260,7 +263,7 @@
 
     public boolean isAllContentReceived() throws AMQException
     {
-        return _bodyLengthReceived == _messageHandle.getBodySize();
+        return _bodyLengthReceived == _contentHeaderBody.bodySize;
     }
 
     public long getMessageId()
@@ -274,6 +277,10 @@
     public void incrementReference()
     {
         _referenceCount.incrementAndGet();
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Ref count on message " + _messageId + " incremented to 
" + _referenceCount);
+        }
     }
 
     /**
@@ -289,11 +296,15 @@
         // have to be atomic since the ref count starts at 1 and the exchange 
itself decrements that after
         // the message has been passed to all queues. i.e. we are
         // not relying on the all the increments having taken place before the 
delivery manager decrements.
-        /*if (_referenceCount.decrementAndGet() == 0)
+        if (_referenceCount.decrementAndGet() == 0)
         {
             try
             {
-                _store.removeMessage(_messageId);
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Ref count on message " + _messageId + " is 
zero; removing message");
+                }
+                _messageHandle.removeMessage();
             }
             catch (AMQException e)
             {
@@ -301,7 +312,7 @@
                 incrementReference();
                 throw new MessageCleanupException(_messageId, e);
             }
-        } */
+        }
     }
 
     public void setPublisher(AMQProtocolSession publisher)

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
 Sun Nov 19 12:31:57 2006
@@ -47,6 +47,8 @@
     boolean isPersistent() throws AMQException;
 
     void setPublishAndContentHeaderBody(BasicPublishBody publishBody, 
ContentHeaderBody contentHeaderBody)
-            throws AMQException;    
+            throws AMQException;
+
+    void removeMessage() throws AMQException;
 
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Sun Nov 19 12:31:57 2006
@@ -128,4 +128,9 @@
         _publishBody = new WeakReference<BasicPublishBody>(publishBody);
         _contentHeaderBody = new 
WeakReference<ContentHeaderBody>(contentHeaderBody);
     }
+
+    public void removeMessage() throws AMQException
+    {
+        _messageStore.removeMessage(_messageId);
+    }
 }


Reply via email to