Author: aidan
Date: Mon May  5 05:29:15 2008
New Revision: 653451

URL: http://svn.apache.org/viewvc?rev=653451&view=rev
Log:
QPID-1022 Use synchronous writes to fix race conditions

Modified:
    incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs

Modified: incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=653451&r1=653450&r2=653451&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2.x/dotnet/Qpid.Client/Client/AmqChannel.cs Mon May  5 05:29:15 2008
@@ -888,10 +888,14 @@
         /// <param name="consumer"></param>
         private void RegisterConsumer(BasicMessageConsumer consumer)
         {
+            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+            consumer.ConsumerTag = tag;
+            _consumers.Add(tag, consumer);
+            
             String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
-                                                  consumer.Exclusive, consumer.AcknowledgeMode);
-            consumer.ConsumerTag = consumerTag;
-            _consumers.Add(consumerTag, consumer);
+                                                  consumer.Exclusive, consumer.AcknowledgeMode, tag);
+            
         }
 
         internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,17 @@
 
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, exchangeName,
-                                                              routingKey, true, args);
+                                                              routingKey, false, args);
             _replayFrames.Add(queueBind);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueBind);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
             }
         }
 
-        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+        private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
         {
-            // Need to generate a consumer tag on the client so we can exploit the nowait flag.
-            String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
             
             AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
                                                                     queueName, tag, noLocal,
@@ -958,13 +960,13 @@
                                         queueName, isDurable, isExclusive, isAutoDelete));
 
             AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
-                                                                    isAutoDelete, true, null);
+                                                                    isAutoDelete, false, null);
 
             _replayFrames.Add(queueDeclare);
 
             lock (_connection.FailoverMutex)
             {
-                _connection.ProtocolWriter.Write(queueDeclare);
+                _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
             }
         }
 


Reply via email to