Author: rgreig
Date: Wed Nov 22 01:37:02 2006
New Revision: 478100
URL: http://svn.apache.org/viewvc?view=rev&rev=478100
Log:
QPID-32 Work-in-progress commit.
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
(with props)
Modified:
incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml
incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
Modified: incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml Wed Nov
22 01:37:02 2006
@@ -34,9 +34,9 @@
</layout>
</appender>
- <!--<category name="org.apache.qpid.server.store">
+ <category name="org.apache.qpid.server.queue">
<priority value="debug"/>
- </category>-->
+ </category>
<root>
<priority value="info"/>
Modified:
incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
(original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
Wed Nov 22 01:37:02 2006
@@ -21,5 +21,6 @@
<path>/development</path>
<bind>direct://amq.direct//queue</bind>
<bind>direct://amq.direct//ping</bind>
+ <bind>direct://amq.direct//serviceQ</bind>
</virtualhost>
</virtualhosts>
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
Wed Nov 22 01:37:02 2006
@@ -115,7 +115,7 @@
*/
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(new
TxnBuffer(_messageStore), _returnMessages);
+ _txnContext = new LocalTransactionalContext(_messageStore, new
TxnBuffer(_messageStore), _returnMessages);
}
public int getChannelId()
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
Wed Nov 22 01:37:02 2006
@@ -212,7 +212,7 @@
for (AMQQueue q : queues)
{
- payload.registerQueue(q);
+ payload.enqueue(q);
}
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
Wed Nov 22 01:37:02 2006
@@ -183,8 +183,8 @@
{
// TODO: modify code generator to add clone() method then clone
the deliver body
// without this addition we have a race condition - we will be
modifying the body
- // before the encoder has encoded the body for delivery
- payload.registerQueue(q);
+ // before the encoder has encoded the body for delivery
+ payload.enqueue(q);
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
Wed Nov 22 01:37:02 2006
@@ -185,7 +185,7 @@
_logger.debug("Exchange " + getName() + ": delivering
message with headers " +
headers + " to " + e.queue.getName());
}
- payload.registerQueue(e.queue);
+ payload.enqueue(e.queue);
routed = true;
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Wed Nov 22 01:37:02 2006
@@ -65,7 +65,7 @@
{
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() :
queueRegistry.getQueue(body.queue);
- if(queue == null)
+ if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
}
@@ -80,7 +80,7 @@
//now allow queue to start async processing of any backlog of
messages
queue.deliverAsync();
}
- catch(ConsumerTagNotUniqueException e)
+ catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag +
"'";
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID,
BasicConsumeBody.METHOD_ID));
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
Wed Nov 22 01:37:02 2006
@@ -34,7 +34,7 @@
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
-
+
/**
* Used in clustering
*/
@@ -46,7 +46,7 @@
*/
private AMQProtocolSession _publisher;
- private long _messageId;
+ private final long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -58,6 +58,9 @@
*/
private BasicPublishBody _publishBody;
+ /**
+ * Also stored temporarily.
+ */
private ContentHeaderBody _contentHeaderBody;
/**
@@ -74,6 +77,11 @@
*/
private boolean _deliveredToConsumer;
+ /**
+ * This is stored during routing, to know the queues to which this message
should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any
persistent record of destinations is done
+ * by the message handle.
+ */
private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
/**
@@ -100,7 +108,7 @@
{
try
{
- ContentBody cb = _messageHandle.getContentBody(++_index);
+ ContentBody cb = _messageHandle.getContentBody(_messageId,
++_index);
return ContentBody.createAMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -131,7 +139,7 @@
{
try
{
- return _messageHandle.getContentBody(++_index);
+ return _messageHandle.getContentBody(_messageId, ++_index);
}
catch (AMQException e)
{
@@ -150,6 +158,10 @@
_messageId = messageId;
_txnContext = txnContext;
_publishBody = publishBody;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message created with id " + messageId);
+ }
}
protected AMQMessage(AMQMessage msg) throws AMQException
@@ -161,14 +173,6 @@
_deliveredToConsumer = msg._deliveredToConsumer;
}
- public void storeMessage() throws AMQException
- {
- /*if (isPersistent())
- {
- _store.put(this);
- } */
- }
-
public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
{
return new BodyFrameIterator(channel);
@@ -179,49 +183,9 @@
return new BodyContentIterator();
}
- /*public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody,
int channel)
- {
- AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
-
- allFrames[0] = ContentHeaderBody.createAMQFrame(channel,
_contentHeaderBody);
- for (int i = 1; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel,
_contentBodies.get(i - 1));
- }
- return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
- }
-
- public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag,
long deliveryTag)
- {
- AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag,
deliveryTag, _redelivered,
- getExchangeName(),
getRoutingKey());
- allFrames[1] = ContentHeaderBody.createAMQFrame(channel,
_contentHeaderBody);
- for (int i = 2; i < allFrames.length; i++)
- {
- allFrames[i] = ContentBody.createAMQFrame(channel,
_contentBodies.get(i - 2));
- }
- return new CompositeAMQDataBlock(allFrames);
- }
-
- public List<AMQBody> getPayload()
- {
- List<AMQBody> payload = new ArrayList<AMQBody>(2 +
_contentBodies.size());
- payload.add(_publishBody);
- payload.add(_contentHeaderBody);
- payload.addAll(_contentBodies);
- return payload;
- }
-
- public BasicPublishBody getPublishBody()
- {
- return _publishBody;
- } */
-
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _messageHandle.getContentHeaderBody();
+ return _messageHandle.getContentHeaderBody(_messageId);
}
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
@@ -233,13 +197,19 @@
public void routingComplete(MessageStore store, MessageHandleFactory
factory) throws AMQException
{
final boolean persistent = isPersistent();
- _messageId = store.getNewMessageId();
_messageHandle = factory.createMessageHandle(_messageId, store,
persistent);
if (persistent)
{
_txnContext.beginTranIfNecessary();
}
+ // enqueuing the message ensures that, if required, the destinations are
recorded to a
+ // persistent store
+ for (AMQQueue q : _destinationQueues)
+ {
+ _messageHandle.enqueue(_messageId, q);
+ }
+
if (_contentHeaderBody.bodySize == 0)
{
deliver();
@@ -249,7 +219,7 @@
public boolean addContentBodyFrame(ContentBody contentBody) throws
AMQException
{
_bodyLengthReceived += contentBody.getSize();
- _messageHandle.addContentBodyFrame(contentBody);
+ _messageHandle.addContentBodyFrame(_messageId, contentBody);
if (isAllContentReceived())
{
deliver();
@@ -304,7 +274,7 @@
{
_log.debug("Ref count on message " + _messageId + " is
zero; removing message");
}
- _messageHandle.removeMessage();
+ _messageHandle.removeMessage(_messageId);
}
catch (AMQException e)
{
@@ -313,6 +283,17 @@
throw new MessageCleanupException(_messageId, e);
}
}
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ref count is now " + _referenceCount + " for
message id " + _messageId);
+ if (_referenceCount.get() < 0)
+ {
+ Thread.dumpStack();
+ }
+ }
+ }
}
public void setPublisher(AMQProtocolSession publisher)
@@ -338,25 +319,22 @@
}
}
+ /**
+ * Registers a queue to which this message is to be delivered. This is
+ * called from the exchange when it is routing the message. This will be
called before any content bodies have
+ * been received so that the choice of AMQMessageHandle implementation can
be picked based on various criteria.
+ *
+ * @param queue the queue
+ * @throws org.apache.qpid.AMQException if there is an error enqueuing the
message
+ */
public void enqueue(AMQQueue queue) throws AMQException
{
- //if the message is not persistent or the queue is not durable
- //we will not need to recover the association and so do not
- //need to record it
- /*if (isPersistent() && queue.isDurable())
- {
- _store.enqueueMessage(queue.getName(), _messageId);
- } */
+ _destinationQueues.add(queue);
}
public void dequeue(AMQQueue queue) throws AMQException
{
- //only record associations where both queue and message will survive
- //a restart, so only need to remove association if this is the case
- /*if (isPersistent() && queue.isDurable())
- {
- _store.dequeueMessage(queue.getName(), _messageId);
- } */
+ _messageHandle.dequeue(_messageId, queue);
}
public boolean isPersistent() throws AMQException
@@ -369,7 +347,7 @@
}
else
{
- return _messageHandle.isPersistent();
+ return _messageHandle.isPersistent(_messageId);
}
}
@@ -398,7 +376,7 @@
}
else
{
- pb = _messageHandle.getPublishBody();
+ pb = _messageHandle.getPublishBody(_messageId);
}
return pb;
}
@@ -412,32 +390,34 @@
_deliveredToConsumer = true;
}
- /**
- * Registers a queue to which this message is to be delivered. This is
- * called from the exchange when it is routing the message. This will be
called before any content bodies have
- * been received so that the choice of AMQMessageHandle implementation can
be picked based on various criteria.
- *
- * @param queue the queue
- */
- public void registerQueue(AMQQueue queue)
+ /*public void registerQueue(AMQQueue queue)
{
_destinationQueues.add(queue);
- }
+ } */
private void deliver() throws AMQException
{
// first we allow the handle to know that the message has been fully
received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_publishBody,
_contentHeaderBody);
- _publishBody = null;
- _contentHeaderBody = null;
-
- // we then allow the transactional context to do something with the
message content
- // now that it has all been received, before we attempt delivery
- _txnContext.messageFullyReceived(isPersistent());
- for (AMQQueue q : _destinationQueues)
+ try
+ {
+ _messageHandle.setPublishAndContentHeaderBody(_messageId,
_publishBody, _contentHeaderBody);
+ _publishBody = null;
+ _contentHeaderBody = null;
+
+ // we then allow the transactional context to do something with
the message content
+ // now that it has all been received, before we attempt delivery
+ _txnContext.messageFullyReceived(isPersistent());
+ for (AMQQueue q : _destinationQueues)
+ {
+ _txnContext.deliver(this, q);
+ }
+ }
+ finally
{
- _txnContext.deliver(this, q);
+ _destinationQueues.clear();
+ _destinationQueues = null;
+ decrementReference();
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
Wed Nov 22 01:37:02 2006
@@ -9,16 +9,20 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicPublishBody;
/**
- * @author Robert Greig ([EMAIL PROTECTED])
+ * A pluggable way of getting message data. Implementations can provide
intelligent caching for example or
+ * even no caching at all to minimise the broker memory footprint.
+ *
+ * The methods all take a messageId to avoid having to store it in the instance
- the AMQMessage container
+ * must already keep the messageId so it is pointless storing it twice.
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody() throws AMQException;
+ ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException;
/**
* @return the number of body frames associated with this message
@@ -28,7 +32,7 @@
/**
* @return the size of the body
*/
- long getBodySize() throws AMQException;
+ long getBodySize(long messageId) throws AMQException;
/**
* Get a particular content body
@@ -36,19 +40,23 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(int index) throws IllegalArgumentException,
AMQException;
+ ContentBody getContentBody(long messageId, int index) throws
IllegalArgumentException, AMQException;
- void addContentBodyFrame(ContentBody contentBody) throws AMQException;
+ void addContentBodyFrame(long messageId, ContentBody contentBody) throws
AMQException;
- BasicPublishBody getPublishBody() throws AMQException;
+ BasicPublishBody getPublishBody(long messageId) throws AMQException;
boolean isRedelivered();
- boolean isPersistent() throws AMQException;
+ boolean isPersistent(long messageId) throws AMQException;
- void setPublishAndContentHeaderBody(BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
+ void setPublishAndContentHeaderBody(long messageId, BasicPublishBody
publishBody,
+ ContentHeaderBody contentHeaderBody)
throws AMQException;
- void removeMessage() throws AMQException;
+ void removeMessage(long messageId) throws AMQException;
+
+ void enqueue(long messageId, AMQQueue queue) throws AMQException;
+ void dequeue(long messageId, AMQQueue queue) throws AMQException;
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
Wed Nov 22 01:37:02 2006
@@ -698,7 +698,7 @@
msg.dequeue(this);
msg.decrementReference();
}
- catch(MessageCleanupException e)
+ catch (MessageCleanupException e)
{
//Message was dequeued, but could notthen be deleted
//though it is no longer referenced. This should be very
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=auto&rev=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
Wed Nov 22 01:37:02 2006
@@ -0,0 +1,110 @@
+/**
+ * User: Robert Greig
+ * Date: 21-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ */
+public class InMemoryMessageHandle implements AMQMessageHandle
+{
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private BasicPublishBody _publishBody;
+
+ private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+
+ private boolean _redelivered;
+
+ public InMemoryMessageHandle()
+ {
+ }
+
+ public ContentHeaderBody getContentHeaderBody(long messageId) throws
AMQException
+ {
+ return _contentHeaderBody;
+ }
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public long getBodySize(long messageId) throws AMQException
+ {
+ return getContentHeaderBody(messageId).bodySize;
+ }
+
+ public ContentBody getContentBody(long messageId, int index) throws
AMQException, IllegalArgumentException
+ {
+ if (index > _contentBodies.size() - 1)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of
valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ return _contentBodies.get(index);
+ }
+
+ public void addContentBodyFrame(long messageId, ContentBody contentBody)
throws AMQException
+ {
+ _contentBodies.add(contentBody);
+ }
+
+ public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ {
+ return _publishBody;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public boolean isPersistent(long messageId) throws AMQException
+ {
+ //todo remove literal values to a constant file such as AMQConstants
in common
+ ContentHeaderBody chb = getContentHeaderBody(messageId);
+ return chb.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties)
chb.properties).getDeliveryMode() == 2;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param publishBody
+ * @param contentHeaderBody
+ * @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(long messageId,
BasicPublishBody publishBody,
+ ContentHeaderBody
contentHeaderBody)
+ throws AMQException
+ {
+ _publishBody = publishBody;
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public void removeMessage(long messageId) throws AMQException
+ {
+ }
+
+ public void enqueue(long messageId, AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void dequeue(long messageId, AMQQueue queue) throws AMQException
+ {
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
Wed Nov 22 01:37:02 2006
@@ -22,6 +22,13 @@
public AMQMessageHandle createMessageHandle(long messageId, MessageStore
store, boolean persistent)
{
// just hardcoded for now
- return new WeakReferenceMessageHandle(store, messageId);
+ if (persistent)
+ {
+ return new WeakReferenceMessageHandle(store);
+ }
+ else
+ {
+ return new InMemoryMessageHandle();
+ }
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Wed Nov 22 01:37:02 2006
@@ -9,15 +9,15 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.server.store.MessageStore;
-import java.util.List;
-import java.util.LinkedList;
import java.lang.ref.WeakReference;
+import java.util.LinkedList;
+import java.util.List;
/**
* @author Robert Greig ([EMAIL PROTECTED])
@@ -34,20 +34,17 @@
private final MessageStore _messageStore;
- private final long _messageId;
-
- public WeakReferenceMessageHandle(MessageStore messageStore, long
messageId)
+ public WeakReferenceMessageHandle(MessageStore messageStore)
{
_messageStore = messageStore;
- _messageId = messageId;
}
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public ContentHeaderBody getContentHeaderBody(long messageId) throws
AMQException
{
ContentHeaderBody chb = _contentHeaderBody.get();
if (chb == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
chb = mmd.getContentHeaderBody();
_contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
_publishBody = new
WeakReference<BasicPublishBody>(mmd.getPublishBody());
@@ -60,12 +57,12 @@
return _contentBodies.size();
}
- public long getBodySize() throws AMQException
+ public long getBodySize(long messageId) throws AMQException
{
- return getContentHeaderBody().bodySize;
+ return getContentHeaderBody(messageId).bodySize;
}
- public ContentBody getContentBody(int index) throws AMQException,
IllegalArgumentException
+ public ContentBody getContentBody(long messageId, int index) throws
AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -76,24 +73,24 @@
ContentBody cb = wr.get();
if (cb == null)
{
- cb = _messageStore.getContentBodyChunk(_messageId, index);
+ cb = _messageStore.getContentBodyChunk(messageId, index);
_contentBodies.set(index, new WeakReference<ContentBody>(cb));
}
return cb;
}
- public void addContentBodyFrame(ContentBody contentBody) throws
AMQException
+ public void addContentBodyFrame(long messageId, ContentBody contentBody)
throws AMQException
{
_contentBodies.add(new WeakReference<ContentBody>(contentBody));
- _messageStore.storeContentBodyChunk(_messageId, _contentBodies.size()
- 1, contentBody);
+ _messageStore.storeContentBodyChunk(messageId, _contentBodies.size() -
1, contentBody);
}
- public BasicPublishBody getPublishBody() throws AMQException
+ public BasicPublishBody getPublishBody(long messageId) throws AMQException
{
BasicPublishBody bpb = _publishBody.get();
if (bpb == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
bpb = mmd.getPublishBody();
_publishBody = new WeakReference<BasicPublishBody>(bpb);
_contentHeaderBody = new
WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
@@ -106,10 +103,10 @@
return _redelivered;
}
- public boolean isPersistent() throws AMQException
+ public boolean isPersistent(long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants
in common
- ContentHeaderBody chb = getContentHeaderBody();
+ ContentHeaderBody chb = getContentHeaderBody(messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties)
chb.properties).getDeliveryMode() == 2;
}
@@ -120,17 +117,28 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
+ public void setPublishAndContentHeaderBody(long messageId,
BasicPublishBody publishBody,
+ ContentHeaderBody
contentHeaderBody)
throws AMQException
{
- _messageStore.storeMessageMetaData(_messageId, new
MessageMetaData(publishBody, contentHeaderBody,
+ _messageStore.storeMessageMetaData(messageId, new
MessageMetaData(publishBody, contentHeaderBody,
_contentBodies.size()));
_publishBody = new WeakReference<BasicPublishBody>(publishBody);
_contentHeaderBody = new
WeakReference<ContentHeaderBody>(contentHeaderBody);
}
- public void removeMessage() throws AMQException
+ public void removeMessage(long messageId) throws AMQException
+ {
+ _messageStore.removeMessage(messageId);
+ }
+
+ public void enqueue(long messageId, AMQQueue queue) throws AMQException
+ {
+ _messageStore.enqueueMessage(queue.getName(), messageId);
+ }
+
+ public void dequeue(long messageId, AMQQueue queue) throws AMQException
{
- _messageStore.removeMessage(_messageId);
+ _messageStore.dequeueMessage(queue.getName(), messageId);
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
Wed Nov 22 01:37:02 2006
@@ -28,12 +28,11 @@
{
_msg = msg;
_queue = queue;
+ _msg.incrementReference();
}
public void prepare() throws AMQException
- {
- //do the persistent part of the record()
- _msg.enqueue(_queue);
+ {
}
public void undoPrepare()
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
Wed Nov 22 01:37:02 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.MessageStore;
import java.util.List;
@@ -41,10 +42,17 @@
private List<RequiredDeliveryException> _returnMessages;
- public LocalTransactionalContext(TxnBuffer txnBuffer,
List<RequiredDeliveryException> returnMessages)
+ private final MessageStore _messageStore;
+
+ private boolean _inTran = false;
+
+ public LocalTransactionalContext(MessageStore messageStore,
+ TxnBuffer txnBuffer,
List<RequiredDeliveryException> returnMessages)
{
+ _messageStore = messageStore;
_txnBuffer = txnBuffer;
_returnMessages = returnMessages;
+ _txnBuffer.enlist(new StoreMessageOperation(messageStore));
}
public void rollback() throws AMQException
@@ -66,7 +74,7 @@
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
- _txnBuffer.enlist(new StoreMessageOperation(message));
+
_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
_txnBuffer.enlist(new CleanupMessageOperation(message,
_returnMessages));
}
@@ -110,12 +118,16 @@
public void messageFullyReceived(boolean persistent) throws AMQException
{
- //To change body of implemented methods use File | Settings | File
Templates.
+ // Not required in this transactional context
}
public void beginTranIfNecessary() throws AMQException
{
- //To change body of implemented methods use File | Settings | File
Templates.
+ if (!_inTran)
+ {
+ _messageStore.beginTran();
+ _inTran = true;
+ }
}
public void commit() throws AMQException
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
Wed Nov 22 01:37:02 2006
@@ -21,12 +21,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
import java.util.LinkedList;
import java.util.List;
@@ -60,7 +60,7 @@
{
_channel = channel;
_returnMessages = returnMessages;
- _messageStore = messageStore;
+ _messageStore = messageStore;
}
public void beginTranIfNecessary() throws AMQException
@@ -86,6 +86,7 @@
{
try
{
+ message.incrementReference();
queue.process(message);
//following check implements the functionality
//required by the 'immediate' flag:
@@ -95,10 +96,6 @@
{
_returnMessages.add(e);
}
- finally
- {
- message.decrementReference();
- }
}
public void acknowledgeMessage(final long deliveryTag, long
lastDeliveryTag,
@@ -162,7 +159,7 @@
}
}
}
-
+
public void messageFullyReceived(boolean persistent) throws AMQException
{
if (persistent)
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
Wed Nov 22 01:37:02 2006
@@ -8,40 +8,38 @@
******************************************************************************/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
/**
- * @author Robert Greig ([EMAIL PROTECTED])
+ * A transactional operation to store messages in an underlying persistent
store. When this operation
+ * commits it will do everything to ensure that all messages are safely
committed to persistent
+ * storage.
*/
public class StoreMessageOperation implements TxnOp
{
- //just use this to do a store of the message during the
- //prepare phase. Any enqueueing etc is done by TxnOps enlisted
- //by the queues themselves.
- private final AMQMessage _msg;
+ private final MessageStore _messsageStore;
- public StoreMessageOperation(AMQMessage msg)
+ public StoreMessageOperation(MessageStore messageStore)
{
- _msg = msg;
+ _messsageStore = messageStore;
}
public void prepare() throws AMQException
{
- _msg.storeMessage();
- // the router's reference can now be released
- _msg.decrementReference();
}
public void undoPrepare()
{
}
- public void commit()
+ public void commit() throws AMQException
{
+ _messsageStore.commitTran();
}
- public void rollback()
+ public void rollback() throws AMQException
{
+ _messsageStore.abortTran();
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
Wed Nov 22 01:37:02 2006
@@ -50,11 +50,9 @@
if (_containsPersistentChanges)
{
_log.debug("Begin Transaction.");
- _store.beginTran();
if (prepare())
{
_log.debug("Transaction Succeeded");
- _store.commitTran();
for (TxnOp op : _ops)
{
op.commit();
@@ -62,8 +60,7 @@
}
else
{
- _log.debug("Transaction Failed");
- _store.abortTran();
+ _log.debug("Transaction Failed");
}
}
else
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
Wed Nov 22 01:37:02 2006
@@ -26,14 +26,14 @@
public interface TxnOp
{
/**
- * Do the part of the operation that updates persistent state
+ * Do the part of the operation that updates persistent state
*/
public void prepare() throws AMQException;
/**
* Complete the operation started by prepare. Can now update in
* memory state or make network transfers.
*/
- public void commit();
+ public void commit() throws AMQException;
/**
* This is not the same as rollback. Unfortunately the use of an
* in memory reference count as a locking mechanism and a test for
@@ -47,5 +47,5 @@
/**
* Rolls back the operation.
*/
- public void rollback();
+ public void rollback() throws AMQException;
}
Modified:
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
---
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
(original)
+++
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
Wed Nov 22 01:37:02 2006
@@ -172,7 +172,8 @@
try
{
createConnection(brokerHosts, clientID, username, password, vpath);
- _session = (Session) _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ //_session = (Session) _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ _session = (Session) _connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
_connection.setExceptionListener(this);
@@ -192,6 +193,7 @@
TextMessage first = _session.createTextMessage(MESSAGE_DATA);
first.setJMSReplyTo(_tempDestination);
_producer.send(first);
+ _session.commit(); // TODO REMOVE
try
{
Thread.sleep(1000);
@@ -231,6 +233,7 @@
}
_producer.send(msg);
}
+ _session.commit(); // TODO REMOVE
_log.info("Finished sending " + _messageCount + " messages");
}