Author: aidan
Date: Mon May 5 05:29:15 2008
New Revision: 653451
URL: http://svn.apache.org/viewvc?rev=653451&view=rev
Log:
QPID-1022 Use synchronous writes to fix race conditions
Modified:
incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=653451&r1=653450&r2=653451&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
(original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Mon
May 5 05:29:15 2008
@@ -888,10 +888,14 @@
/// <param name="consumer"></param>
private void RegisterConsumer(BasicMessageConsumer consumer)
{
+ // Need to generate a consumer tag on the client so we can exploit
the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber,
_nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+
String consumerTag = ConsumeFromQueue(consumer.QueueName,
consumer.NoLocal,
- consumer.Exclusive,
consumer.AcknowledgeMode);
- consumer.ConsumerTag = consumerTag;
- _consumers.Add(consumerTag, consumer);
+ consumer.Exclusive,
consumer.AcknowledgeMode, tag);
+
}
internal void DoBind(string queueName, string exchangeName, string
routingKey, FieldTable args)
@@ -902,19 +906,17 @@
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName,
exchangeName,
- routingKey,
true, args);
+ routingKey,
false, args);
_replayFrames.Add(queueBind);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueBind);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind,
typeof(QueueBindOkBody));
}
}
- private String ConsumeFromQueue(String queueName, bool noLocal, bool
exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool
exclusive, AcknowledgeMode acknowledgeMode, String tag)
{
- // Need to generate a consumer tag on the client so we can exploit
the nowait flag.
- String tag = string.Format("{0}-{1}", _sessionNumber,
_nextConsumerNumber++);
AMQFrame basicConsume =
BasicConsumeBody.CreateAMQFrame(_channelId, 0,
queueName,
tag, noLocal,
@@ -958,13 +960,13 @@
queueName, isDurable, isExclusive,
isAutoDelete));
AMQFrame queueDeclare =
QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable,
isExclusive,
-
isAutoDelete, true, null);
+
isAutoDelete, false, null);
_replayFrames.Add(queueDeclare);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueDeclare);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare,
typeof(QueueDeclareOkBody));
}
}