Author: rgreig
Date: Wed Nov 22 01:37:02 2006
New Revision: 478100

URL: http://svn.apache.org/viewvc?view=rev&rev=478100
Log:
QPID-32 Work-in-progress commit.

Added:
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
   (with props)
Modified:
    incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml
    incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
    
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java

Modified: incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/log4j.xml Wed Nov 
22 01:37:02 2006
@@ -34,9 +34,9 @@
         </layout>
     </appender>
 
-    <!--<category name="org.apache.qpid.server.store">
+    <category name="org.apache.qpid.server.queue">
         <priority value="debug"/>
-    </category>-->
+    </category>
 
     <root>
         <priority value="info"/>

Modified: 
incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml 
(original)
+++ incubator/qpid/branches/new_persistence/java/broker/etc/virtualhosts.xml 
Wed Nov 22 01:37:02 2006
@@ -21,5 +21,6 @@
         <path>/development</path>
         <bind>direct://amq.direct//queue</bind>
         <bind>direct://amq.direct//ping</bind>
+        <bind>direct://amq.direct//serviceQ</bind>
     </virtualhost>
 </virtualhosts>

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/AMQChannel.java
 Wed Nov 22 01:37:02 2006
@@ -115,7 +115,7 @@
      */
     public void setLocalTransactional()
     {
-        _txnContext = new LocalTransactionalContext(new 
TxnBuffer(_messageStore), _returnMessages);
+        _txnContext = new LocalTransactionalContext(_messageStore, new 
TxnBuffer(_messageStore), _returnMessages);
     }
 
     public int getChannelId()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
 Wed Nov 22 01:37:02 2006
@@ -212,7 +212,7 @@
 
             for (AMQQueue q : queues)
             {
-                payload.registerQueue(q);
+                payload.enqueue(q);
             }
         }
     }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
 Wed Nov 22 01:37:02 2006
@@ -183,8 +183,8 @@
         {
             // TODO: modify code generator to add clone() method then clone 
the deliver body
             // without this addition we have a race condition - we will be 
modifying the body
-            // before the encoder has encoded the body for delivery            
-            payload.registerQueue(q);
+            // before the encoder has encoded the body for delivery
+            payload.enqueue(q);
         }
     }
 

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
 Wed Nov 22 01:37:02 2006
@@ -185,7 +185,7 @@
                     _logger.debug("Exchange " + getName() + ": delivering 
message with headers " +
                                   headers + " to " + e.queue.getName());
                 }
-                payload.registerQueue(e.queue);
+                payload.enqueue(e.queue);
                 routed = true;
             }
         }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Wed Nov 22 01:37:02 2006
@@ -65,7 +65,7 @@
         {
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : 
queueRegistry.getQueue(body.queue);
 
-            if(queue == null)
+            if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
             }
@@ -80,7 +80,7 @@
                 //now allow queue to start async processing of any backlog of 
messages
                 queue.deliverAsync();
             }
-            catch(ConsumerTagNotUniqueException e)
+            catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + 
"'";
                 
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, 
AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, 
BasicConsumeBody.METHOD_ID));

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 Wed Nov 22 01:37:02 2006
@@ -34,7 +34,7 @@
 public class AMQMessage
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
-    
+
     /**
      * Used in clustering
      */
@@ -46,7 +46,7 @@
      */
     private AMQProtocolSession _publisher;
 
-    private long _messageId;
+    private final long _messageId;
 
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
 
@@ -58,6 +58,9 @@
      */
     private BasicPublishBody _publishBody;
 
+    /**
+     * Also stored temporarily.
+     */
     private ContentHeaderBody _contentHeaderBody;
 
     /**
@@ -74,6 +77,11 @@
      */
     private boolean _deliveredToConsumer;
 
