Author: steshaw
Date: Tue Nov 28 12:29:56 2006
New Revision: 480190

URL: http://svn.apache.org/viewvc?view=rev&rev=480190
Log:
Locked on FailoverMutex where necessary.
Noted (via FIXME comments) that AMQConnection.CloseSession and 
BasicMessageConsumer.Close both lock on FailoverMutex but then call 
ProtocolWriter.SyncWrite, which probably means they should use FailoverSupport 
instead. If this is a problem, it also exists in the Java client.

Modified:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue 
Nov 28 12:29:56 2006
@@ -343,6 +343,7 @@
 
         public void CloseSession(AmqChannel channel)
         {
+            // FIXME: Don't we need FailoverSupport here (as we have 
SyncWrite).
             _protocolSession.CloseSession(channel);
             
             AMQFrame frame = ChannelCloseBody.CreateAMQFrame(

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov 
28 12:29:56 2006
@@ -708,7 +708,8 @@
         }
 
         /**
-         * Resubscribes all producers and consumers. This is called when 
performing failover.
+         * Replays frame on fail over.
+         * 
          * @throws AMQException
          */
         internal void ReplayOnFailOver()
@@ -746,15 +747,19 @@
 
         internal void DoBind(string queueName, string exchangeName, string 
routingKey, FieldTable args)
         {
+
             _logger.Debug(string.Format("QueueBind queueName={0} 
exchangeName={1} routingKey={2}, arg={3}",
-                                        queueName, exchangeName, routingKey, 
args));
+                                    queueName, exchangeName, routingKey, 
args));
 
             AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
                                                               queueName, 
exchangeName,
                                                               routingKey, 
true, args);
             _replayFrames.Add(queueBind);
 
-            _connection.ProtocolWriter.Write(queueBind);
+            lock (_connection.FailoverMutex)
+            {
+                _connection.ProtocolWriter.Write(queueBind);
+            }
         }
 
         private String ConsumeFromQueue(String queueName, int prefetch,
@@ -798,10 +803,7 @@
                                    AbstractQmsMessage message, DeliveryMode 
deliveryMode, int priority, uint timeToLive,
                                    bool disableTimestamps)
         {
-            lock (Connection.FailoverMutex)
-            {
-                DoBasicPublish(exchangeName, routingKey, mandatory, immediate, 
message, deliveryMode, timeToLive, priority, disableTimestamps);
-            }
+            DoBasicPublish(exchangeName, routingKey, mandatory, immediate, 
message, deliveryMode, timeToLive, priority, disableTimestamps);
         }
 
         private void DoBasicPublish(string exchangeName, string routingKey, 
bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode 
deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
@@ -854,7 +856,10 @@
             frames[0] = publishFrame;
             frames[1] = contentHeaderFrame;
             CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);
-            Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+
+            lock (_connection.FailoverMutex) {
+                _connection.ProtocolWriter.Write(compositeFrame);
+            }   
         }
 
         /// <summary>
@@ -934,7 +939,10 @@
 
             _replayFrames.Add(queueDeclare);
 
-            _connection.ProtocolWriter.Write(queueDeclare);
+            lock (_connection.FailoverMutex)
+            {
+                _connection.ProtocolWriter.Write(queueDeclare);
+            }
         }
 
         public void DeclareExchange(String exchangeName, String exchangeClass)
@@ -959,11 +967,15 @@
             
             if (noWait)
             {
-                _connection.ProtocolWriter.Write(declareExchange);
+                lock (_connection.FailoverMutex)
+                {
+                    _connection.ProtocolWriter.Write(declareExchange);
+                }
             }
             else
             {
-                
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof 
(ExchangeDeclareOkBody));
+                throw new NotImplementedException("Don't use nowait=false with 
DeclareExchange");
+//                
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof 
(ExchangeDeclareOkBody));
             }
         }
     }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
Tue Nov 28 12:29:56 2006
@@ -255,6 +255,7 @@
 
         public override void Close()
         {
+            // FIXME: Don't we need FailoverSupport here (as we have 
SyncWrite). i.e. rather than just locking FailOverMutex
             lock (_channel.Connection.FailoverMutex)
             {
                 lock (_closingLock)


Reply via email to