We would like to replace the Rhino Service Bus ErrorAction with a custom one
that retries the same way the current ErrorAction does, but does not move the
message to the error sub-queue, and that additionally logs the status of the
processed message during retries (e.g. "message retried for the 3rd time",
"giving up after 5 retries", etc.).
I've noticed an issue in the original RSB ErrorAction when using a
transactional MSMQ queue.
Here is a unit test that reproduces the issue.
MultipleThreadsMsmqTestBase is just a copy of MsmqTestBase, but it uses 6
threads.
/// <summary>
/// Base class for MSMQ tests. The constructor creates the test queues when they
/// do not exist and purges them together with the sub-queues listed below; the
/// transports are created lazily and run with <see cref="NumberOfThreads"/> threads.
/// </summary>
public class MultipleThreadsMsmqTestBase : IDisposable
{
    // Thread count passed to every MsmqTransport created by this fixture.
    private const int NumberOfThreads = 6;

    static MultipleThreadsMsmqTestBase()
    {
        LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter();
    }

    private readonly string subscriptionQueuePath;
    protected readonly Endpoint SubscriptionsUri;
    protected readonly string testQueuePath;
    protected readonly Endpoint TestQueueUri;
    protected readonly string testQueuePath2;
    protected readonly Endpoint TestQueueUri2;
    protected readonly string transactionalTestQueuePath;
    protected readonly Endpoint TransactionalTestQueueUri;
    protected MessageQueue queue;
    protected MessageQueue subscriptions;
    protected MessageQueue transactionalQueue;
    private ITransport transactionalTransport;
    private ITransport transport;
    protected readonly MessageQueue testQueue2;
    private readonly string subscriptionQueuePath2;

    /// <summary>
    /// Resolves the queue paths, creates missing queues (the transactional one
    /// as a transactional queue) and purges the queues and sub-queues.
    /// </summary>
    public MultipleThreadsMsmqTestBase()
    {
        TestQueueUri = new Uri("msmq://localhost/test_queue").ToEndpoint();
        testQueuePath = MsmqUtil.GetQueuePath(TestQueueUri).QueuePath;

        TestQueueUri2 = new Uri("msmq://localhost/test_queue2").ToEndpoint();
        testQueuePath2 = MsmqUtil.GetQueuePath(TestQueueUri2).QueuePath;

        TransactionalTestQueueUri = new Uri("msmq://localhost/transactional_test_queue").ToEndpoint();
        transactionalTestQueuePath = MsmqUtil.GetQueuePath(TransactionalTestQueueUri).QueuePath;

        SubscriptionsUri2 = new Uri("msmq://localhost/test_queue2;subscriptions").ToEndpoint();
        subscriptionQueuePath2 = MsmqUtil.GetQueuePath(SubscriptionsUri2).QueuePathWithSubQueue;

        SubscriptionsUri = new Uri("msmq://localhost/test_queue;subscriptions").ToEndpoint();
        subscriptionQueuePath = MsmqUtil.GetQueuePath(SubscriptionsUri).QueuePathWithSubQueue;

        if (MessageQueue.Exists(testQueuePath) == false)
        {
            MessageQueue.Create(testQueuePath);
        }
        if (MessageQueue.Exists(testQueuePath2) == false)
        {
            MessageQueue.Create(testQueuePath2);
        }
        if (MessageQueue.Exists(transactionalTestQueuePath) == false)
        {
            // Second argument 'true' creates the queue as transactional.
            MessageQueue.Create(transactionalTestQueuePath, true);
        }

        queue = new MessageQueue(testQueuePath);
        queue.Purge();
        PurgeQueue(testQueuePath + ";errors");

        testQueue2 = new MessageQueue(testQueuePath2);
        testQueue2.Purge();
        PurgeQueue(testQueuePath2 + ";errors");

        transactionalQueue = new MessageQueue(transactionalTestQueuePath);
        transactionalQueue.Purge();
        PurgeQueue(transactionalTestQueuePath + ";errors");

        PurgeQueue(testQueuePath + ";discarded");
        PurgeQueue(testQueuePath + ";timeout");

        subscriptions = new MessageQueue(subscriptionQueuePath)
        {
            Formatter = new XmlMessageFormatter(new[] { typeof(string) })
        };
        subscriptions.Purge();

        PurgeQueue(subscriptionQueuePath2);
    }

    public Endpoint SubscriptionsUri2 { get; set; }

    /// <summary>
    /// Transport listening on <see cref="TestQueueUri"/>; created and started on first access.
    /// </summary>
    public ITransport Transport
    {
        get
        {
            if (transport == null)
            {
                // Assign before Start() so Dispose() still sees the instance if Start() throws.
                transport = CreateTransport(TestQueueUri);
                transport.Start();
            }
            return transport;
        }
    }

    /// <summary>
    /// Transport listening on <see cref="TransactionalTestQueueUri"/>; created and started on first access.
    /// </summary>
    public ITransport TransactionalTransport
    {
        get
        {
            if (transactionalTransport == null)
            {
                // Assign before Start() so Dispose() still sees the instance if Start() throws.
                transactionalTransport = CreateTransport(TransactionalTestQueueUri);
                transactionalTransport.Start();
            }
            return transactionalTransport;
        }
    }