+    /**
+     * This is stored during routing, to know the queues to which this message 
should immediately be
+     * delivered. It is <b>cleared after delivery has been attempted</b>. Any 
persistent record of destinations is done
+     * by the message handle.
+     */
     private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
 
     /**
@@ -100,7 +108,7 @@
         {
             try
             {
-                ContentBody cb = _messageHandle.getContentBody(++_index);
+                ContentBody cb = _messageHandle.getContentBody(_messageId, 
++_index);
                 return ContentBody.createAMQFrame(_channel, cb);
             }
             catch (AMQException e)
@@ -131,7 +139,7 @@
         {
             try
             {
-                return _messageHandle.getContentBody(++_index);
+                return _messageHandle.getContentBody(_messageId, ++_index);
             }
             catch (AMQException e)
             {
@@ -150,6 +158,10 @@
         _messageId = messageId;
         _txnContext = txnContext;
         _publishBody = publishBody;
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Message created with id " + messageId);
+        }
     }
 
     protected AMQMessage(AMQMessage msg) throws AMQException
@@ -161,14 +173,6 @@
         _deliveredToConsumer = msg._deliveredToConsumer;
     }
 
-    public void storeMessage() throws AMQException
-    {
-        /*if (isPersistent())
-        {
-            _store.put(this);
-        } */
-    }
-
     public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
     {
         return new BodyFrameIterator(channel);
@@ -179,49 +183,9 @@
         return new BodyContentIterator();
     }
 
-    /*public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, 
int channel)
-    {
-        AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
-
-        allFrames[0] = ContentHeaderBody.createAMQFrame(channel, 
_contentHeaderBody);
-        for (int i = 1; i < allFrames.length; i++)
-        {
-            allFrames[i] = ContentBody.createAMQFrame(channel, 
_contentBodies.get(i - 1));
-        }
-        return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
-    }
-
-    public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, 
long deliveryTag)
-    {
-        AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
-        allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, 
deliveryTag, _redelivered,
-                                                       getExchangeName(), 
getRoutingKey());
-        allFrames[1] = ContentHeaderBody.createAMQFrame(channel, 
_contentHeaderBody);
-        for (int i = 2; i < allFrames.length; i++)
-        {
-            allFrames[i] = ContentBody.createAMQFrame(channel, 
_contentBodies.get(i - 2));
-        }
-        return new CompositeAMQDataBlock(allFrames);
-    }
-
-    public List<AMQBody> getPayload()
-    {
-        List<AMQBody> payload = new ArrayList<AMQBody>(2 + 
_contentBodies.size());
-        payload.add(_publishBody);
-        payload.add(_contentHeaderBody);
-        payload.addAll(_contentBodies);
-        return payload;
-    }
-
-    public BasicPublishBody getPublishBody()
-    {
-        return _publishBody;
-    } */
-
     public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
-        return _messageHandle.getContentHeaderBody();
+        return _messageHandle.getContentHeaderBody(_messageId);
     }
 
     public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
@@ -233,13 +197,19 @@
     public void routingComplete(MessageStore store, MessageHandleFactory 
factory) throws AMQException
     {
         final boolean persistent = isPersistent();
-        _messageId = store.getNewMessageId();
         _messageHandle = factory.createMessageHandle(_messageId, store, 
persistent);
         if (persistent)
         {
             _txnContext.beginTranIfNecessary();
         }
 
+        // enqueuing the messages ensure that if required the destinations are 
recorded to a
+        // persistent store
+        for (AMQQueue q : _destinationQueues)
+        {
+            _messageHandle.enqueue(_messageId, q);
+        }
+
         if (_contentHeaderBody.bodySize == 0)
         {
             deliver();
@@ -249,7 +219,7 @@
     public boolean addContentBodyFrame(ContentBody contentBody) throws 
AMQException
     {
         _bodyLengthReceived += contentBody.getSize();
-        _messageHandle.addContentBodyFrame(contentBody);
+        _messageHandle.addContentBodyFrame(_messageId, contentBody);
         if (isAllContentReceived())
         {
             deliver();
@@ -304,7 +274,7 @@
                 {
                     _log.debug("Ref count on message " + _messageId + " is 
zero; removing message");
                 }
-                _messageHandle.removeMessage();
+                _messageHandle.removeMessage(_messageId);
             }
             catch (AMQException e)
             {
@@ -313,6 +283,17 @@
                 throw new MessageCleanupException(_messageId, e);
             }
         }
+        else
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Ref count is now " + _referenceCount + " for 
message id " + _messageId);
+                if (_referenceCount.get() < 0)
+                {
+                    Thread.dumpStack();
+                }
+            }
+        }
     }
 
     public void setPublisher(AMQProtocolSession publisher)
