We would like to replace the Rhino Service Bus ErrorAction with a custom one 
that retries the same way the current ErrorAction does, but does not move 
the message to the error sub-queue and additionally writes the status of the 
processed message during retries (e.g. "message retried for the 3rd time", 
"giving up after 5 retries", etc.).

I've noticed an issue in the original RSB ErrorAction when using a 
transactional MSMQ queue.

Here is the unit test that reproduces the issue:

MultipleThreadsMsmqTestBase is just a copy of MsmqTestBase, but it uses 6 
threads.


/// <summary>
/// Base fixture for MSMQ transport tests that run each transport with
/// <see cref="NumberOfThreads"/> threads.
/// The constructor creates the test queues when they do not exist yet and purges
/// the queues and sub-queues it opens, so every test starts from an empty state.
/// <see cref="Dispose"/> releases the queue handles and any transport that was started.
/// </summary>
public class MultipleThreadsMsmqTestBase : IDisposable
{
    // Thread count passed to every MsmqTransport created by this fixture.
    private const int NumberOfThreads = 6;

    static MultipleThreadsMsmqTestBase()
    {
        // Route log output to the console for the whole test run.
        LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter();
    }

    private readonly string subscriptionQueuePath;
    protected readonly Endpoint SubscriptionsUri;

    protected readonly string testQueuePath;
    protected readonly Endpoint TestQueueUri;

    protected readonly string testQueuePath2;
    protected readonly Endpoint TestQueueUri2;

    protected readonly string transactionalTestQueuePath;
    protected readonly Endpoint TransactionalTestQueueUri;

    protected MessageQueue queue;
    protected MessageQueue subscriptions;
    protected MessageQueue transactionalQueue;

    // Both transports are created and started lazily by their property getters.
    private ITransport transactionalTransport;
    private ITransport transport;
    protected readonly MessageQueue testQueue2;
    private readonly string subscriptionQueuePath2;

    /// <summary>
    /// Resolves the queue paths, creates the missing queues and purges every
    /// queue and sub-queue used by the tests.
    /// </summary>
    public MultipleThreadsMsmqTestBase()
    {
        TestQueueUri = new Uri("msmq://localhost/test_queue").ToEndpoint();
        testQueuePath = MsmqUtil.GetQueuePath(TestQueueUri).QueuePath;

        TestQueueUri2 = new Uri("msmq://localhost/test_queue2").ToEndpoint();
        testQueuePath2 = MsmqUtil.GetQueuePath(TestQueueUri2).QueuePath;

        TransactionalTestQueueUri = new Uri("msmq://localhost/transactional_test_queue").ToEndpoint();
        transactionalTestQueuePath = MsmqUtil.GetQueuePath(TransactionalTestQueueUri).QueuePath;

        SubscriptionsUri2 = new Uri("msmq://localhost/test_queue2;subscriptions").ToEndpoint();
        subscriptionQueuePath2 = MsmqUtil.GetQueuePath(SubscriptionsUri2).QueuePathWithSubQueue;

        SubscriptionsUri = new Uri("msmq://localhost/test_queue;subscriptions").ToEndpoint();
        subscriptionQueuePath = MsmqUtil.GetQueuePath(SubscriptionsUri).QueuePathWithSubQueue;

        if (MessageQueue.Exists(testQueuePath) == false)
            MessageQueue.Create(testQueuePath);

        if (MessageQueue.Exists(testQueuePath2) == false)
            MessageQueue.Create(testQueuePath2);

        // The second argument asks MSMQ to create this queue as transactional.
        if (MessageQueue.Exists(transactionalTestQueuePath) == false)
            MessageQueue.Create(transactionalTestQueuePath, true);

        queue = new MessageQueue(testQueuePath);
        queue.Purge();

        using (var errQueue = new MessageQueue(testQueuePath + ";errors"))
        {
            errQueue.Purge();
        }

        testQueue2 = new MessageQueue(testQueuePath2);
        testQueue2.Purge();

        using (var errQueue2 = new MessageQueue(testQueuePath2 + ";errors"))
        {
            errQueue2.Purge();
        }

        transactionalQueue = new MessageQueue(transactionalTestQueuePath);
        transactionalQueue.Purge();

        using (var errQueue3 = new MessageQueue(transactionalTestQueuePath + ";errors"))
        {
            errQueue3.Purge();
        }

        using (var discardedQueue = new MessageQueue(testQueuePath + ";discarded"))
        {
            discardedQueue.Purge();
        }

        using (var timeoutQueue = new MessageQueue(testQueuePath + ";timeout"))
        {
            timeoutQueue.Purge();
        }

        subscriptions = new MessageQueue(subscriptionQueuePath)
        {
            Formatter = new XmlMessageFormatter(new[] { typeof(string) })
        };
        subscriptions.Purge();

        using (var subscriptions2 = new MessageQueue(subscriptionQueuePath2))
        {
            subscriptions2.Purge();
        }
    }

