Author: rgodfrey
Date: Thu Apr  5 09:37:40 2007
New Revision: 525862

URL: http://svn.apache.org/viewvc?view=rev&rev=525862
Log:
QPID-443 : Fix to transactionality of message publishing

Modified:
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
    
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Thu Apr  5 09:37:40 2007
@@ -89,6 +89,11 @@
     public void rollback() throws AMQException
     {
         _txnBuffer.rollback(_storeContext);
+        // Hack to deal with uncommitted non-transactional writes
+        if(_messageStore.inTran(_storeContext))
+        {
+            _messageStore.abortTran(_storeContext);
+        }
         _postCommitDeliveryList.clear();
     }
 
@@ -103,6 +108,7 @@
 //        message.incrementReference();
         _postCommitDeliveryList.add(new DeliveryDetails(message, queue, 
deliverFirst));
         _messageDelivered = true;
+        _txnBuffer.enlist(new CleanupMessageOperation(message, 
_returnMessages));
         /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
         if (_log.isDebugEnabled())
         {
@@ -111,7 +117,7 @@
         }
         message.incrementReference();
         _messageDelivered = true;
-        _txnBuffer.enlist(new CleanupMessageOperation(message, 
_returnMessages));
+
         */
     }
 

Modified: 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
 (original)
+++ 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
 Thu Apr  5 09:37:40 2007
@@ -85,4 +85,27 @@
     }
 
 
+
+    public static final class CloseEvent extends Event
+    {
+        private final IoFilter.NextFilter _nextFilter;
+
+        public CloseEvent(final IoFilter.NextFilter nextFilter)
+        {
+            super();
+            _nextFilter = nextFilter;
+        }
+
+
+        public void process(IoSession session)
+        {
+            _nextFilter.sessionClosed(session);
+        }
+
+        public IoFilter.NextFilter getNextFilter()
+        {
+            return _nextFilter;
+        }
+    }
+
 }

Modified: 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
 (original)
+++ 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
 Thu Apr  5 09:37:40 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.pool;
 
+import org.apache.qpid.pool.Event.CloseEvent;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -206,6 +208,10 @@
             fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, 
message));
         }
 
+        public void sessionClosed(final NextFilter nextFilter, final IoSession 
session) throws Exception
+        {
+            fireAsynchEvent(session, new CloseEvent(nextFilter));
+        }
 
     }
 
@@ -223,6 +229,12 @@
         {
             fireAsynchEvent(session, new Event.WriteEvent(nextFilter, 
writeRequest));
         }
+
+        public void sessionClosed(final NextFilter nextFilter, final IoSession 
session) throws Exception
+        {
+            fireAsynchEvent(session, new CloseEvent(nextFilter));
+        }
+        
 
     }
 


Reply via email to