Author: rgodfrey
Date: Wed Apr 16 02:40:42 2008
New Revision: 648648
URL: http://svn.apache.org/viewvc?rev=648648&view=rev
Log:
QPID-926 : Perform all store operations associated with an acknowledge in a
single store transaction
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=648648&r1=648647&r2=648648&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Wed Apr 16 02:40:42 2008
@@ -49,8 +49,6 @@
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private final Set<Long> _browsedAcks;
-
private final MessageStore _messageStore;
private final StoreContext _storeContext;
@@ -61,11 +59,17 @@
public NonTransactionalContext(MessageStore messageStore, StoreContext
storeContext, AMQChannel channel,
List<RequiredDeliveryException>
returnMessages, Set<Long> browsedAcks)
{
+ this(messageStore,storeContext,channel,returnMessages);
+ }
+
+ public NonTransactionalContext(MessageStore messageStore, StoreContext
storeContext, AMQChannel channel,
+ List<RequiredDeliveryException>
returnMessages)
+ {
_channel = channel;
_storeContext = storeContext;
_returnMessages = returnMessages;
_messageStore = messageStore;
- _browsedAcks = browsedAcks;
+
}
@@ -112,6 +116,9 @@
boolean multiple, final
UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException
{
+
+ final boolean debug = _log.isDebugEnabled();
+
if (multiple)
{
if (deliveryTag == 0)
@@ -125,20 +132,17 @@
{
public boolean callback(UnacknowledgedMessage message)
throws AMQException
{
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " +
message.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will
dequeue and decrement the reference.
- message.discard(_storeContext);
+ _log.debug("Discarding message: " +
message.getMessage().getMessageId());
}
- else
+ if(message.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
+ //Message has been ack so discard it. This will
dequeue and decrement the reference.
+ message.discard(_storeContext);
+
return false;
}
@@ -159,20 +163,17 @@
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (UnacknowledgedMessage msg : acked)
{
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will
dequeue and decrement the reference.
- msg.discard(_storeContext);
+ _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
}
- else
+ if(msg.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
+
+ //Message has been ack so discard it. This will dequeue
and decrement the reference.
+ msg.discard(_storeContext);
}
}
}
@@ -189,27 +190,31 @@
_channel.getChannelId());
}
- if (!_browsedAcks.contains(deliveryTag))
+ if (debug)
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
- }
-
- //Message has been ack so discard it. This will dequeue and
decrement the reference.
- msg.discard(_storeContext);
+ _log.debug("Discarding message: " +
msg.getMessage().getMessageId());
}
- else
+ if(msg.getMessage().isPersistent())
{
- _browsedAcks.remove(deliveryTag);
+ beginTranIfNecessary();
}
- if (_log.isDebugEnabled())
+ //Message has been ack so discard it. This will dequeue and
decrement the reference.
+ msg.discard(_storeContext);
+
+ if (debug)
{
_log.debug("Received non-multiple ack for messaging with
delivery tag " + deliveryTag + " msg id " +
msg.getMessage().getMessageId());
}
}
+
+ if(_inTran)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
+
}
public void messageFullyReceived(boolean persistent) throws AMQException