Author: rupertlssmith
Date: Mon Feb 11 06:50:18 2008
New Revision: 620496
URL: http://svn.apache.org/viewvc?rev=620496&view=rev
Log:
QPID-729 : Added explicit list of unacked messages, acked on commit, rejected
on roll-back.
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Modified: incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs
(original)
+++ incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/AmqChannel.cs Mon
Feb 11 06:50:18 2008
@@ -457,7 +457,7 @@
foreach (BasicMessageConsumer consumer in _consumers.Values)
{
// Sends acknowledgement to server.
- consumer.AcknowledgeLastDelivered();
+ consumer.AcknowledgeDelivered();
}
// Commits outstanding messages sent and outstanding
acknowledgements.
@@ -485,13 +485,16 @@
{
Suspend(true);
}
-
- // todo: rollback dispatcher when TX support is added
- //if ( _dispatcher != null )
- // _dispatcher.Rollback();
- _connection.ConvenientProtocolWriter.SyncWrite(
-
TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+ // Reject up to message last delivered (if any) for each
consumer.
+ // Need to send reject for messages delivered to consumers
so far.
+ foreach (BasicMessageConsumer consumer in
_consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.RejectUnacked();
+ }
+
+
_connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId),
typeof(TxRollbackOkBody));
if ( !suspended )
{
@@ -1012,6 +1015,15 @@
_connection.ProtocolWriter.Write(ackFrame);
}
+ public void RejectMessage(ulong deliveryTag, bool requeue)
+ {
+ if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) ||
(_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+ {
+ AMQFrame rejectFrame =
BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+ _connection.ProtocolWriter.Write(rejectFrame);
+ }
+ }
+
/// <summary>
/// Handle a message that bounced from the server, creating
/// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
/// Placing stop check after consume may also be wrong as it may cause
a message to be thrown away. Seems more correct to use interupt on
/// the block thread to cause it to prematurely return from its wait,
whereupon it can be made to re-check the stop flag.</remarks>
///
- /// <remarks>Exception swalled, if there is an exception whilst
notifying the connection on bounced messages. Unhandled excetpion should
- /// fall through and termiante the loop, as it is a bug if it
occurrs.</remarks>
+ /// <remarks>Exception swallowed, if there is an exception whilst
notifying the connection on bounced messages. Unhandled excetpion should
+ /// fall through and terminate the loop, as it is a bug if it
occurrs.</remarks>
private class Dispatcher
{
/// <summary> Flag used to indicate when this dispatcher is to be
stopped (0=go, 1=stop). </summary>
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620496&r1=620495&r2=620496&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Mon Feb 11 06:50:18 2008
@@ -20,6 +20,8 @@
*/
using System;
using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
private AmqChannel _channel;
+ // <summary>
+ // Tag of last message delievered, whoch should be acknowledged on
commit in transaction mode.
+ // </summary>
+ //private long _lastDeliveryTag;
+
/// <summary>
- /// Tag of last message delievered, whoch should be acknowledged on
commit in transaction mode.
+ /// Explicit list of all received but un-acked messages in a
transaction. Used to ensure acking is completed when transaction is committed.
/// </summary>
- private long _lastDeliveryTag;
+ private LinkedList<long> _receivedDeliveryTags;
/// <summary>
/// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
+
+ if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+ {
+ _receivedDeliveryTags = new LinkedList<long>();
+ }
}
#region IMessageConsumer Members
@@ -391,13 +403,24 @@
/// <summary>
/// Acknowledge up to last message delivered (if any). Used when
commiting.
/// </summary>
- internal void AcknowledgeLastDelivered()
+ internal void AcknowledgeDelivered()
{
- if (_lastDeliveryTag > 0)
+ foreach (long tag in _receivedDeliveryTags)
{
- _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); //
XXX evil cast
- _lastDeliveryTag = -1;
+ _channel.AcknowledgeMessage((ulong)tag, false);
}
+
+ _receivedDeliveryTags.Clear();
+ }
+
+ internal void RejectUnacked()
+ {
+ foreach (long tag in _receivedDeliveryTags)
+ {
+ _channel.RejectMessage((ulong)tag, true);
+ }
+
+ _receivedDeliveryTags.Clear();
}
private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
break;
case AcknowledgeMode.SessionTransacted:
- _lastDeliveryTag = msg.DeliveryTag;
+ _receivedDeliveryTags.AddLast(msg.DeliveryTag);
break;
}
}