Author: rgreig
Date: Sun Nov 19 12:31:57 2006
New Revision: 476911
URL: http://svn.apache.org/viewvc?view=rev&rev=476911
Log:
QPID-32: sync of changes.
Modified:
incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Modified: incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/config.xml?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/config.xml
(original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/config.xml Sun Nov
19 12:31:57 2006
@@ -82,7 +82,8 @@
<auto_register>true</auto_register>
</queue>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
</store>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
Sun Nov 19 12:31:57 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.log4j.Logger;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,6 +33,8 @@
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
/**
* Used in clustering
*/
@@ -260,7 +263,7 @@
public boolean isAllContentReceived() throws AMQException
{
- return _bodyLengthReceived == _messageHandle.getBodySize();
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
public long getMessageId()
@@ -274,6 +277,10 @@
public void incrementReference()
{
_referenceCount.incrementAndGet();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " incremented to
" + _referenceCount);
+ }
}
/**
@@ -289,11 +296,15 @@
// have to be atomic since the ref count starts at 1 and the exchange
itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the
delivery manager decrements.
- /*if (_referenceCount.decrementAndGet() == 0)
+ if (_referenceCount.decrementAndGet() == 0)
{
try
{
- _store.removeMessage(_messageId);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count on message " + _messageId + " is
zero; removing message");
+ }
+ _messageHandle.removeMessage();
}
catch (AMQException e)
{
@@ -301,7 +312,7 @@
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
- } */
+ }
}
public void setPublisher(AMQProtocolSession publisher)
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
Sun Nov 19 12:31:57 2006
@@ -47,6 +47,8 @@
boolean isPersistent() throws AMQException;
void setPublishAndContentHeaderBody(BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
- throws AMQException;
+ throws AMQException;
+
+ void removeMessage() throws AMQException;
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=476911&r1=476910&r2=476911
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Sun Nov 19 12:31:57 2006
@@ -128,4 +128,9 @@
_publishBody = new WeakReference<BasicPublishBody>(publishBody);
_contentHeaderBody = new
WeakReference<ContentHeaderBody>(contentHeaderBody);
}
+
+ public void removeMessage() throws AMQException
+ {
+ _messageStore.removeMessage(_messageId);
+ }
}