    /// <summary>
    /// Builds a fresh set of transport actions on every access; ErrorAction is
    /// constructed with 5 as its first argument (the retry count the tests rely on).
    /// </summary>
    private static IMsmqTransportAction[] defaultTransportActions
    {
        get
        {
            var qs = new SubQueueStrategy();
            return new IMsmqTransportAction[]
            {
                new AdministrativeAction(),
                new ErrorAction(5, qs),
                new ShutDownAction(),
                new TimeoutAction(qs)
            };
        }
    }

    /// <summary>Opens the queue at <paramref name="path"/>, purges it and disposes the handle.</summary>
    private static void PurgeQueue(string path)
    {
        using (var target = new MessageQueue(path))
        {
            target.Purge();
        }
    }

    /// <summary>
    /// Creates (but does not start) an MsmqTransport for <paramref name="endpoint"/>
    /// using <see cref="NumberOfThreads"/> threads and the default transport actions.
    /// </summary>
    private static ITransport CreateTransport(Endpoint endpoint)
    {
        var serializer = new XmlMessageSerializer(
            new DefaultReflection(),
            new CastleServiceLocator(new WindsorContainer()));

        return new MsmqTransport(
            serializer,
            new SubQueueStrategy(),
            endpoint.Uri,
            NumberOfThreads,
            defaultTransportActions,
            new EndpointRouter(),
            IsolationLevel.Serializable,
            TransactionalOptions.FigureItOut,
            true,
            new MsmqMessageBuilder(serializer, new CastleServiceLocator(new WindsorContainer())));
    }

    #region IDisposable Members

    /// <summary>
    /// Disposes the queue handles opened by the constructor and any transport
    /// that was created through the lazy properties.
    /// </summary>
    public virtual void Dispose()
    {
        queue.Dispose();
        // testQueue2 is opened in the constructor as well and was previously never released.
        testQueue2.Dispose();
        transactionalQueue.Dispose();
        subscriptions.Dispose();
        if (transport != null)
        {
            transport.Dispose();
        }
        if (transactionalTransport != null)
        {
            transactionalTransport.Dispose();
        }
    }

    #endregion
}
// The unit test that reproduces the issue
/// <summary>
/// Message type sent by the tests in this file; it carries only an <see cref="Id"/>.
/// </summary>
public class MessageThatWillThrowException
{
    public int Id { get; set; }

    /// <summary>Returns the default object text immediately followed by <see cref="Id"/>.</summary>
    public override string ToString()
    {
        var typeText = base.ToString();
        return string.Concat(typeText, Id);
    }
}
/// <summary>
/// Sends 10 messages whose handler always throws and checks how many times
/// MessageArrived is raised, once against the transactional queue and once
/// against the non-transactional queue.
/// NOTE(review): the author's inline remarks and the accompanying description
/// disagree about which of the two tests fails — confirm by running both.
/// </summary>
public class ErrorActionWithMultipleThreads : MultipleThreadsMsmqTestBase
{
    [Fact]
    public void message_should_arrive_expected_number_of_times_using_transactional_queue()
    {
        //retry count is 5
        int count = 0;
        TransactionalTransport.MessageArrived += o =>
        {
            Console.WriteLine("MessageArrived " + o.MessageId);
            Interlocked.Increment(ref count);
            // Always fail so that every delivery is handled as an error.
            throw new InvalidOperationException();
        };

        const int numberOfMessages = 10;
        Parallel.For(1, numberOfMessages + 1, (i, state) =>
            TransactionalTransport.Send(
                TransactionalTestQueueUri,
                new object[]
                {
                    new MessageThatWillThrowException { Id = i }
                }));

        // Fixed 20 second wait before counting the deliveries.
        Thread.Sleep(TimeSpan.FromSeconds(20));

        Assert.Equal(5 * numberOfMessages, count); //passes for non-transactional queue
    }

    [Fact]
    public void message_should_arrive_expected_number_of_times_using_non_transactional_queue()
    {
        //retry count is 5
        int count = 0;
        Transport.MessageArrived += o =>
        {
            Console.WriteLine("MessageArrived " + o.MessageId);
            Interlocked.Increment(ref count);
            // Always fail so that every delivery is handled as an error.
            throw new InvalidOperationException();
        };

        const int numberOfMessages = 10;
        Parallel.For(1, numberOfMessages + 1, (i, state) =>
            Transport.Send(
                TestQueueUri,
                new object[]
                {
                    new MessageThatWillThrowException { Id = i }
                }));

        // Fixed 20 second wait before counting the deliveries.
        Thread.Sleep(TimeSpan.FromSeconds(20));

        Assert.Equal(5 * numberOfMessages, count); //Fails here as message arrived is called more times than expected
    }
}
The first test passes. The second one fails.
The reason for the failure is that the MessageArrived event is raised more times
than expected. As a consequence, the logs show that a message sometimes
arrived/failed more than 5 times, etc.
Perhaps the problem is related to how AbstractMsmqListener works?
Here is the log. The message is "peeked" by 6 threads simultaneously...
AbstractMsmqListener - Got message
Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on
msmq://pc-nko/transactional_test_queue from
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
What is strange is that even though the message is "peeked" simultaneously by 6
threads for both transactional and non-transactional queues, there is a
difference in the number of MessageArrived events...
Any idea why we see different behavior, or how this can be fixed?
--
You received this message because you are subscribed to the Google Groups
"Rhino Tools Dev" group.
To view this discussion on the web visit
https://groups.google.com/d/msg/rhino-tools-dev/-/QFAcqgg8r0QJ.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to
[email protected].
For more options, visit this group at
http://groups.google.com/group/rhino-tools-dev?hl=en.