Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,30 @@
+/**
+ * User: Robert Greig
+ * Date: 31-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+
+/**
+ * Constructs a message handle based on the publish body, the content header 
and the queue to which the message
+ * has been routed.
+ *
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class MessageHandleFactory
+{
+
+    public AMQMessageHandle createMessageHandle(MessageStore store, 
BasicPublishBody publish,
+                                                ContentHeaderBody 
contentHeader)
+    {
+        // just hardcoded for now
+        return new WeakReferenceMessageHandle(store, contentHeader);
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/NoConsumersException.java
 Fri Nov  3 09:17:11 2006
@@ -17,13 +17,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
 
 /**
  * Signals that no consumers exist for a message at a given point in time.
@@ -32,19 +27,9 @@
  */
 public class NoConsumersException extends RequiredDeliveryException
 {
-    public NoConsumersException(String queue,
-                                BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
-    {
-        super("Immediate delivery to " + queue + " is not possible.", 
publishBody, contentHeaderBody, contentBodies);
-    }
-
-    public NoConsumersException(BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
+    public NoConsumersException(AMQMessage message)
     {
-        super("Immediate delivery is not possible.", publishBody, 
contentHeaderBody, contentBodies);
+        super("Immediate delivery is not possible.", message);
     }
 
     public int getReplyCode()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/Subscription.java
 Fri Nov  3 09:17:11 2006
@@ -25,5 +25,5 @@
 
     boolean isSuspended();
 
-    void queueDeleted(AMQQueue queue);
+    void queueDeleted(AMQQueue queue) throws AMQException;
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
 Fri Nov  3 09:17:11 2006
@@ -18,11 +18,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
@@ -127,8 +123,8 @@
         if (msg != null)
         {
             // if we do not need to wait for client acknowledgements
-            // we can decrement the reference count immediately. 
-            
+            // we can decrement the reference count immediately.
+
             // By doing this _before_ the send we ensure that it
             // doesn't get sent if it can't be dequeued, preventing
             // duplicate delivery on recovery.
@@ -148,10 +144,7 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, 
consumerTag, queue);
                 }
 
-                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, 
msg.getRoutingKey(), msg.getExchangeName());
-                AMQDataBlock frame = msg.getDataBlock(deliver, 
channel.getChannelId());
-
-                protocolSession.writeFrame(frame);
+                msg.writeDeliver(protocolSession, channel.getChannelId(), 
deliveryTag, consumerTag);
             }
         }
         else
@@ -170,19 +163,8 @@
      *
      * @param queue
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(AMQQueue queue) throws AMQException
     {
         channel.queueDeleted(queue);
-    }
-
-    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String 
routingKey, String exchange)
-    {
-        AMQFrame deliverFrame = 
BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
-                                                                deliveryTag, 
false, exchange,
-                                                                routingKey);
-        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // 
XXX: Could cast be a problem?
-        deliverFrame.writePayload(buf);
-        buf.flip();
-        return buf;
     }
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=470908&r1=470907&r2=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
 Fri Nov  3 09:17:11 2006