    public Endpoint SubscriptionsUri2 { get; set; }

    /// <summary>
    /// Transport bound to <see cref="TestQueueUri"/>; created and started on first access.
    /// </summary>
    public ITransport Transport
    {
        get
        {
            if (transport == null)
            {
                var serializer =
                    new XmlMessageSerializer(
                        new DefaultReflection(),
                        new CastleServiceLocator(new WindsorContainer()));
                transport = new MsmqTransport(
                    serializer,
                    new SubQueueStrategy(),
                    TestQueueUri.Uri,
                    NumberOfThreads,
                    defaultTransportActions,
                    new EndpointRouter(),
                    IsolationLevel.Serializable,
                    TransactionalOptions.FigureItOut,
                    true,
                    new MsmqMessageBuilder(serializer, new CastleServiceLocator(new WindsorContainer())));
                transport.Start();
            }
            return transport;
        }
    }

    /// <summary>
    /// Transport bound to <see cref="TransactionalTestQueueUri"/>; created and started on first access.
    /// </summary>
    public ITransport TransactionalTransport
    {
        get
        {
            if (transactionalTransport == null)
            {
                var serializer =
                    new XmlMessageSerializer(
                        new DefaultReflection(),
                        new CastleServiceLocator(new WindsorContainer()));
                transactionalTransport = new MsmqTransport(
                    serializer,
                    new SubQueueStrategy(),
                    TransactionalTestQueueUri.Uri,
                    NumberOfThreads,
                    defaultTransportActions,
                    new EndpointRouter(),
                    IsolationLevel.Serializable,
                    TransactionalOptions.FigureItOut,
                    true,
                    new MsmqMessageBuilder(serializer, new CastleServiceLocator(new WindsorContainer())));
                transactionalTransport.Start();
            }
            return transactionalTransport;
        }
    }

    /// <summary>
    /// Builds a fresh set of transport actions on every access; the ErrorAction
    /// is constructed with 5 as its first argument (the retry count the tests rely on).
    /// </summary>
    private static IMsmqTransportAction[] defaultTransportActions
    {
        get
        {
            var qs = new SubQueueStrategy();
            return new IMsmqTransportAction[]
            {
                new AdministrativeAction(),
                new ErrorAction(5, qs),
                new ShutDownAction(),
                new TimeoutAction(qs)
            };
        }
    }

    #region IDisposable Members

    /// <summary>
    /// Releases every queue handle opened by the constructor and disposes the
    /// transports that were actually started.
    /// </summary>
    public virtual void Dispose()
    {
        queue.Dispose();
        // testQueue2 is opened in the constructor as well and must be released too.
        testQueue2.Dispose();
        transactionalQueue.Dispose();
        subscriptions.Dispose();

        if (transport != null)
            transport.Dispose();
        if (transactionalTransport != null)
            transactionalTransport.Dispose();
    }

    #endregion
}


// The unit test that reproduces the issue


/// <summary>
/// Test message carrying only a numeric identifier. Its text form is the
/// default object text immediately followed by <see cref="Id"/>.
/// </summary>
public class MessageThatWillThrowException
{
    /// <summary>Identifier that tells the test messages apart.</summary>
    public int Id { get; set; }

