Author: steshaw
Date: Tue Nov 28 15:37:52 2006
New Revision: 480283
URL: http://svn.apache.org/viewvc?view=rev&rev=480283
Log:
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has
same problem as the Java client in that on fail-over the "transaction"
continues but the earlier part of the transaction is forgotten.
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
incubator/qpid/trunk/qpid/dotnet/TODO.txt
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
Tue Nov 28 15:37:52 2006
@@ -59,32 +59,34 @@
_log.Info("connectionInfo = " + connectionInfo);
_log.Info("connection.asUrl = " + _connection.toURL());
- IChannel channel = _connection.CreateChannel(false,
AcknowledgeMode.NoAcknowledge);
+ IChannel receivingChannel = _connection.CreateChannel(false,
AcknowledgeMode.NoAcknowledge);
- string queueName = channel.GenerateUniqueName();
+ string queueName = receivingChannel.GenerateUniqueName();
// Queue.Declare
- channel.DeclareQueue(queueName, false, true, true);
+ receivingChannel.DeclareQueue(queueName, false, true, true);
// No need to call Queue.Bind as automatically bound to default
direct exchange.
- channel.Bind(queueName, "amq.direct", queueName);
+ receivingChannel.Bind(queueName, "amq.direct", queueName);
- channel.CreateConsumerBuilder(queueName).Create().OnMessage = new
MessageReceivedDelegate(onMessage);
+
receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new
MessageReceivedDelegate(onMessage);
_connection.Start();
- sendInTx(queueName);
+ publishInTx(queueName);
+
+ Thread.Sleep(2000); // Wait a while for last messages.
_connection.Close();
_log.Info("FailoverTxText complete");
}
- private void sendInTx(string routingKey)
+ private void publishInTx(string routingKey)
{
_log.Info("sendInTx");
- bool transacted = false;
- IChannel channel = _connection.CreateChannel(transacted,
AcknowledgeMode.NoAcknowledge);
- IMessagePublisher publisher = channel.CreatePublisherBuilder()
+ bool transacted = true;
+ IChannel publishingChannel = _connection.CreateChannel(transacted,
AcknowledgeMode.NoAcknowledge);
+ IMessagePublisher publisher =
publishingChannel.CreatePublisherBuilder()
.withRoutingKey(routingKey)
.Create();
@@ -92,12 +94,12 @@
{
for (int j = 1; j <= NUM_MESSAGES; ++j)
{
- ITextMessage msg = channel.CreateTextMessage("Tx=" + i + "
msg=" + j);
+ ITextMessage msg =
publishingChannel.CreateTextMessage("Tx=" + i + " msg=" + j);
_log.Info("sending message = " + msg.Text);
publisher.Send(msg);
Thread.Sleep(SLEEP_MILLIS);
}
- if (transacted) channel.Commit();
+ if (transacted) publishingChannel.Commit();
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov
28 15:37:52 2006
@@ -276,20 +276,23 @@
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a
transacted session
- /*Channel.Commit frame = new Channel.Commit();
- frame.channelId = _channelId;
- frame.confirmTag = 1;*/
+ try
+ {
+ // Acknowledge up to message last delivered (if any) for each
consumer.
+ // Need to send ack for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.AcknowledgeLastDelivered();
+ }
- // try
- // {
- //
_connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new
ChannelReplyListener(_channelId));
- // }
- // catch (AMQException e)
- // {
- // throw new JMSException("Error creating session: " +
e);
- // }
- throw new NotImplementedException();
- //_logger.Info("Transaction commited on channel " + _channelId);
+ // Commits outstanding messages sent and outstanding
acknowledgements.
+
_connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId),
typeof(TxCommitOkBody));
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to commit", e);
+ }
}
public void Rollback()
@@ -977,6 +980,27 @@
throw new NotImplementedException("Don't use nowait=false with
DeclareExchange");
//
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof
(ExchangeDeclareOkBody));
}
+ }
+
+ /**
+ * Acknowledge a message or several messages. This method can be
called via AbstractJMSMessage or from
+ * a BasicConsumer. The former where the mode is CLIENT_ACK and the
latter where the mode is
+ * AUTO_ACK or similar.
+ *
+ * @param deliveryTag the tag of the last message to be acknowledged
+ * @param multiple if true will acknowledge all messages up to and
including the one specified by the
+ * delivery tag
+ */
+ public void AcknowledgeMessage(long deliveryTag, bool multiple)
+ {
+ // XXX: cast to ulong evil?
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId,
(ulong)deliveryTag, multiple);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Sending ack for delivery tag " + deliveryTag +
" on channel " + _channelId);
+ }
+ // FIXME: lock FailoverMutex here?
+ _connection.ProtocolWriter.Write(ackFrame);
}
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Tue Nov 28 15:37:52 2006
@@ -98,6 +98,11 @@
private AmqChannel _channel;
+ /// <summary>
+ /// Tag of last message delievered, whoch should be acknowledged on
commit in transaction mode.
+ /// </summary>
+ private long _lastDeliveryTag;
+
public BasicMessageConsumer(ushort channelId, string queueName, bool
noLocal,
MessageFactoryRegistry messageFactory,
AmqChannel channel)
{
@@ -167,7 +172,13 @@
{
o = _synchronousQueue.DequeueBlocking();
}
- return ReturnMessageOrThrow(o);
+
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
}
finally
{
@@ -222,7 +233,7 @@
/// <returns> a message only if o is a Message</returns>
/// <exception>JMSException if the argument is a throwable. If it is a
QpidMessagingException it is rethrown as is, but if not
/// a QpidMessagingException is created with the linked exception set
appropriately</exception>
- private IMessage ReturnMessageOrThrow(object o)
+ private IMessage ReturnMessageOrThrow(object o)
{
// errors are passed via the queue too since there is no way of
interrupting the poll() via the API.
if (o is Exception)
@@ -397,5 +408,49 @@
{
get { return _queueName; }
}
+
+ /// <summary>
+ /// Acknowledge up to last message delivered (if any). Used when
commiting.
+ /// </summary>
+ internal void AcknowledgeLastDelivered()
+ {
+ if (_lastDeliveryTag > 0)
+ {
+ _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _lastDeliveryTag = -1;
+ }
+ }
+
+ private void PostDeliver(IMessage m)
+ {
+ AbstractQmsMessage msg = (AbstractQmsMessage) m;
+ switch (_acknowledgeMode)
+ {
+/* TODO
+ case AcknowledgeMode.DupsOkAcknowledge:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
+ _channel.AcknowledgeMessage(msg.getDeliveryTag(),
true);
+ }
+ break;
+ */
+ case AcknowledgeMode.AutoAcknowledge:
+ _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ break;
+ case AcknowledgeMode.SessionTransacted:
+ _lastDeliveryTag = msg.DeliveryTag;
+ break;
+ }
+ }
+
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
Tue Nov 28 15:37:52 2006
@@ -31,22 +31,27 @@
/// </summary>
protected AmqChannel _channel;
- public AMQMessage(IContentHeaderProperties properties)
+ private long _deliveryTag;
+
+ public AMQMessage(IContentHeaderProperties properties, long
deliveryTag)
{
_contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
}
- public AmqChannel Channel
+ public AMQMessage(IContentHeaderProperties properties) :
this(properties, -1)
+ {
+ }
+
+ public long DeliveryTag
{
- get
- {
- return _channel;
- }
+ get { return _deliveryTag; }
+ }
- set
- {
- _channel = value;
- }
+ public AmqChannel Channel
+ {
+ get { return _channel; }
+ set { _channel = value; }
}
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/TODO.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/TODO.txt?view=diff&rev=480283&r1=480282&r2=480283
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/TODO.txt (original)
+++ incubator/qpid/trunk/qpid/dotnet/TODO.txt Tue Nov 28 15:37:52 2006
@@ -1,9 +1,4 @@
-https://issues.apache.org/jira/browse/QPID-134
-* Failover.
- * Review new API methods for fail over requirements.
- i.e. lock on mutex for non-blocking methods, FailoverSupport (for blocking
methods)
-
https://issues.apache.org/jira/browse/QPID-135
* transactions Tx.Select and Tx.Commit
* Do the TxSelect message after opening a transactional channel