Author: rgreig
Date: Thu Jan 11 09:35:49 2007
New Revision: 495304
URL: http://svn.apache.org/viewvc?view=rev&rev=495304
Log:
QPID-32: transaction fixes
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Jan 11 09:35:49 2007
@@ -37,7 +37,6 @@
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.TxnBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +125,7 @@
*/
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore,
_storeContext, new TxnBuffer(), _returnMessages);
+ _txnContext = new LocalTransactionalContext(_messageStore,
_storeContext, _returnMessages);
}
public boolean isTransactional()
@@ -190,6 +189,10 @@
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content header received on channel " + _channelId);
+ }
_currentMessage.setContentHeaderBody(contentHeaderBody);
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext,
_messageHandleFactory);
@@ -212,6 +215,10 @@
// returns true iff the message was delivered (i.e. if all data was
// received
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content body received on channel " + _channelId);
+ }
try
{
if (_currentMessage.addContentBodyFrame(_storeContext,
contentBody))
@@ -484,6 +491,10 @@
public void commit() throws AMQException
{
+ if (!isTransactional())
+ {
+ throw new AMQException("Fatal error: commit called on
non-transactional channel");
+ }
_txnContext.commit();
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,9 +29,12 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.log4j.Logger;
public class BasicAckMethodHandler implements
StateAwareMethodListener<BasicAckBody>
-{
+{
+ private static final Logger _log =
Logger.getLogger(BasicAckMethodHandler.class);
+
private static final BasicAckMethodHandler _instance = new
BasicAckMethodHandler();
public static BasicAckMethodHandler getInstance()
@@ -47,6 +50,10 @@
ExchangeRegistry exchangeRegistry,
AMQProtocolSession protocolSession,
AMQMethodEvent<BasicAckBody> evt) throws
AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Ack received on channel " + evt.getChannelId());
+ }
BasicAckBody body = evt.getMethod();
final AMQChannel channel =
protocolSession.getChannel(evt.getChannelId());
// this method throws an AMQException if the delivery tag is not known
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,11 +34,14 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class BasicPublishMethodHandler implements
StateAwareMethodListener<BasicPublishBody>
{
+ private static final Logger _log =
Logger.getLogger(BasicPublishMethodHandler.class);
+
private static final BasicPublishMethodHandler _instance = new
BasicPublishMethodHandler();
-
+
private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new
AMQShortString("Unknown exchange name");
public static BasicPublishMethodHandler getInstance()
@@ -55,6 +58,11 @@
AMQMethodEvent<BasicPublishBody> evt) throws
AMQException
{
final BasicPublishBody body = evt.getMethod();
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Publish received on channel " + evt.getChannelId());
+ }
// TODO: check the delivery tag field details - is it unique across
the broker or per subscriber?
if (body.exchange == null)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
Thu Jan 11 09:35:49 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,12 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
+ private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
private static TxCommitHandler _instance = new TxCommitHandler();
public static TxCommitHandler getInstance()
@@ -51,6 +54,10 @@
try
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Commit received on channel " + evt.getChannelId());
+ }
AMQChannel channel =
protocolSession.getChannel(evt.getChannelId());
channel.commit();
// AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Thu Jan 11 09:35:49 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
@@ -27,14 +28,13 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.message.MessageDecorator;
-import org.apache.qpid.server.message.jms.JMSMessage;
-import org.apache.log4j.Logger;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Combines the information that make up a deliverable message into a more
manageable form.
@@ -166,7 +166,7 @@
_messageId = messageId;
_txnContext = txnContext;
_transientMessageData.setPublishBody(publishBody);
-
+
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -468,7 +468,7 @@
if (pb.immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public BasicPublishBody getPublishBody() throws AMQException
@@ -509,6 +509,10 @@
// we get a reference to the destination queues now so that we can
clear the
// transient message data as quickly as possible
List<AMQQueue> destinationQueues =
_transientMessageData.getDestinationQueues();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Delivering message " + _messageId);
+ }
try
{
// first we allow the handle to know that the message has been
fully received. This is useful if it is
@@ -555,7 +559,7 @@
//
// Optimise the case where we have a single content body. In that
case we create a composite block
// so that we can writeDeliver out the deliver, header and body
with a single network writeDeliver.
- //
+ //
ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
AMQDataBlock firstContentBody =
ContentBody.createAMQFrame(channelId, cb);
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
Thu Jan 11 09:35:49 2007
@@ -52,7 +52,7 @@
{
}
- public void commit(StoreContext context)
+ public void commit(StoreContext context) throws AMQException
{
//do the memeory part of the record()
_msg.incrementReference();
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
Thu Jan 11 09:35:49 2007
@@ -17,16 +17,18 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -34,7 +36,11 @@
*/
public class LocalTransactionalContext implements TransactionalContext
{
- private final TxnBuffer _txnBuffer;
+ private static final Logger _log =
Logger.getLogger(LocalTransactionalContext.class);
+
+ private final TxnBuffer _txnBuffer = new TxnBuffer();
+
+ private final List<DeliveryDetails> _postCommitDeliveryList = new
LinkedList<DeliveryDetails>();
/**
* We keep hold of the ack operation so that we can consolidate acks, i.e.
multiple acks within a txn are
@@ -50,14 +56,34 @@
private boolean _inTran = false;
+ private boolean _messageDelivered = false;
+
+ private static class DeliveryDetails
+ {
+ public AMQMessage message;
+ public AMQQueue queue;
+
+
+ public DeliveryDetails(AMQMessage message, AMQQueue queue)
+ {
+ this.message = message;
+ this.queue = queue;
+ }
+ }
+
public LocalTransactionalContext(MessageStore messageStore, StoreContext
storeContext,
- TxnBuffer txnBuffer,
List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException>
returnMessages)
{
_messageStore = messageStore;
_storeContext = storeContext;
- _txnBuffer = txnBuffer;
_returnMessages = returnMessages;
- _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ //_txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ }
+
+
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
}
public void rollback() throws AMQException
@@ -73,9 +99,19 @@
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
-
- _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ message.incrementReference();
+ _postCommitDeliveryList.add(new DeliveryDetails(message, queue));
+ _messageDelivered = true;
+ /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Incrementing ref count on message and enlisting
cleanup operation - id " +
+ message.getMessageId());
+ }
+ message.incrementReference();
+ _messageDelivered = true;
_txnBuffer.enlist(new CleanupMessageOperation(message,
_returnMessages));
+ */
}
private void checkAck(long deliveryTag, UnacknowledgedMessageMap
unacknowledgedMessageMap) throws AMQException
@@ -129,6 +165,10 @@
{
if (!_inTran)
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Starting transaction on message store");
+ }
_messageStore.beginTran(_storeContext);
_inTran = true;
}
@@ -136,6 +176,10 @@
public void commit() throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Committing transactional context");
+ }
if (_ackOp != null)
{
_ackOp.consolidate();
@@ -143,13 +187,48 @@
_ackOp = null;
}
+ if (_messageDelivered)
+ {
+ _txnBuffer.enlist(new StoreMessageOperation(_messageStore));
+ }
try
{
_txnBuffer.commit(_storeContext);
}
finally
{
- _inTran = false;
+ _messageDelivered = false;
+ _inTran = _messageStore.inTran(_storeContext);
}
+
+ try
+ {
+ postCommitDelivery();
+ }
+ catch (AMQException e)
+ {
+ // OK so what do we do now...?
+ _log.error("Failed to deliver messages following txn commit: " +
e, e);
+ }
+ }
+
+ private void postCommitDelivery() throws AMQException
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Performing post commit delivery");
+ }
+ try
+ {
+ for (DeliveryDetails dd : _postCommitDeliveryList)
+ {
+ dd.queue.process(_storeContext, dd.message);
+ }
+ }
+ finally
+ {
+ _postCommitDeliveryList.clear();
+ }
+
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Thu Jan 11 09:35:49 2007
@@ -72,6 +72,12 @@
_browsedAcks = browsedAcks;
}
+
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+
public void beginTranIfNecessary() throws AMQException
{
if (!_inTran)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
Thu Jan 11 09:35:49 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
/**
* @author Robert Greig ([EMAIL PROTECTED])
@@ -45,4 +46,6 @@
void messageFullyReceived(boolean persistent) throws AMQException;
void messageProcessed(AMQProtocolSession protocolSession) throws
AMQException;
+
+ StoreContext getStoreContext();
}
Modified:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=495304&r1=495303&r2=495304
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
Thu Jan 11 09:35:49 2007
@@ -59,15 +59,15 @@
{
}
- public void removeQueue(String name) throws AMQException
+ public void removeQueue(AMQShortString name) throws AMQException
{
}
- public void enqueueMessage(StoreContext s, String name, long messageId)
throws AMQException
+ public void enqueueMessage(StoreContext s, AMQShortString name, long
messageId) throws AMQException
{
}
- public void dequeueMessage(StoreContext s, String name, long messageId)
throws AMQException
+ public void dequeueMessage(StoreContext s, AMQShortString name, long
messageId) throws AMQException
{
}