Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,30 @@
+/**
+ * User: Robert Greig
+ * Date: 31-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+
+/**
+ * Constructs a message handle based on the publish body, the content header
and the queue to which the message
+ * has been routed.
+ *
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class MessageHandleFactory
+{
+
+ public AMQMessageHandle createMessageHandle(MessageStore store,
BasicPublishBody publish,
+ ContentHeaderBody
contentHeader)
+ {
+ // just hardcoded for now
+ return new WeakReferenceMessageHandle(store, contentHeader);
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
Fri Nov 3 09:17:11 2006
@@ -17,13 +17,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
/**
* Signals that no consumers exist for a message at a given point in time.
@@ -32,19 +27,9 @@
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super("Immediate delivery to " + queue + " is not possible.",
publishBody, contentHeaderBody, contentBodies);
- }
-
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody,
contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
Fri Nov 3 09:17:11 2006
@@ -25,5 +25,5 @@
boolean isSuspended();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue) throws AMQException;
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
Fri Nov 3 09:17:11 2006
@@ -18,11 +18,7 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -127,8 +123,8 @@
if (msg != null)
{
// if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
+ // we can decrement the reference count immediately.
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -148,10 +144,7 @@
channel.addUnacknowledgedMessage(msg, deliveryTag,
consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag,
msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver,
channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(),
deliveryTag, consumerTag);
}
}
else
@@ -170,19 +163,8 @@
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
channel.queueDeleted(queue);
- }
-
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String
routingKey, String exchange)
- {
- AMQFrame deliverFrame =
BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
- deliveryTag,
false, exchange,
- routingKey);
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); //
XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
Fri Nov 3 09:17:11 2006
@@ -18,6 +18,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -166,7 +168,7 @@
* channel, which in turn can update its list of unacknowledged messages.
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,99 @@
+/**
+ * User: Robert Greig
+ * Date: 23-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+ private ContentHeaderBody _contentHeaderBody;
+
+ private List<ContentBody> _contentBodies;
+
+ private boolean _redelivered;
+
+ private final MessageStore _messageStore;
+
+ public WeakReferenceMessageHandle(MessageStore messageStore,
ContentHeaderBody contentHeader)
+ {
+ _messageStore = messageStore;
+ _contentHeaderBody = contentHeader;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
throws AMQException
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public int getBodyCount()
+ {
+ return _contentBodies.size();
+ }
+
+ public long getBodySize()
+ {
+ return 0; //To change body of implemented methods use File | Settings
| File Templates.
+ }
+
+ public ContentBody getContentBody(int index) throws
IllegalArgumentException
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public void addContentBodyFrame(ContentBody contentBody) throws
AMQException
+ {
+ _contentBodies.add(contentBody);
+ }
+
+ public String getExchangeName()
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public String getRoutingKey()
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public boolean isImmediate()
+ {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public boolean isPersistent() throws AMQException
+ {
+ if (_contentHeaderBody == null)
+ {
+ throw new AMQException("Cannot determine delivery mode of message.
Content header not found.");
+ }
+
+ //todo remove literal values to a constant file such as AMQConstants
in common
+ return _contentHeaderBody.properties instanceof
BasicContentHeaderProperties
+ && ((BasicContentHeaderProperties)
_contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+ private static final Logger _log =
Logger.getLogger(CleanupMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final List<RequiredDeliveryException> _returns;
+
+ public CleanupMessageOperation(AMQMessage msg,
List<RequiredDeliveryException> returns)
+ {
+ _msg = msg;
+ _returns = returns;
+ }
+
+ public void prepare() throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ //don't need to do anything here, if the store's txn failed
+ //when processing prepare then the message was not stored
+ //or enqueued on any queues and can be discarded
+ }
+
+ public void commit()
+ {
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused
message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e);
+ }
+ }
+
+ public void rollback()
+ {
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,63 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+ private static final Logger _logger =
Logger.getLogger(DeliverMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final AMQQueue _queue;
+
+ public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+ {
+ _msg = msg;
+ _queue = queue;
+ }
+
+ public void prepare() throws AMQException
+ {
+ //do the persistent part of the record()
+ _msg.enqueue(_queue);
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
+ try
+ {
+ _queue.process(_msg);
+ }
+ catch (FailedDequeueException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback()
+ {
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,118 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+ private final TxnBuffer _txnBuffer;
+
+ /**
+ * We keep hold of the ack operation so that we can consolidate acks, i.e.
multiple acks within a txn are
+ * consolidated into a single operation
+ */
+ private TxAck _ackOp;
+
+ private List<RequiredDeliveryException> _returnMessages;
+
+ public LocalTransactionalContext(TxnBuffer txnBuffer,
List<RequiredDeliveryException> returnMessages)
+ {
+ _txnBuffer = txnBuffer;
+ _returnMessages = returnMessages;
+ }
+
+ public void rollback() throws AMQException
+ {
+ _txnBuffer.rollback();
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ // don't create a transaction unless needed
+ if (message.isPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+
+ // A publication will result in the enlisting of several
+ // TxnOps. The first is an op that will store the message.
+ // Following that (and ordering is important), an op will
+ // be added for every queue onto which the message is
+ // enqueued. Finally a cleanup op will be added to decrement
+ // the reference associated with the routing.
+ _txnBuffer.enlist(new StoreMessageOperation(message));
+ _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ _txnBuffer.enlist(new CleanupMessageOperation(message,
_returnMessages));
+ }
+
+ private void checkAck(long deliveryTag, UnacknowledgedMessageMap
unacknowledgedMessageMap) throws AMQException
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + "
not known for channel");
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag,
boolean multiple,
+ UnacknowledgedMessageMap
unacknowledgedMessageMap) throws AMQException
+ {
+ //check that the tag exists to give early failure
+ if (!multiple || deliveryTag > 0)
+ {
+ checkAck(deliveryTag, unacknowledgedMessageMap);
+ }
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if (_ackOp == null)
+ {
+ _ackOp = new TxAck(unacknowledgedMessageMap);
+ _txnBuffer.enlist(_ackOp);
+ }
+ // update the op to include this ack request
+ if (multiple && deliveryTag == 0)
+ {
+ // if have signalled to ack all, that refers only
+ // to all at this time
+ _ackOp.update(lastDeliveryTag, multiple);
+ }
+ else
+ {
+ _ackOp.update(deliveryTag, multiple);
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ if (_ackOp != null)
+ {
+ _ackOp.consolidate();
+ if (_ackOp.checkPersistent())
+ {
+ _txnBuffer.containsPersistentChanges();
+ }
+ //already enlisted, after commit will reset regardless of outcome
+ _ackOp = null;
+ }
+
+ _txnBuffer.commit();
+ //TODO: may need to return 'immediate' messages at this point
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+ private static final Logger _log =
Logger.getLogger(NonTransactionalContext.class);
+
+ /**
+ * Channel is useful for logging
+ */
+ private AMQChannel _channel;
+
+ /**
+ * Where to put undeliverable messages
+ */
+ private List<RequiredDeliveryException> _returnMessages;
+
+ public NonTransactionalContext(AMQChannel channel,
List<RequiredDeliveryException> returnMessages)
+ {
+ _channel = channel;
+ _returnMessages = returnMessages;
+ }
+
+ public void commit() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void rollback() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ try
+ {
+ queue.process(message);
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ _returnMessages.add(e);
+ }
+ finally
+ {
+ message.decrementReference();
+ }
+ }
+
+ public void acknowledgeMessage(final long deliveryTag, long
lastDeliveryTag,
+ boolean multiple, final
UnacknowledgedMessageMap unacknowledgedMessageMap)
+ throws AMQException
+ {
+ if (multiple)
+ {
+ if (deliveryTag == 0)
+ {
+
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the
delivery tag is zero,
+ // tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all
messages. Current count:" +
+ unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.visit(new
UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message)
throws AMQException
+ {
+ message.discard();
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ unacknowledgedMessageMap.clear();
+ }
+ });
+ }
+ else
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " +
deliveryTag + " not known for channel");
+ }
+
+ LinkedList<UnacknowledgedMessage> acked = new
LinkedList<UnacknowledgedMessage>();
+ unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+ for (UnacknowledgedMessage msg : acked)
+ {
+ msg.discard();
+ }
+ }
+ }
+ else
+ {
+ UnacknowledgedMessage msg;
+ msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+ if (msg == null)
+ {
+ _log.info("Single ack on delivery tag " + deliveryTag + " not
known for channel:" +
+ _channel.getChannelId());
+ throw new AMQException("Single ack on delivery tag " +
deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ }
+ msg.discard();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with
delivery tag " + deliveryTag);
+ }
+ }
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,47 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class StoreMessageOperation implements TxnOp
+{
+ //just use this to do a store of the message during the
+ //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+ //by the queues themselves.
+ private final AMQMessage _msg;
+
+ public StoreMessageOperation(AMQMessage msg)
+ {
+ _msg = msg;
+ }
+
+ public void prepare() throws AMQException
+ {
+ _msg.storeMessage();
+ // the router's reference can now be released
+ _msg.decrementReference();
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit()
+ {
+ }
+
+ public void rollback()
+ {
+ }
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java?view=auto&rev=470908
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
(added)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
Fri Nov 3 09:17:11 2006
@@ -0,0 +1,29 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public interface TransactionalContext
+{
+ void commit() throws AMQException;
+
+ void rollback() throws AMQException;
+
+ void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+ void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean
multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException;
+}
Propchange:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native