Author: steshaw
Date: Tue Nov 28 12:29:56 2006
New Revision: 480190
URL: http://svn.apache.org/viewvc?view=rev&rev=480190
Log:
Locked on FailoverMutex where necessary.
Noted that AMQConnection.CloseSession and BasicMessageConsumer.Close both lock
on FailoverMutex but do ProtocolWriter.SyncWrite which probably means that they
need to do the FailoverSupport thing instead. If it's a problem, it exists also
in the Java client.
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue
Nov 28 12:29:56 2006
@@ -343,6 +343,7 @@
public void CloseSession(AmqChannel channel)
{
+ // FIXME: Don't we need FailoverSupport here (as we have
SyncWrite).
_protocolSession.CloseSession(channel);
AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov
28 12:29:56 2006
@@ -708,7 +708,8 @@
}
/**
- * Resubscribes all producers and consumers. This is called when
performing failover.
+ * Replays frame on fail over.
+ *
* @throws AMQException
*/
internal void ReplayOnFailOver()
@@ -746,15 +747,19 @@
internal void DoBind(string queueName, string exchangeName, string
routingKey, FieldTable args)
{
+
_logger.Debug(string.Format("QueueBind queueName={0}
exchangeName={1} routingKey={2}, arg={3}",
- queueName, exchangeName, routingKey,
args));
+ queueName, exchangeName, routingKey,
args));
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName,
exchangeName,
routingKey,
true, args);
_replayFrames.Add(queueBind);
- _connection.ProtocolWriter.Write(queueBind);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueBind);
+ }
}
private String ConsumeFromQueue(String queueName, int prefetch,
@@ -798,10 +803,7 @@
AbstractQmsMessage message, DeliveryMode
deliveryMode, int priority, uint timeToLive,
bool disableTimestamps)
{
- lock (Connection.FailoverMutex)
- {
- DoBasicPublish(exchangeName, routingKey, mandatory, immediate,
message, deliveryMode, timeToLive, priority, disableTimestamps);
- }
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate,
message, deliveryMode, timeToLive, priority, disableTimestamps);
}
private void DoBasicPublish(string exchangeName, string routingKey,
bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode
deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
@@ -854,7 +856,10 @@
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new
CompositeAMQDataBlock(frames);
- Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+
+ lock (_connection.FailoverMutex) {
+ _connection.ProtocolWriter.Write(compositeFrame);
+ }
}
/// <summary>
@@ -934,7 +939,10 @@
_replayFrames.Add(queueDeclare);
- _connection.ProtocolWriter.Write(queueDeclare);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
}
public void DeclareExchange(String exchangeName, String exchangeClass)
@@ -959,11 +967,15 @@
if (noWait)
{
- _connection.ProtocolWriter.Write(declareExchange);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(declareExchange);
+ }
}
else
{
-
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof
(ExchangeDeclareOkBody));
+ throw new NotImplementedException("Don't use nowait=false with
DeclareExchange");
+//
_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof
(ExchangeDeclareOkBody));
}
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480190&r1=480189&r2=480190
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Tue Nov 28 12:29:56 2006
@@ -255,6 +255,7 @@
public override void Close()
{
+ // FIXME: Don't we need FailoverSupport here (as we have
SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)