@@ -338,25 +319,22 @@
         }
     }
 
+    /**
+     * Registers a queue to which this message is to be delivered. This is
+     * called from the exchange when it is routing the message. This will be 
called before any content bodies have
+     * been received so that the choice of AMQMessageHandle implementation can 
be picked based on various criteria.
+     *
+     * @param queue the queue
+     * @throws org.apache.qpid.AMQException if there is an error enqueuing the 
message
+     */
     public void enqueue(AMQQueue queue) throws AMQException
     {
-        //if the message is not persistent or the queue is not durable
-        //we will not need to recover the association and so do not
-        //need to record it
-        /*if (isPersistent() && queue.isDurable())
-        {
-            _store.enqueueMessage(queue.getName(), _messageId);
-        } */
+        _destinationQueues.add(queue);
     }
 
     public void dequeue(AMQQueue queue) throws AMQException
     {
-        //only record associations where both queue and message will survive
-        //a restart, so only need to remove association if this is the case
-        /*if (isPersistent() && queue.isDurable())
-        {
-            _store.dequeueMessage(queue.getName(), _messageId);
-        } */
+        _messageHandle.dequeue(_messageId, queue);
     }
 
     public boolean isPersistent() throws AMQException
@@ -369,7 +347,7 @@
         }
         else
         {
-            return _messageHandle.isPersistent();
+            return _messageHandle.isPersistent(_messageId);
         }
     }
 
@@ -398,7 +376,7 @@
         }
         else
         {
-            pb = _messageHandle.getPublishBody();
+            pb = _messageHandle.getPublishBody(_messageId);
         }
         return pb;
     }
@@ -412,32 +390,34 @@
         _deliveredToConsumer = true;
     }
 
-    /**
-     * Registers a queue to which this message is to be delivered. This is
-     * called from the exchange when it is routing the message. This will be 
called before any content bodies have
-     * been received so that the choice of AMQMessageHandle implementation can 
be picked based on various criteria.
-     *
-     * @param queue the queue
-     */
-    public void registerQueue(AMQQueue queue)
+    /*public void registerQueue(AMQQueue queue)
     {
         _destinationQueues.add(queue);
-    }
+    } */
 
     private void deliver() throws AMQException
     {
         // first we allow the handle to know that the message has been fully 
received. This is useful if it is
         // maintaining any calculated values based on content chunks
-        _messageHandle.setPublishAndContentHeaderBody(_publishBody, 
_contentHeaderBody);
-        _publishBody = null;
-        _contentHeaderBody = null;
-
-        // we then allow the transactional context to do something with the 
message content
-        // now that it has all been received, before we attempt delivery
-        _txnContext.messageFullyReceived(isPersistent());
-        for (AMQQueue q : _destinationQueues)
+        try
+        {
+            _messageHandle.setPublishAndContentHeaderBody(_messageId, 
_publishBody, _contentHeaderBody);
+            _publishBody = null;
+            _contentHeaderBody = null;
+
+            // we then allow the transactional context to do something with 
the message content
+            // now that it has all been received, before we attempt delivery
+            _txnContext.messageFullyReceived(isPersistent());
+            for (AMQQueue q : _destinationQueues)
+            {
+                _txnContext.deliver(this, q);
+            }
+        }
+        finally
         {
-            _txnContext.deliver(this, q);
+            _destinationQueues.clear();
+            _destinationQueues = null;
+            decrementReference();
         }
     }
 

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessageHandle.java
 Wed Nov 22 01:37:02 2006
