Author: rgodfrey
Date: Thu Apr 5 09:37:40 2007
New Revision: 525862
URL: http://svn.apache.org/viewvc?view=rev&rev=525862
Log:
QPID-443 : Fix to transactionality of message publishing
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Apr 5 09:37:40 2007
@@ -89,6 +89,11 @@
public void rollback() throws AMQException
{
_txnBuffer.rollback(_storeContext);
+ // Hack to deal with uncommitted non-transactional writes
+ if(_messageStore.inTran(_storeContext))
+ {
+ _messageStore.abortTran(_storeContext);
+ }
_postCommitDeliveryList.clear();
}
@@ -103,6 +108,7 @@
// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -111,7 +117,7 @@
}
message.incrementReference();
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+
*/
}
Modified:
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/Event.java Thu Apr 5 09:37:40 2007
@@ -85,4 +85,27 @@
}
+
+ public static final class CloseEvent extends Event
+ {
+ private final IoFilter.NextFilter _nextFilter;
+
+ public CloseEvent(final IoFilter.NextFilter nextFilter)
+ {
+ super();
+ _nextFilter = nextFilter;
+ }
+
+
+ public void process(IoSession session)
+ {
+ _nextFilter.sessionClosed(session);
+ }
+
+ public IoFilter.NextFilter getNextFilter()
+ {
+ return _nextFilter;
+ }
+ }
+
}
Modified:
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?view=diff&rev=525862&r1=525861&r2=525862
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Apr 5 09:37:40 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.pool;
+import org.apache.qpid.pool.Event.CloseEvent;
+
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -206,6 +208,10 @@
fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message));
}
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ {
+ fireAsynchEvent(session, new CloseEvent(nextFilter));
+ }
}
@@ -223,6 +229,12 @@
{
fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest));
}
+
+ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
+ {
+ fireAsynchEvent(session, new CloseEvent(nextFilter));
+ }
+
}