    /// <summary>
    /// Returns the base <c>ToString()</c> result with <see cref="Id"/> appended.
    /// </summary>
    public override string ToString()
    {
        string typeText = base.ToString();
        return string.Concat(typeText, Id);
    }
}
 
  /// <summary>
  /// Checks how many times MessageArrived is raised when every handler invocation
  /// throws, for both the transactional and the non-transactional test queue.
  /// Each test sends <see cref="NumberOfMessages"/> messages in parallel and expects
  /// <see cref="RetryCount"/> arrivals per message.
  /// </summary>
  public class ErrorActionWithMultipleThreads : MultipleThreadsMsmqTestBase
  {
    // The base fixture constructs its ErrorAction with a retry count of 5.
    private const int RetryCount = 5;

    // Number of distinct messages sent by each test.
    private const int NumberOfMessages = 10;

    [Fact]
    public void message_should_arrive_expected_number_of_times_using_transactional_queue()
    {
      // Author's note on the assertion: passes for non-transactional queue
      AssertEachMessageArrivesRetryCountTimes(TransactionalTransport, TransactionalTestQueueUri);
    }

    [Fact]
    public void message_should_arrive_expected_number_of_times_using_non_transactional_queue()
    {
      // Author's note on the assertion: fails here as MessageArrived is called
      // more times than expected
      AssertEachMessageArrivesRetryCountTimes(Transport, TestQueueUri);
    }

    /// <summary>
    /// Subscribes an always-throwing handler to <paramref name="transport"/>, sends
    /// <see cref="NumberOfMessages"/> messages to <paramref name="destination"/> in
    /// parallel, waits 20 seconds and asserts the total number of arrivals.
    /// </summary>
    /// <param name="transport">Started transport whose MessageArrived event is counted.</param>
    /// <param name="destination">Endpoint the test messages are sent to.</param>
    private static void AssertEachMessageArrivesRetryCountTimes(ITransport transport, Endpoint destination)
    {
      int count = 0;
      transport.MessageArrived += o =>
      {
        Console.WriteLine("MessageArrived " + o.MessageId);
        // The handler runs on several transport threads, so count atomically.
        Interlocked.Increment(ref count);
        throw new InvalidOperationException();
      };

      Parallel.For(1, NumberOfMessages + 1, i =>
        transport.Send(destination, new object[] { new MessageThatWillThrowException { Id = i } }));

      // Presumably long enough for all retries to finish; there is no completion signal to wait on.
      Thread.Sleep(TimeSpan.FromSeconds(20));
      Assert.Equal(RetryCount * NumberOfMessages, count);
    }
  }


The first test passes. The second one fails.

The reason for the failure is that the MessageArrived event is raised more times 
than expected. As a consequence, my logs sometimes show that a message 
arrived/failed more than 5 times, etc... 


Perhaps the problem is related to how AbstractMsmqListener works?

Here is the log. It shows that the message is "peeked" by 6 threads simultaneously...


AbstractMsmqListener - Got message 
Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got 
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got 
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got 
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got 
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue
25.10.2012 10:34:20 [DEBUG] Rhino.ServiceBus.Msmq.AbstractMsmqListener - Got 
message Rhino.ServiceBus.Tests.MessageThatWillThrowException7 on 
msmq://pc-nko/transactional_test_queue from 
FORMATNAME:DIRECT=OS:PC-NKO\private$\transactional_test_queue


It is strange that, even though the message is "peeked" simultaneously by 6 
threads for both the transactional and the non-transactional queue, there is a 
difference in the number of MessageArrived events...


Any idea why we see different behavior, or how this can be fixed?

-- 
You received this message because you are subscribed to the Google Groups 
"Rhino Tools Dev" group.
To view this discussion on the web visit 
https://groups.google.com/d/msg/rhino-tools-dev/-/QFAcqgg8r0QJ.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/rhino-tools-dev?hl=en.

Reply via email to