Author: tomasr
Date: Thu May 10 15:25:01 2007
New Revision: 537019
URL: http://svn.apache.org/viewvc?view=rev&rev=537019
Log:
QPID-441 Fix handling of bounced messages
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs
incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs Thu May 10 15:25:01 2007
@@ -26,69 +26,103 @@
namespace Qpid.Client.Tests
{
- [TestFixture]
- public class UndeliverableTest : BaseMessagingTestFixture
- {
- private static ILog _logger =
LogManager.GetLogger(typeof(UndeliverableTest));
-
- [SetUp]
- public override void Init()
- {
- base.Init();
-
- try
- {
- _connection.ExceptionListener = new
ExceptionListenerDelegate(OnException);
- }
- catch (QpidException e)
- {
- _logger.Error("Could not add ExceptionListener", e);
- }
- }
-
- public static void OnException(Exception e)
- {
- // Here we dig out the AMQUndelivered exception (if present) in
order to log the returned message.
-
- _logger.Error("OnException handler received connection-level
exception", e);
- if (e is QpidException)
- {
- QpidException qe = (QpidException)e;
- if (qe.InnerException is AMQUndeliveredException)
- {
- AMQUndeliveredException ue =
(AMQUndeliveredException)qe.InnerException;
- _logger.Error("inner exception is
AMQUndeliveredException", ue);
- _logger.Error(string.Format("Returned message = {0}",
ue.GetUndeliveredMessage()));
-
- }
- }
- }
-
- [Test]
- public void SendUndeliverableMessage()
- {
- SendOne("default exchange", null);
- SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
- SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
- SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
-
- Thread.Sleep(1000); // Wait for message returns!
- }
-
- private void SendOne(string exchangeNameFriendly, string exchangeName)
- {
- _logger.Info("Sending undeliverable message to " +
exchangeNameFriendly);
-
- // Send a test message to a non-existant queue on the default
exchange. See if message is returned!
- MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
- .WithRoutingKey("Non-existant route key!")
- .WithMandatory(true);
- if (exchangeName != null)
+ /// <summary>
+ /// Tests that when sending undeliverable messages with the
+ /// mandatory flag set, an exception is raised on the connection
+ /// as the message is bounced back by the broker
+ /// </summary>
+ [TestFixture]
+ public class UndeliverableTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger =
LogManager.GetLogger(typeof(UndeliverableTest));
+ private ManualResetEvent _event;
+ public const int TIMEOUT = 1000;
+ private Exception _lastException;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _event = new ManualResetEvent(false);
+ _lastException = null;
+
+ try
+ {
+ _connection.ExceptionListener = new
ExceptionListenerDelegate(OnException);
+ } catch ( QpidException e )
+ {
+ _logger.Error("Could not add ExceptionListener", e);
+ }
+ }
+
+ public void OnException(Exception e)
+ {
+ // Here we dig out the AMQUndelivered exception (if present) in order
to log the returned message.
+
+ _lastException = e;
+ _logger.Error("OnException handler received connection-level
exception", e);
+ if ( e is QpidException )
+ {
+ QpidException qe = (QpidException)e;
+ if ( qe.InnerException is AMQUndeliveredException )
{
- builder.WithExchangeName(exchangeName);
+ AMQUndeliveredException ue =
(AMQUndeliveredException)qe.InnerException;
+ _logger.Error("inner exception is AMQUndeliveredException", ue);
+ _logger.Error(string.Format("Returned message = {0}",
ue.GetUndeliveredMessage()));
}
- IMessagePublisher publisher = builder.Create();
- publisher.Send(_channel.CreateTextMessage("Hiya!"));
- }
- }
+ }
+ _event.Set();
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnDefaultExchange()
+ {
+ SendOne("default exchange", null);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnDirectExchange()
+ {
+ SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnTopicExchange()
+ {
+ SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnHeadersExchange()
+ {
+ SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+ }
+
+ private void SendOne(string exchangeNameFriendly, string exchangeName)
+ {
+ _logger.Info("Sending undeliverable message to " +
exchangeNameFriendly);
+
+ // Send a test message to a non-existant queue
+ // on the specified exchange. See if message is returned!
+ MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
+ .WithRoutingKey("Non-existant route key!")
+ .WithMandatory(true); // necessary so that the server bounces the
message back
+ if ( exchangeName != null )
+ {
+ builder.WithExchangeName(exchangeName);
+ }
+ IMessagePublisher publisher = builder.Create();
+ publisher.Send(_channel.CreateTextMessage("Hiya!"));
+
+ // check we received an exception on the connection
+ // and that it is of the right type
+ _event.WaitOne(TIMEOUT, true);
+
+ Type expectedException = typeof(AMQUndeliveredException);
+ Exception ex = _lastException;
+ Assert.IsNotNull(ex, "No exception was thrown by the test. Expected "
+ expectedException);
+
+ if ( ex.InnerException != null )
+ ex = ex.InnerException;
+
+ Assert.IsInstanceOfType(expectedException, ex);
+ }
+ }
}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 10 15:25:01 2007
@@ -28,6 +28,7 @@
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Protocol;
namespace Qpid.Client
{
@@ -568,8 +569,14 @@
if (_logger.IsDebugEnabled)
{
_logger.Debug("Message received in session with channel id " +
_channelId);
- }
- _queue.EnqueueBlocking(message);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ } else
+ {
+ _queue.EnqueueBlocking(message);
+ }
}
public int DefaultPrefetch
@@ -986,5 +993,42 @@
// FIXME: lock FailoverMutex here?
_connection.ProtocolWriter.Write(ackFrame);
}
+
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(
+ 0, false, message.ContentHeader,
+ message.Bodies
+ );
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + "
(" + reason + ")");
+ AMQException exception;
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ {
+ exception = new AMQNoConsumersException(reason,
bouncedMessage);
+ } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ } else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason,
bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ } catch ( Exception ex )
+ {
+ _logger.Error("Caught exception trying to raise undelivered
message exception (dump follows) - ignoring...", ex);
+ }
+
+ }
}
}
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs Thu May 10 15:25:01 2007
@@ -32,7 +32,7 @@
public void MethodReceived(AMQStateManager stateManager,
AMQMethodEvent evt)
{
- _logger.Debug("New JmsBounce method received");
+ _logger.Debug("New Basic.Return method received");
UnprocessedMessage msg = new UnprocessedMessage();
msg.DeliverBody = null;
msg.BounceBody = (BasicReturnBody) evt.Method;
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs Thu May 10 15:25:01 2007
@@ -44,11 +44,20 @@
AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
evt.ProtocolSession.WriteFrame(frame);
- // HACK
+
if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
{
- _logger.Debug("Channel close received with errorCode " +
errorCode + ", throwing exception");
- evt.ProtocolSession.AMQConnection.ExceptionReceived(new
AMQChannelClosedException(errorCode, "Error: " + reason));
+ _logger.Debug("Channel close received with errorCode " +
errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " +
reason);
}
evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode,
reason);
}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 10 15:25:01 2007
@@ -43,6 +43,8 @@
<Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile
Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -144,4 +146,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Common/Protocol/AMQConstant.cs Thu May 10 15:25:01 2007
@@ -77,17 +77,20 @@
public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no
route", true);
public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313,
"no consumers", true);
public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320,
"context in use", true);
- public static readonly AMQConstant CONTEXT_UNKNOWN = new
AMQConstant(321, "context unknown", true);
- public static readonly AMQConstant INVALID_SELECTOR = new
AMQConstant(322, "selector invalid", true);
public static readonly AMQConstant INVALID_PATH = new AMQConstant(402,
"invalid path", true);
public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403,
"access refused", true);
public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not
found", true);
+ public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405,
"already exists", true);
+ public static readonly AMQConstant IN_USE = new AMQConstant(406, "in
use", true);
+ public static readonly AMQConstant INVALID_ROUTING_KEY = new
AMQConstant(407, "routing key invalid", true);
+ public static readonly AMQConstant REQUEST_TIMEOUT = new
AMQConstant(408, "request timeout", true);
+ public static readonly AMQConstant INVALID_ARGUMENT = new
AMQConstant(409, "argument invalid", true);
public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501,
"frame error", true);
public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502,
"syntax error", true);
public static readonly AMQConstant COMMAND_INVALID = new
AMQConstant(503, "command invalid", true);
public static readonly AMQConstant CHANNEL_ERROR = new AMQConstant(504,
"channel error", true);
public static readonly AMQConstant RESOURCE_ERROR = new AMQConstant(506,
"resource error", true);
- public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(530,
"not allowed", true);
+ public static readonly AMQConstant NOT_ALLOWED = new AMQConstant(507,
"not allowed", true);
public static readonly AMQConstant NOT_IMPLEMENTED = new
AMQConstant(540, "not implemented", true);
public static readonly AMQConstant INTERNAL_ERROR = new AMQConstant(541,
"internal error", true);
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj?view=diff&rev=537019&r1=537018&r2=537019
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Common/Qpid.Common.csproj Thu May 10 15:25:01 2007
@@ -44,6 +44,8 @@
<Compile Include="AMQConnectionClosedException.cs" />
<Compile Include="AMQDisconnectedException.cs" />
<Compile Include="AMQException.cs" />
+ <Compile Include="AMQInvalidArgumentException.cs" />
+ <Compile Include="AMQInvalidRoutingKeyException.cs" />
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file