@@ -9,16 +9,20 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicPublishBody;
 
 /**
- * @author Robert Greig ([EMAIL PROTECTED])
+ * A pluggable way of getting message data. Implementations can provide 
intelligent caching for example or
+ * even no caching at all to minimise the broker memory footprint.
+ *
+ * The method all take a messageId to avoid having to store it in the instance 
- the AMQMessage container
+ * must already keen the messageId so it is pointless storing it twice. 
  */
 public interface AMQMessageHandle
 {
-    ContentHeaderBody getContentHeaderBody() throws AMQException;
+    ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException;
 
     /**
      * @return the number of body frames associated with this message
@@ -28,7 +32,7 @@
     /**
      * @return the size of the body
      */
-    long getBodySize() throws AMQException;
+    long getBodySize(long messageId) throws AMQException;
 
     /**
      * Get a particular content body
@@ -36,19 +40,23 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentBody getContentBody(int index) throws IllegalArgumentException, 
AMQException;
+    ContentBody getContentBody(long messageId, int index) throws 
IllegalArgumentException, AMQException;
 
-    void addContentBodyFrame(ContentBody contentBody) throws AMQException;
+    void addContentBodyFrame(long messageId, ContentBody contentBody) throws 
AMQException;
 
-    BasicPublishBody getPublishBody() throws AMQException;
+    BasicPublishBody getPublishBody(long messageId) throws AMQException;
 
     boolean isRedelivered();
 
-    boolean isPersistent() throws AMQException;
+    boolean isPersistent(long messageId) throws AMQException;
 
-    void setPublishAndContentHeaderBody(BasicPublishBody publishBody, 
ContentHeaderBody contentHeaderBody)
+    void setPublishAndContentHeaderBody(long messageId, BasicPublishBody 
publishBody,
+                                        ContentHeaderBody contentHeaderBody)
             throws AMQException;
 
-    void removeMessage() throws AMQException;
+    void removeMessage(long messageId) throws AMQException;
+
+    void enqueue(long messageId, AMQQueue queue) throws AMQException;
 
+    void dequeue(long messageId, AMQQueue queue) throws AMQException;
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 Wed Nov 22 01:37:02 2006
@@ -698,7 +698,7 @@
             msg.dequeue(this);
             msg.decrementReference();
         }
-        catch(MessageCleanupException e)
+        catch (MessageCleanupException e)
         {
             //Message was dequeued, but could notthen be deleted
             //though it is no longer referenced. This should be very

Added: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=auto&rev=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
 (added)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
 Wed Nov 22 01:37:02 2006
@@ -0,0 +1,110 @@
+/**
+ * User: Robert Greig
+ * Date: 21-Nov-2006
+ ******************************************************************************
+ * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
+ * this program may be photocopied reproduced or translated to another
+ * program language without prior written consent of JP Morgan Chase Ltd
+ 
******************************************************************************/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ */
+public class InMemoryMessageHandle implements AMQMessageHandle
+{
+
+    private ContentHeaderBody _contentHeaderBody;
+
+    private BasicPublishBody _publishBody;
+
+    private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+
+    private boolean _redelivered;
+
+    public InMemoryMessageHandle()
+    {
+    }
+
+    public ContentHeaderBody getContentHeaderBody(long messageId) throws 
AMQException
+    {
+        return _contentHeaderBody;
+    }
+
+    public int getBodyCount()
+    {
+        return _contentBodies.size();
+    }
+
+    public long getBodySize(long messageId) throws AMQException
+    {
+        return getContentHeaderBody(messageId).bodySize;
+    }
+
+    public ContentBody getContentBody(long messageId, int index) throws 
AMQException, IllegalArgumentException
+    {
+        if (index > _contentBodies.size() - 1)
+        {
+            throw new IllegalArgumentException("Index " + index + " out of 
valid range 0 to " +
+                                               (_contentBodies.size() - 1));
+        }
+        return _contentBodies.get(index);
+    }
+
+    public void addContentBodyFrame(long messageId, ContentBody contentBody) 
throws AMQException
+    {
+        _contentBodies.add(contentBody);
+    }
+
+    public BasicPublishBody getPublishBody(long messageId) throws AMQException
+    {
+        return _publishBody;
+    }
+
+    public boolean isRedelivered()
+    {
+        return _redelivered;
+    }
+
+    public boolean isPersistent(long messageId) throws AMQException
+    {
+        //todo remove literal values to a constant file such as AMQConstants 
in common
+        ContentHeaderBody chb = getContentHeaderBody(messageId);
+        return chb.properties instanceof BasicContentHeaderProperties &&
+               ((BasicContentHeaderProperties) 
chb.properties).getDeliveryMode() == 2;
+    }
+
+    /**
+     * This is called when all the content has been received.
+     * @param publishBody
+     * @param contentHeaderBody
+     * @throws AMQException
+     */
+    public void setPublishAndContentHeaderBody(long messageId, 
BasicPublishBody publishBody,
+                                               ContentHeaderBody 
contentHeaderBody)
+            throws AMQException
+    {
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    public void removeMessage(long messageId) throws AMQException
+    {
+    }
+
+    public void enqueue(long messageId, AMQQueue queue) throws AMQException
+    {
+    }
+
+    public void dequeue(long messageId, AMQQueue queue) throws AMQException
+    {
+    }
+}

Propchange: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/InMemoryMessageHandle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/MessageHandleFactory.java
 Wed Nov 22 01:37:02 2006
@@ -22,6 +22,13 @@
     public AMQMessageHandle createMessageHandle(long messageId, MessageStore 
store, boolean persistent)
     {
         // just hardcoded for now
-        return new WeakReferenceMessageHandle(store, messageId);
+        if (persistent)
+        {
+            return new WeakReferenceMessageHandle(store);
+        }
+        else
+        {
+            return new InMemoryMessageHandle();
+        }
     }
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Wed Nov 22 01:37:02 2006
@@ -9,15 +9,15 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.server.store.MessageStore;
 
-import java.util.List;
-import java.util.LinkedList;
 import java.lang.ref.WeakReference;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * @author Robert Greig ([EMAIL PROTECTED])
@@ -34,20 +34,17 @@
 
     private final MessageStore _messageStore;
 
-    private final long _messageId;
-
-    public WeakReferenceMessageHandle(MessageStore messageStore, long 
messageId)
+    public WeakReferenceMessageHandle(MessageStore messageStore)
     {
         _messageStore = messageStore;
-        _messageId = messageId;
     }
 
-    public ContentHeaderBody getContentHeaderBody() throws AMQException
+    public ContentHeaderBody getContentHeaderBody(long messageId) throws 
AMQException
     {
         ContentHeaderBody chb = _contentHeaderBody.get();
         if (chb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
             chb = mmd.getContentHeaderBody();
             _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
             _publishBody = new 
WeakReference<BasicPublishBody>(mmd.getPublishBody());
@@ -60,12 +57,12 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize() throws AMQException
+    public long getBodySize(long messageId) throws AMQException
     {
-        return getContentHeaderBody().bodySize;
+        return getContentHeaderBody(messageId).bodySize;
     }
 
-    public ContentBody getContentBody(int index) throws AMQException, 
IllegalArgumentException
+    public ContentBody getContentBody(long messageId, int index) throws 
AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -76,24 +73,24 @@
         ContentBody cb = wr.get();
         if (cb == null)
         {
-            cb = _messageStore.getContentBodyChunk(_messageId, index);
+            cb = _messageStore.getContentBodyChunk(messageId, index);
             _contentBodies.set(index, new WeakReference<ContentBody>(cb));
         }
         return cb;
     }
 
-    public void addContentBodyFrame(ContentBody contentBody) throws 
AMQException
+    public void addContentBodyFrame(long messageId, ContentBody contentBody) 
throws AMQException
     {
         _contentBodies.add(new WeakReference<ContentBody>(contentBody));
-        _messageStore.storeContentBodyChunk(_messageId, _contentBodies.size() 
- 1, contentBody);
+        _messageStore.storeContentBodyChunk(messageId, _contentBodies.size() - 
1, contentBody);
     }
 
-    public BasicPublishBody getPublishBody() throws AMQException
+    public BasicPublishBody getPublishBody(long messageId) throws AMQException
     {
         BasicPublishBody bpb = _publishBody.get();
         if (bpb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(_messageId);
+            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
             bpb = mmd.getPublishBody();
             _publishBody = new WeakReference<BasicPublishBody>(bpb);
             _contentHeaderBody = new 
WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
@@ -106,10 +103,10 @@
         return _redelivered;
     }
 
-    public boolean isPersistent() throws AMQException
+    public boolean isPersistent(long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants 
in common
-        ContentHeaderBody chb = getContentHeaderBody();
+        ContentHeaderBody chb = getContentHeaderBody(messageId);
         return chb.properties instanceof BasicContentHeaderProperties &&
                ((BasicContentHeaderProperties) 
chb.properties).getDeliveryMode() == 2;
     }
@@ -120,17 +117,28 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(BasicPublishBody publishBody, 
ContentHeaderBody contentHeaderBody)
+    public void setPublishAndContentHeaderBody(long messageId, 
BasicPublishBody publishBody,
+                                               ContentHeaderBody 
contentHeaderBody)
             throws AMQException
     {
-        _messageStore.storeMessageMetaData(_messageId, new 
MessageMetaData(publishBody, contentHeaderBody,
+        _messageStore.storeMessageMetaData(messageId, new 
MessageMetaData(publishBody, contentHeaderBody,
                                                                            
_contentBodies.size()));
         _publishBody = new WeakReference<BasicPublishBody>(publishBody);
         _contentHeaderBody = new 
WeakReference<ContentHeaderBody>(contentHeaderBody);
     }
 
-    public void removeMessage() throws AMQException
+    public void removeMessage(long messageId) throws AMQException
+    {
+        _messageStore.removeMessage(messageId);
+    }
+
+    public void enqueue(long messageId, AMQQueue queue) throws AMQException
+    {
+        _messageStore.enqueueMessage(queue.getName(), messageId);
+    }
+
+    public void dequeue(long messageId, AMQQueue queue) throws AMQException
     {
-        _messageStore.removeMessage(_messageId);
+        _messageStore.dequeueMessage(queue.getName(), messageId);
     }
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/DeliverMessageOperation.java
 Wed Nov 22 01:37:02 2006
@@ -28,12 +28,11 @@
     {
         _msg = msg;
         _queue = queue;
+        _msg.incrementReference();
     }
 
     public void prepare() throws AMQException
-    {
-        //do the persistent part of the record()
-        _msg.enqueue(_queue);
+    {        
     }
 
     public void undoPrepare()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Wed Nov 22 01:37:02 2006
@@ -23,6 +23,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.MessageStore;
 
 import java.util.List;
 
@@ -41,10 +42,17 @@
 
     private List<RequiredDeliveryException> _returnMessages;
 
-    public LocalTransactionalContext(TxnBuffer txnBuffer, 
List<RequiredDeliveryException> returnMessages)
+    private final MessageStore _messageStore;
+
+    private boolean _inTran = false;
+
+    public LocalTransactionalContext(MessageStore messageStore,
+                                     TxnBuffer txnBuffer, 
List<RequiredDeliveryException> returnMessages)
     {
+        _messageStore = messageStore;
         _txnBuffer = txnBuffer;
         _returnMessages = returnMessages;
+        _txnBuffer.enlist(new StoreMessageOperation(messageStore));
     }
 
     public void rollback() throws AMQException
@@ -66,7 +74,7 @@
         // be added for every queue onto which the message is
         // enqueued. Finally a cleanup op will be added to decrement
         // the reference associated with the routing.
-        _txnBuffer.enlist(new StoreMessageOperation(message));
+
         _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
         _txnBuffer.enlist(new CleanupMessageOperation(message, 
_returnMessages));
     }
@@ -110,12 +118,16 @@
 
     public void messageFullyReceived(boolean persistent) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
+        // Not required in this transactional context
     }
 
     public void beginTranIfNecessary() throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
+        if (!_inTran)
+        {
+            _messageStore.beginTran();
+            _inTran = true;
+        }
     }
 
     public void commit() throws AMQException

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/NonTransactionalContext.java
 Wed Nov 22 01:37:02 2006
@@ -21,12 +21,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -60,7 +60,7 @@
     {
         _channel = channel;
         _returnMessages = returnMessages;
-        _messageStore = messageStore;
+        _messageStore = messageStore;        
     }
 
     public void beginTranIfNecessary() throws AMQException
@@ -86,6 +86,7 @@
     {
         try
         {
+            message.incrementReference();
             queue.process(message);
             //following check implements the functionality
             //required by the 'immediate' flag:
@@ -95,10 +96,6 @@
         {
             _returnMessages.add(e);
         }
-        finally
-        {
-            message.decrementReference();
-        }
     }
 
     public void acknowledgeMessage(final long deliveryTag, long 
lastDeliveryTag,
@@ -162,7 +159,7 @@
             }
         }
     }
-                                  
+
     public void messageFullyReceived(boolean persistent) throws AMQException
     {
         if (persistent)

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/StoreMessageOperation.java
 Wed Nov 22 01:37:02 2006
@@ -8,40 +8,38 @@
  
******************************************************************************/
 package org.apache.qpid.server.txn;
 
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
 
 /**
- * @author Robert Greig ([EMAIL PROTECTED])
+ * A transactional operation to store messages in an underlying persistent 
store. When this operation
+ * commits it will do everything to ensure that all messages are safely 
committed to persistent
+ * storage.
  */
 public class StoreMessageOperation implements TxnOp
 {
-    //just use this to do a store of the message during the
-    //prepare phase. Any enqueueing etc is done by TxnOps enlisted
-    //by the queues themselves.
-    private final AMQMessage _msg;
+    private final MessageStore _messsageStore;
 
-    public StoreMessageOperation(AMQMessage msg)
+    public StoreMessageOperation(MessageStore messageStore)
     {
-        _msg = msg;
+        _messsageStore = messageStore;
     }
 
     public void prepare() throws AMQException
     {
-        _msg.storeMessage();
-        // the router's reference can now be released
-        _msg.decrementReference();
     }
 
     public void undoPrepare()
     {
     }
 
-    public void commit()
+    public void commit() throws AMQException
     {
+        _messsageStore.commitTran();
     }
 
-    public void rollback()
+    public void rollback() throws AMQException
     {
+        _messsageStore.abortTran();
     }
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
 Wed Nov 22 01:37:02 2006
@@ -50,11 +50,9 @@
         if (_containsPersistentChanges)
         {
             _log.debug("Begin Transaction.");
-            _store.beginTran();
             if (prepare())
             {
                 _log.debug("Transaction Succeeded");
-                _store.commitTran();
                 for (TxnOp op : _ops)
                 {
                     op.commit();
@@ -62,8 +60,7 @@
             }
             else
             {
-                _log.debug("Transaction Failed");
-                _store.abortTran();
+                _log.debug("Transaction Failed");                
             }
         }
         else

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
 Wed Nov 22 01:37:02 2006
@@ -26,14 +26,14 @@
 public interface TxnOp
 {
     /**
-     * Do the part of the operation that updates persistent state 
+     * Do the part of the operation that updates persistent state
      */
     public void prepare() throws AMQException;
     /**
      * Complete the operation started by prepare. Can now update in
      * memory state or make netork transfers.
      */
-    public void commit();
+    public void commit() throws AMQException;
     /**
      * This is not the same as rollback. Unfortunately the use of an
      * in memory reference count as a locking mechanism and a test for
@@ -47,5 +47,5 @@
     /**
      * Rolls back the operation.
      */
-    public void rollback();
+    public void rollback() throws AMQException;
 }

Modified: 
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java?view=diff&rev=478100&r1=478099&r2=478100
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/requestreply1/ServiceRequestingClient.java
 Wed Nov 22 01:37:02 2006
@@ -172,7 +172,8 @@
         try
         {
             createConnection(brokerHosts, clientID, username, password, vpath);
-            _session = (Session) _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            //_session = (Session) _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            _session = (Session) _connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
 
 
             _connection.setExceptionListener(this);
@@ -192,6 +193,7 @@
             TextMessage first = _session.createTextMessage(MESSAGE_DATA);
             first.setJMSReplyTo(_tempDestination);
             _producer.send(first);
+            _session.commit(); // TODO REMOVE
             try
             {
                 Thread.sleep(1000);
@@ -231,6 +233,7 @@
             }
             _producer.send(msg);
         }
+        _session.commit(); // TODO REMOVE
         _log.info("Finished sending " + _messageCount + " messages");
     }
 


Reply via email to