Author: rgreig
Date: Thu Jan 11 09:35:49 2007
New Revision: 495304

URL: http://svn.apache.org/viewvc?view=rev&rev=495304
Log:
QPID-32: transaction fixes

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
    
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu Jan 11 09:35:49 2007
@@ -37,7 +37,6 @@
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.TxnBuffer;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +125,7 @@
      */
     public void setLocalTransactional()
     {
-        _txnContext = new LocalTransactionalContext(_messageStore, 
_storeContext, new TxnBuffer(), _returnMessages);
+        _txnContext = new LocalTransactionalContext(_messageStore, 
_storeContext, _returnMessages);
     }
 
     public boolean isTransactional()
@@ -190,6 +189,10 @@
         }
         else
         {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Content header received on channel " + _channelId);
+            }
             _currentMessage.setContentHeaderBody(contentHeaderBody);
             routeCurrentMessage();
             _currentMessage.routingComplete(_messageStore, _storeContext, 
_messageHandleFactory);
@@ -212,6 +215,10 @@
 
         // returns true iff the message was delivered (i.e. if all data was
         // received
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Content body received on channel " + _channelId);
+        }
         try
         {
             if (_currentMessage.addContentBodyFrame(_storeContext, 
contentBody))
@@ -484,6 +491,10 @@
 
     public void commit() throws AMQException
     {
+        if (!isTransactional())
+        {
+            throw new AMQException("Fatal error: commit called on 
non-transactional channel");
+        }
         _txnContext.commit();
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
 Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,9 +29,12 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.log4j.Logger;
 
 public class BasicAckMethodHandler implements 
StateAwareMethodListener<BasicAckBody>
-{     
+{
+    private static final Logger _log = 
Logger.getLogger(BasicAckMethodHandler.class);
+
     private static final BasicAckMethodHandler _instance = new 
BasicAckMethodHandler();
 
     public static BasicAckMethodHandler getInstance()
@@ -47,6 +50,10 @@
                                ExchangeRegistry exchangeRegistry, 
AMQProtocolSession protocolSession,
                                AMQMethodEvent<BasicAckBody> evt) throws 
AMQException
     {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Ack received on channel " + evt.getChannelId());
+        }
         BasicAckBody body = evt.getMethod();
         final AMQChannel channel = 
protocolSession.getChannel(evt.getChannelId());
         // this method throws an AMQException if the delivery tag is not known

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
 Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,11 +34,14 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
 
 public class BasicPublishMethodHandler  implements 
StateAwareMethodListener<BasicPublishBody>
 {
+    private static final Logger _log = 
Logger.getLogger(BasicPublishMethodHandler.class);
+
     private static final BasicPublishMethodHandler _instance = new 
BasicPublishMethodHandler();
-    
+
     private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new 
AMQShortString("Unknown exchange name");
 
     public static BasicPublishMethodHandler getInstance()
@@ -55,6 +58,11 @@
                                AMQMethodEvent<BasicPublishBody> evt) throws 
AMQException
     {
         final BasicPublishBody body = evt.getMethod();
+
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Publish received on channel " + evt.getChannelId());
+        }
 
         // TODO: check the delivery tag field details - is it unique across 
the broker or per subscriber?
         if (body.exchange == null)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
 Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,12 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
 
 public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
 {
+    private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
     private static TxCommitHandler _instance = new TxCommitHandler();
 
     public static TxCommitHandler getInstance()
@@ -51,6 +54,10 @@
 
         try
         {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Commit received on channel " + evt.getChannelId());
+            }
             AMQChannel channel = 
protocolSession.getChannel(evt.getChannelId());
             channel.commit();
             // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Thu Jan 11 09:35:49 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
@@ -27,14 +28,13 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.message.MessageDecorator;
-import org.apache.qpid.server.message.jms.JMSMessage;
-import org.apache.log4j.Logger;
 
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Combines the information that make up a deliverable message into a more 
manageable form.
@@ -166,7 +166,7 @@
         _messageId = messageId;
         _txnContext = txnContext;
         _transientMessageData.setPublishBody(publishBody);
-        
+
         _taken = new AtomicBoolean(false);
         if (_log.isDebugEnabled())
         {
@@ -468,7 +468,7 @@
         if (pb.immediate && !_deliveredToConsumer)
         {
             throw new NoConsumersException(this);
-        }
+        }        
     }
 
     public BasicPublishBody getPublishBody() throws AMQException
@@ -509,6 +509,10 @@
         // we get a reference to the destination queues now so that we can 
clear the
         // transient message data as quickly as possible
         List<AMQQueue> destinationQueues = 
_transientMessageData.getDestinationQueues();
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Delivering message " + _messageId);
+        }
         try
         {
             // first we allow the handle to know that the message has been 
fully received. This is useful if it is
@@ -555,7 +559,7 @@
             //
             // Optimise the case where we have a single content body. In that 
case we create a composite block
             // so that we can writeDeliver out the deliver, header and body 
with a single network writeDeliver.
-            //            
+            //
             ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
 
             AMQDataBlock firstContentBody = 
ContentBody.createAMQFrame(channelId, cb);

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
 Thu Jan 11 09:35:49 2007
@@ -52,7 +52,7 @@
     {
     }
 
-    public void commit(StoreContext context)
+    public void commit(StoreContext context) throws AMQException
     {
         //do the memeory part of the record()
         _msg.incrementReference();

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Thu Jan 11 09:35:49 2007
@@ -17,16 +17,18 @@
  */
 package org.apache.qpid.server.txn;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -34,7 +36,11 @@
  */
 public class LocalTransactionalContext implements TransactionalContext
 {
-    private final TxnBuffer _txnBuffer;
+    private static final Logger _log = 
Logger.getLogger(LocalTransactionalContext.class);
+
+    private final TxnBuffer _txnBuffer = new TxnBuffer();
+
+    private final List<DeliveryDetails> _postCommitDeliveryList = new 
LinkedList<DeliveryDetails>();
 
     /**
      * We keep hold of the ack operation so that we can consolidate acks, i.e. 
multiple acks within a txn are
@@ -50,14 +56,34 @@
 
     private boolean _inTran = false;
 
+    private boolean _messageDelivered = false;
+
+    private static class DeliveryDetails
+    {
+        public AMQMessage message;
+        public AMQQueue queue;
+
+
+        public DeliveryDetails(AMQMessage message, AMQQueue queue)
+        {
+            this.message = message;
+            this.queue = queue;
+        }
+    }
+
     public LocalTransactionalContext(MessageStore messageStore, StoreContext 
storeContext,
-                                     TxnBuffer txnBuffer, 
List<RequiredDeliveryException> returnMessages)
+                                     List<RequiredDeliveryException> 
returnMessages)
     {
         _messageStore = messageStore;
         _storeContext = storeContext;
-        _txnBuffer = txnBuffer;
         _returnMessages = returnMessages;
-        _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+        //_txnBuffer.enlist(new StoreMessageOperation(messageStore));
+    }
+
+
+    public StoreContext getStoreContext()
+    {
+        return _storeContext;
     }
 
     public void rollback() throws AMQException
@@ -73,9 +99,19 @@
         // be added for every queue onto which the message is
         // enqueued. Finally a cleanup op will be added to decrement
         // the reference associated with the routing.
-
-        _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+        message.incrementReference();
+        _postCommitDeliveryList.add(new DeliveryDetails(message, queue));
+        _messageDelivered = true;
+        /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Incrementing ref count on message and enlisting 
cleanup operation - id " +
+                       message.getMessageId());
+        }
+        message.incrementReference();
+        _messageDelivered = true;
         _txnBuffer.enlist(new CleanupMessageOperation(message, 
_returnMessages));
+        */
     }
 
     private void checkAck(long deliveryTag, UnacknowledgedMessageMap 
unacknowledgedMessageMap) throws AMQException
@@ -129,6 +165,10 @@
     {
         if (!_inTran)
         {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Starting transaction on message store");
+            }
             _messageStore.beginTran(_storeContext);
             _inTran = true;
         }
@@ -136,6 +176,10 @@
 
     public void commit() throws AMQException
     {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Committing transactional context");
+        }
         if (_ackOp != null)
         {
             _ackOp.consolidate();
@@ -143,13 +187,48 @@
             _ackOp = null;
         }
 
+        if (_messageDelivered)
+        {
+            _txnBuffer.enlist(new StoreMessageOperation(_messageStore));
+        }
         try
         {
             _txnBuffer.commit(_storeContext);
         }
         finally
         {
-            _inTran = false;
+            _messageDelivered = false;
+            _inTran = _messageStore.inTran(_storeContext);
         }
+
+        try
+        {
+            postCommitDelivery();
+        }
+        catch (AMQException e)
+        {
+            // OK so what do we do now...?
+            _log.error("Failed to deliver messages following txn commit: " + 
e, e);
+        }
+    }
+
+    private void postCommitDelivery() throws AMQException
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Performing post commit delivery");
+        }
+        try
+        {
+            for (DeliveryDetails dd : _postCommitDeliveryList)
+            {
+                dd.queue.process(_storeContext, dd.message);
+            }
+        }
+        finally
+        {
+            _postCommitDeliveryList.clear();
+        }
+
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Thu Jan 11 09:35:49 2007
@@ -72,6 +72,12 @@
         _browsedAcks = browsedAcks;
     }
 
+
+    public StoreContext getStoreContext()
+    {
+        return _storeContext;
+    }
+
     public void beginTranIfNecessary() throws AMQException
     {
         if (!_inTran)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
 Thu Jan 11 09:35:49 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
 
 /**
  * @author Robert Greig ([EMAIL PROTECTED])
@@ -45,4 +46,6 @@
     void messageFullyReceived(boolean persistent) throws AMQException;
 
     void messageProcessed(AMQProtocolSession protocolSession) throws 
AMQException;
+
+    StoreContext getStoreContext();
 }

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
 Thu Jan 11 09:35:49 2007
@@ -59,15 +59,15 @@
     {
     }
 
-    public void removeQueue(String name) throws AMQException
+    public void removeQueue(AMQShortString name) throws AMQException
     {
     }
 
-    public void enqueueMessage(StoreContext s, String name, long messageId) 
throws AMQException
+    public void enqueueMessage(StoreContext s, AMQShortString name, long 
messageId) throws AMQException
     {
     }
 
-    public void dequeueMessage(StoreContext s, String name, long messageId) 
throws AMQException
+    public void dequeueMessage(StoreContext s, AMQShortString name, long 
messageId) throws AMQException
     {
     }
 


Reply via email to