@@ -18,6 +18,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
 import java.util.List;
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -166,7 +168,7 @@
      * channel, which in turn can update its list of unacknowledged messages.
      * @param queue
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(AMQQueue queue) throws AMQException
     {
         for (Subscription s : _subscriptions)
         {

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,99 @@
+/**
+ * User: Robert Greig
+ * Date: 23-Oct-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+    private ContentHeaderBody _contentHeaderBody;
+
+    private List<ContentBody> _contentBodies;
+
+    private boolean _redelivered;
+
+    private final MessageStore _messageStore;
+
+    public WeakReferenceMessageHandle(MessageStore messageStore, 
ContentHeaderBody contentHeader)
+    {
+        _messageStore = messageStore;
+        _contentHeaderBody = contentHeader;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) 
throws AMQException
+    {
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    public int getBodyCount()
+    {
+        return _contentBodies.size();
+    }
+
+    public long getBodySize()
+    {
+        return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
+    }
+
+    public ContentBody getContentBody(int index) throws 
IllegalArgumentException
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public void addContentBodyFrame(ContentBody contentBody) throws 
AMQException
+    {
+        _contentBodies.add(contentBody);
+    }
+
+    public String getExchangeName()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public String getRoutingKey()
+    {
+        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public boolean isImmediate()
+    {
+        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
+    }
+
+    public boolean isRedelivered()
+    {
+        return _redelivered;
+    }
+
+    public boolean isPersistent() throws AMQException
+    {
+        if (_contentHeaderBody == null)
+        {
+            throw new AMQException("Cannot determine delivery mode of message. 
Content header not found.");
+        }
+
+        //todo move literal values to a constants file such as AMQConstants in common
+        return _contentHeaderBody.properties instanceof 
BasicContentHeaderProperties
+                && ((BasicContentHeaderProperties) 
_contentHeaderBody.properties).getDeliveryMode() == 2;
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+    private static final Logger _log = 
Logger.getLogger(CleanupMessageOperation.class);
+
+    private final AMQMessage _msg;
+
+    private final List<RequiredDeliveryException> _returns;
+
+    public CleanupMessageOperation(AMQMessage msg, 
List<RequiredDeliveryException> returns)
+    {
+        _msg = msg;
+        _returns = returns;
+    }
+
+    public void prepare() throws AMQException
+    {
+    }
+
+    public void undoPrepare()
+    {
+        //don't need to do anything here, if the store's txn failed
+        //when processing prepare then the message was not stored
+        //or enqueued on any queues and can be discarded
+    }
+
+    public void commit()
+    {
+        //The router's reference can now be released.  This is done
+        //here to ensure that it happens after the queues that
+        //enqueue it have incremented their counts (which as a
+        //memory only operation is done in the commit phase).
+        try
+        {
+            _msg.decrementReference();
+        }
+        catch (AMQException e)
+        {
+            _log.error("On commiting transaction, failed to cleanup unused 
message: " + e, e);
+        }
+        try
+        {
+            _msg.checkDeliveredToConsumer();
+        }
+        catch (NoConsumersException e)
+        {
+            //TODO: store this for delivery after the commit-ok
+            _returns.add(e);
+        }
+    }
+
+    public void rollback()
+    {
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/CleanupMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,63 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.FailedDequeueException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+    private static final Logger _logger = 
Logger.getLogger(DeliverMessageOperation.class);
+
+    private final AMQMessage _msg;
+
+    private final AMQQueue _queue;
+
+    public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+    {
+        _msg = msg;
+        _queue = queue;
+    }
+
+    public void prepare() throws AMQException
+    {
+        //do the persistent part of the record()
+        _msg.enqueue(_queue);
+    }
+
+    public void undoPrepare()
+    {
+    }
+
+    public void commit()
+    {
+        //do the memory part of the record()
+        _msg.incrementReference();
+        //then process the message
+        try
+        {
+            _queue.process(_msg);
+        }
+        catch (FailedDequeueException e)
+        {
+            //TODO: is there anything else we can do here? I think not...
+            _logger.error("Error during commit of a queue delivery: " + e, e);
+        }
+    }
+
+    public void rollback()
+    {
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,118 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+
+import java.util.List;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+    private final TxnBuffer _txnBuffer;
+
+    /**
+     * We keep hold of the ack operation so that we can consolidate acks, i.e. 
multiple acks within a txn are
+     * consolidated into a single operation
+     */
+    private TxAck _ackOp;
+
+    private List<RequiredDeliveryException> _returnMessages;
+
+    public LocalTransactionalContext(TxnBuffer txnBuffer, 
List<RequiredDeliveryException> returnMessages)
+    {
+        _txnBuffer = txnBuffer;
+        _returnMessages = returnMessages;
+    }
+
+    public void rollback() throws AMQException
+    {
+        _txnBuffer.rollback();
+    }
+
+    public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+    {
+        // don't create a transaction unless needed
+        if (message.isPersistent())
+        {
+            _txnBuffer.containsPersistentChanges();
+        }
+
+        // A publication will result in the enlisting of several
+        // TxnOps. The first is an op that will store the message.
+        // Following that (and ordering is important), an op will
+        // be added for every queue onto which the message is
+        // enqueued. Finally a cleanup op will be added to decrement
+        // the reference associated with the routing.
+        _txnBuffer.enlist(new StoreMessageOperation(message));
+        _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+        _txnBuffer.enlist(new CleanupMessageOperation(message, 
_returnMessages));
+    }
+
+    private void checkAck(long deliveryTag, UnacknowledgedMessageMap 
unacknowledgedMessageMap) throws AMQException
+    {
+        if (!unacknowledgedMessageMap.contains(deliveryTag))
+        {
+            throw new AMQException("Ack with delivery tag " + deliveryTag + " 
not known for channel");
+        }
+    }
+
+    public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, 
boolean multiple,
+                                   UnacknowledgedMessageMap 
unacknowledgedMessageMap) throws AMQException
+    {
+        //check that the tag exists to give early failure
+        if (!multiple || deliveryTag > 0)
+        {
+            checkAck(deliveryTag, unacknowledgedMessageMap);
+        }
+        //we use a single txn op for all acks and update this op
+        //as new acks come in. If this is the first ack in the txn
+        //we will need to create and enlist the op.
+        if (_ackOp == null)
+        {
+            _ackOp = new TxAck(unacknowledgedMessageMap);
+            _txnBuffer.enlist(_ackOp);
+        }
+        // update the op to include this ack request
+        if (multiple && deliveryTag == 0)
+        {
+            // if an ack-all has been signalled, that refers only
+            // to all messages outstanding at this time
+            _ackOp.update(lastDeliveryTag, multiple);
+        }
+        else
+        {
+            _ackOp.update(deliveryTag, multiple);
+        }
+    }
+
+    public void commit() throws AMQException
+    {
+        if (_ackOp != null)
+        {
+            _ackOp.consolidate();
+            if (_ackOp.checkPersistent())
+            {
+                _txnBuffer.containsPersistentChanges();
+            }
+            //already enlisted, after commit will reset regardless of outcome
+            _ackOp = null;
+        }
+
+        _txnBuffer.commit();
+        //TODO: may need to return 'immediate' messages at this point
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+    private static final Logger _log = 
Logger.getLogger(NonTransactionalContext.class);
+
+    /**
+     * Channel is useful for logging
+     */
+    private AMQChannel _channel;
+
+    /**
+     * Where to put undeliverable messages
+     */
+    private List<RequiredDeliveryException> _returnMessages;
+
+    public NonTransactionalContext(AMQChannel channel, 
List<RequiredDeliveryException> returnMessages)
+    {
+        _channel = channel;
+        _returnMessages = returnMessages;
+    }
+
+    public void commit() throws AMQException
+    {
+        // Does not apply to this context
+    }
+
+    public void rollback() throws AMQException
+    {
+        // Does not apply to this context
+    }
+
+    public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+    {
+        try
+        {
+            queue.process(message);
+            //following check implements the functionality
+            //required by the 'immediate' flag:
+            message.checkDeliveredToConsumer();
+        }
+        catch (NoConsumersException e)
+        {
+            _returnMessages.add(e);
+        }
+        finally
+        {
+            message.decrementReference();
+        }
+    }
+
+    public void acknowledgeMessage(final long deliveryTag, long 
lastDeliveryTag,
+                                   boolean multiple, final 
UnacknowledgedMessageMap unacknowledgedMessageMap)
+            throws AMQException
+    {
+        if (multiple)
+        {
+            if (deliveryTag == 0)
+            {
+
+                //Spec 2.1.6.11 ... If the multiple field is 1, and the 
delivery tag is zero,
+                // tells the server to acknowledge all outstanding messages.
+                _log.info("Multiple ack on delivery tag 0. ACKing all 
messages. Current count:" +
+                        unacknowledgedMessageMap.size());
+                unacknowledgedMessageMap.visit(new 
UnacknowledgedMessageMap.Visitor()
+                {
+                    public boolean callback(UnacknowledgedMessage message) 
throws AMQException
+                    {
+                        message.discard();
+                        return false;
+                    }
+
+                    public void visitComplete()
+                    {
+                        unacknowledgedMessageMap.clear();
+                    }
+                });
+            }
+            else
+            {
+                if (!unacknowledgedMessageMap.contains(deliveryTag))
+                {
+                    throw new AMQException("Multiple ack on delivery tag " + 
deliveryTag + " not known for channel");
+                }
+
+                LinkedList<UnacknowledgedMessage> acked = new 
LinkedList<UnacknowledgedMessage>();
+                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+                for (UnacknowledgedMessage msg : acked)
+                {
+                    msg.discard();
+                }
+            }
+        }
+        else
+        {
+            UnacknowledgedMessage msg;
+            msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+            if (msg == null)
+            {
+                _log.info("Single ack on delivery tag " + deliveryTag + " not 
known for channel:" +
+                          _channel.getChannelId());
+                throw new AMQException("Single ack on delivery tag " + 
deliveryTag + " not known for channel:" +
+                                       _channel.getChannelId());
+            }
+            msg.discard();
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Received non-multiple ack for messaging with 
delivery tag " + deliveryTag);
+            }
+        }
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,47 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public class StoreMessageOperation implements TxnOp
+{
+    //just use this to do a store of the message during the
+    //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+    //by the queues themselves.
+    private final AMQMessage _msg;
+
+    public StoreMessageOperation(AMQMessage msg)
+    {
+        _msg = msg;
+    }
+
+    public void prepare() throws AMQException
+    {
+        _msg.storeMessage();
+        // the router's reference can now be released
+        _msg.decrementReference();
+    }
+
+    public void undoPrepare()
+    {
+    }
+
+    public void commit()
+    {
+    }
+
+    public void rollback()
+    {
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java?view=auto&rev=470908
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
 Fri Nov  3 09:17:11 2006
@@ -0,0 +1,29 @@
+/**
+ * User: Robert Greig
+ * Date: 01-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * @author Robert Greig ([EMAIL PROTECTED])
+ */
+public interface TransactionalContext
+{
+    void commit() throws AMQException;
+
+    void rollback() throws AMQException;
+
+    void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+    void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean 
multiple,
+                            UnacknowledgedMessageMap unacknowledgedMessageMap) 
throws AMQException;
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to