Please zip the patch and attach it here.
Mike, can you review that?

On Mon, Jan 19, 2009 at 2:02 PM, chrisortman <[email protected]> wrote:

>
> Here's a patch for that
>
> From 5dc1b09401ae6e9c5cd719025c57121dd293aa34 Mon Sep 17 00:00:00 2001
> From: Chris Ortman <[email protected]>
> Date: Mon, 19 Jan 2009 13:00:33 -0600
> Subject: [PATCH] Allows the FlatQueueStrategy to create its subqueues
> on demand.
>
> ---
>  .../MsmqFlatQueueTestBase.cs                       |   67 +++++++++
> +----------
>  ...rializableMessageWillBeForwardedToErrorQueue.cs |    1 +
>  .../Impl/RhinoServiceBusFacility.cs                |    8 +++
>  .../Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs     |   17 ++++-
>  .../Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs  |   33 ++++++++++
>  .../Rhino.ServiceBus/Msmq/IQueueStrategy.cs        |    1 +
>  .../Rhino.ServiceBus/Msmq/MsmqExtensions.cs        |    8 ++-
>  .../Rhino.ServiceBus/Msmq/MsmqTransport.cs         |   14 ++++
>  .../Rhino.ServiceBus/Msmq/MsmqUtil.cs              |    4 +-
>  .../Rhino.ServiceBus/Msmq/SubQueueStrategy.cs      |    5 ++
>  .../Rhino.ServiceBus/Rhino.ServiceBus.csproj       |    1 +
>  11 files changed, 119 insertions(+), 40 deletions(-)
>  create mode 100644 rhino-service.bus/Rhino.ServiceBus/Msmq/
> IInitializeSubQueues.cs
>
> diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
> MsmqFlatQueueTestBase.cs b/rhino-service.bus/Rhino.ServiceBus.Tests/
> MsmqFlatQueueTestBase.cs
> index a08b167..0f0f89a 100644
> --- a/rhino-service.bus/Rhino.ServiceBus.Tests/
> MsmqFlatQueueTestBase.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
> MsmqFlatQueueTestBase.cs
> @@ -64,46 +64,47 @@ namespace Rhino.ServiceBus.Tests
>             if (MessageQueue.Exists(transactionalTestQueuePath) ==
> false)
>                 MessageQueue.Create(transactionalTestQueuePath,
> true);
>
> -            if (MessageQueue.Exists(testQueuePath + errorsPathSuffix)
> == false)
> -                MessageQueue.Create(testQueuePath +
> errorsPathSuffix);
> -
> -            if (MessageQueue.Exists(testQueuePath2 +
> errorsPathSuffix) == false)
> -                MessageQueue.Create(testQueuePath2 +
> errorsPathSuffix);
> -
> -            if (MessageQueue.Exists(transactionalTestQueuePath +
> errorsPathSuffix) == false)
> -                MessageQueue.Create(transactionalTestQueuePath +
> errorsPathSuffix, true);
> -
> -            if (MessageQueue.Exists(testQueuePath +
> discardedPathSuffix) == false)
> -                MessageQueue.Create(testQueuePath +
> discardedPathSuffix);
>
>             queue = new MessageQueue(testQueuePath);
>             queue.Purge();
>
> -            using (var errQueue = new MessageQueue(testQueuePath +
> errorsPathSuffix))
> -            {
> -                errQueue.Purge();
> -            }
> -
> -            using (var discardedQueue = new MessageQueue
> (testQueuePath + discardedPathSuffix))
> -            {
> -                discardedQueue.Purge();
> -            }
> -
> -            testQueue2 = new MessageQueue(testQueuePath2);
> +
> if(MessageQueue.Exists(testQueuePath+ errorsPathSuffix))
> +                                               {
> +                                                       using (var errQueue
> = new MessageQueue(testQueuePath +
> errorsPathSuffix))
> +                                                       {
> +
> errQueue.Purge();
> +                                                       }
> +                                               }
> +
> +
> if(MessageQueue.Exists(testQueuePath+discardedPathSuffix))
> +                                       {
> +                                               using (var discardedQueue =
> new MessageQueue(testQueuePath +
> discardedPathSuffix))
> +                                               {
> +
> discardedQueue.Purge();
> +                                               }
> +                                       }
> +
> +               testQueue2 = new MessageQueue(testQueuePath2);
>             testQueue2.Purge();
>
> -            using (var errQueue2 = new MessageQueue(testQueuePath2 +
> errorsPathSuffix))
> -            {
> -                errQueue2.Purge();
> -            }
> +
> if(MessageQueue.Exists(testQueuePath2+errorsPathSuffix))
> +                                       {
> +                                               using (var errQueue2 = new
> MessageQueue(testQueuePath2 +
> errorsPathSuffix))
> +                                               {
> +                                                       errQueue2.Purge();
> +                                               }
> +                                       }
>
> -            transactionalQueue = new MessageQueue
> (transactionalTestQueuePath);
> +               transactionalQueue = new MessageQueue
> (transactionalTestQueuePath);
>             transactionalQueue.Purge();
>
> -            using (var errQueue3 = new MessageQueue
> (transactionalTestQueuePath + errorsPathSuffix))
> -            {
> -                errQueue3.Purge();
> -            }
> +               if (MessageQueue.Exists(transactionalTestQueuePath +
> errorsPathSuffix))
> +               {
> +                       using (var errQueue3 = new MessageQueue
> (transactionalTestQueuePath + errorsPathSuffix))
> +                       {
> +                               errQueue3.Purge();
> +                       }
> +               }
>
>             subscriptions = new MessageQueue(subbscriptionQueuePath)
>             {
> @@ -122,7 +123,7 @@ namespace Rhino.ServiceBus.Tests
>                         new XmlMessageSerializer(
>                             new DefaultReflection(),
>                             new DefaultKernel()), TestQueueUri, 1,
> -                        DefaultMessageActions(TestQueueUri));
> +                        DefaultMessageActions(TestQueueUri))
> { SubQueueInitializer = new FlatQueueSubQueueInitializer()};
>                     transport.Start();
>                 }
>                 return transport;
> @@ -150,7 +151,7 @@ namespace Rhino.ServiceBus.Tests
>                 if (transactionalTransport == null)
>                 {
>                     transactionalTransport = new MsmqTransport(new
> XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()),
> -
> TransactionalTestQueueUri, 1, DefaultMessageActions
> (TransactionalTestQueueUri));
> +
> TransactionalTestQueueUri, 1, DefaultMessageActions
> (TransactionalTestQueueUri) ){ SubQueueInitializer = new
> FlatQueueSubQueueInitializer()};
>                     transactionalTransport.Start();
>                 }
>                 return transactionalTransport;
> diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
> UnserializableMessageWillBeForwardedToErrorQueue.cs b/rhino-
> service.bus/Rhino.ServiceBus.Tests/
> UnserializableMessageWillBeForwardedToErrorQueue.cs
> index 60110b7..04db6a9 100644
> --- a/rhino-service.bus/Rhino.ServiceBus.Tests/
> UnserializableMessageWillBeForwardedToErrorQueue.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
> UnserializableMessageWillBeForwardedToErrorQueue.cs
> @@ -36,6 +36,7 @@ namespace Rhino.ServiceBus.Tests
>             Assert.True(wasCalled);
>         }
>     }
> +
>     public class
> UnserializableMessageWillBeForwardedToFlattenedErrorQueue :
> MsmqFlatQueueTestBase
>     {
>         [Fact]
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Impl/
> RhinoServiceBusFacility.cs b/rhino-service.bus/Rhino.ServiceBus/Impl/
> RhinoServiceBusFacility.cs
> index 30a791a..b39ba30 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Impl/
> RhinoServiceBusFacility.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Impl/
> RhinoServiceBusFacility.cs
> @@ -109,6 +109,14 @@ namespace Rhino.ServiceBus.Impl
>                     .ImplementedBy(serializerImpl)
>                 );
>
> +                                               if(queueStrategyImpl ==
> typeof(FlatQueueStrategy))
> +                                               {
> +                                                       Kernel.Register(
> +
> Component.For<IInitializeSubQueues>()
> +
> .ImplementedBy<FlatQueueSubQueueInitializer>()
> +                                                               );
> +                                               }
> +
>             Kernel.Register(
>                 AllTypes.Of<IMessageAction>()
>                     .FromAssembly(typeof(IMessageAction).Assembly)
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
> FlatQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
> FlatQueueStrategy.cs
> index 6872c60..7e1a35f 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
> @@ -62,7 +62,8 @@ namespace Rhino.ServiceBus.Msmq
>                /// <param name="message">The message.</param>
>                public void MoveToErrorsQueue(MessageQueue queue, Message
> message)
>                {
> -                       using (var destinationQueue = new
> MessageQueue(GetErrorsQueuePath
> (), QueueAccessMode.Send))
> +
> +                       using (var destinationQueue = new
> MessageQueue(GetErrorsQueuePath
> (),QueueAccessMode.Send))
>                        {
>
>  destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
>
>  queue.GetTransactionType());
> @@ -77,7 +78,7 @@ namespace Rhino.ServiceBus.Msmq
>                /// <param name="message">The message.</param>
>                public void MoveToDiscardedQueue(MessageQueue queue, Message
> message)
>                {
> -                       using (var destinationQueue = new MessageQueue
> (GetDiscardedQueuePath(), QueueAccessMode.Send))
> +                       using (var destinationQueue = new MessageQueue
> (GetDiscardedQueuePath(),QueueAccessMode.Send))
>                        {
>
>  destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
> destinationQueue.GetTransactionType());
>                        }
> @@ -90,7 +91,7 @@ namespace Rhino.ServiceBus.Msmq
>                /// <param name="message">The message.</param>
>                public void MoveToTimeoutQueue(MessageQueue queue, Message
> message)
>                {
> -                       using (var destinationQueue = new
> MessageQueue(GetTimeoutQueuePath
> (), QueueAccessMode.Send))
> +                       using (var destinationQueue = new
> MessageQueue(GetTimeoutQueuePath
> (),QueueAccessMode.Send))
>                        {
>
>  destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
> destinationQueue.GetTransactionType());
>                        }
> @@ -120,6 +121,15 @@ namespace Rhino.ServiceBus.Msmq
>                        }
>                }
>
> +               internal static void InitializeSubQueues(Uri endpoint, bool
> transactional)
> +               {
> +                       var siblingQueues = new[] {"#errors", "#discarded",
> "#timeout"};
> +                       foreach(var sibling in siblingQueues)
> +                       {
> +                               new Uri(endpoint.ToString() +
> sibling).CreateQueue
> (transactional,QueueAccessMode.Peek).Close();
> +                       }
> +               }
> +
>                /// <summary>
>                /// Gets the errors queue path.
>                /// </summary>
> @@ -149,5 +159,6 @@ namespace Rhino.ServiceBus.Msmq
>                        var path = MsmqUtil.GetQueuePath(endpoint);
>                        return path + "#timeout";
>                }
> +
>        }
>  }
> \ No newline at end of file
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
> IInitializeSubQueues.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
> IInitializeSubQueues.cs
> new file mode 100644
> index 0000000..93f8f34
> --- /dev/null
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs
> @@ -0,0 +1,33 @@
> +namespace Rhino.ServiceBus.Msmq
> +{
> +       using System;
> +
> +       /// <summary>
> +       /// Provides an interface for creating sub queues
> +       /// </summary>
> +       /// <remarks>
> +       /// Since <see cref="FlatQueueStrategy"/> uses sibling queues in
> place
> +       /// of MSMQ 4.0 subqueues we need a way to create these when the
> app
> +       /// starts up.
> +       /// </remarks>
> +       public interface IInitializeSubQueues
> +       {
> +               /// <summary>
> +               /// Creates subqueues for an endpoint (#timeout, #errors,
> #discarded)
> +               /// </summary>
> +               /// <param name="endpoint">Used to figure out the path of
> the
> subqueue.
> +               /// The subqueue will append #something to your queue
> path</param>
> +               /// <param name="transactional">If the queues should be
> created
> transactional
> +               /// or not.
> +               /// </param>
> +               void InitializeSubQueues(Uri endpoint, bool transactional);
> +       }
> +
> +       public class FlatQueueSubQueueInitializer : IInitializeSubQueues
> +       {
> +               public void InitializeSubQueues(Uri endpoint, bool
> transactional)
> +               {
> +
> FlatQueueStrategy.InitializeSubQueues(endpoint,transactional);
> +               }
> +       }
> +}
> \ No newline at end of file
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
> b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
> index 465d805..4c344eb 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
> @@ -54,5 +54,6 @@ namespace Rhino.ServiceBus.Msmq
>                /// <param name="queue">The queue.</param>
>                /// <param name="messageId">The message id.</param>
>                void MoveTimeoutToMainQueue(MessageQueue queue, string
> messageId);
> +
>     }
>  }
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
> b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
> index 6b36b5a..350b97a 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
> @@ -98,8 +98,12 @@ namespace Rhino.ServiceBus.Msmq
>
>             }
>         }
> +                        public static MessageQueue CreateQueue(this Uri
> queueUri,
> QueueAccessMode accessMode)
> +        {
> +                               return CreateQueue(queueUri, true,
> accessMode);
> +        }
>
> -        public static MessageQueue CreateQueue(this Uri queueUri,
> QueueAccessMode accessMode)
> +        public static MessageQueue CreateQueue(this Uri queueUri,bool
> transactional, QueueAccessMode accessMode)
>         {
>             var queuePath = MsmqUtil.GetQueuePath(queueUri);
>
> @@ -109,7 +113,7 @@ namespace Rhino.ServiceBus.Msmq
>                 {
>                     try
>                     {
> -                        MessageQueue.Create(queuePath, true);
> +                        MessageQueue.Create(queuePath,
> transactional);
>                     }
>                     catch (Exception e)
>                     {
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
> b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
> index 128849d..3077fab 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
> @@ -22,7 +22,10 @@ namespace Rhino.ServiceBus.Msmq
>                private readonly IMessageSerializer serializer;
>                private readonly int threadCount;
>                private readonly WaitHandle[] waitHandles;
> +               private IInitializeSubQueues subQueueInitializer;
> +
>                private bool haveStarted;
> +
>                private MessageQueue queue;
>            private readonly IMessageAction[] messageActions;
>
> @@ -39,6 +42,11 @@ namespace Rhino.ServiceBus.Msmq
>                        waitHandles = new WaitHandle[threadCount];
>                }
>
> +               public IInitializeSubQueues SubQueueInitializer
> +               {
> +                       get { return subQueueInitializer; }
> +                       set { this.subQueueInitializer = value;}
> +               }
>
>                public volatile bool ShouldStop;
>
> @@ -56,6 +64,11 @@ namespace Rhino.ServiceBus.Msmq
>
>                        logger.DebugFormat("Starting msmq transport on:
> {0}", Endpoint);
>                        queue = InitalizeQueue(endpoint);
> +
> +                       if(subQueueInitializer != null)
> +                       {
> +                               subQueueInitializer.InitializeSubQueues
> (Endpoint,queue.Transactional);
> +                       }
>
>                    foreach (var messageAction in messageActions)
>                    {
> @@ -232,6 +245,7 @@ namespace Rhino.ServiceBus.Msmq
>                {
>                    try
>                        {
> +
>                                var messageQueue = endpoint.CreateQueue
> (QueueAccessMode.SendAndReceive);
>                                var filter = new MessagePropertyFilter();
>                                filter.SetAll();
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs b/
> rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
> index 337bc47..e40358f 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
> @@ -30,10 +30,10 @@ namespace Rhino.ServiceBus.Msmq
>                 string.Compare(hostName, "localhost", true) == 0)
>             {
>                 hostName = localhost;
> -                uri = new Uri("msmq://" + localhost +
> uri.AbsolutePath);
> +                uri = new Uri("msmq://" + localhost +
> uri.AbsolutePath + uri.Fragment);
>             }
>
> -            return string.Format(hostName + "\\private$\\" +
> uri.AbsolutePath.Substring(1));
> +            return string.Format(hostName + "\\private$\\" +
> uri.AbsolutePath.Substring(1) + uri.Fragment);
>         }
>
>         public static Uri GetQueueUri(MessageQueue queue)
> diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
> SubQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
> SubQueueStrategy.cs
> index 9d8d5e9..d852a41 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
> +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
> @@ -102,5 +102,10 @@ namespace Rhino.ServiceBus.Msmq
>                                queue.Send(message,
> queue.GetTransactionType());
>                        }
>                }
> +
> +               public void InitializeQueue(Uri endpoint)
> +               {
> +
> +               }
>        }
>  }
> \ No newline at end of file
> diff --git a/rhino-service.bus/Rhino.ServiceBus/
> Rhino.ServiceBus.csproj b/rhino-service.bus/Rhino.ServiceBus/
> Rhino.ServiceBus.csproj
> index 0df3e73..d773b7a 100644
> --- a/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
> +++ b/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
> @@ -87,6 +87,7 @@
>     <Compile Include="Exceptions\SubscriptionException.cs" />
>     <Compile Include="Exceptions\TransportException.cs" />
>     <Compile Include="Messages\MergeSagaState.cs" />
> +    <Compile Include="Msmq\IInitializeSubQueues.cs" />
>     <Compile Include="Sagas\ISagaStateMerger.cs" />
>     <Compile Include="Sagas\IVersionedSagaState.cs" />
>     <Compile Include="Sagas\Persisters\InMemorySagaPersister.cs" />
> --
> 1.6.0.2.1172.ga5ed0
>
>
>
>
>
>
> On Jan 19, 10:54 am, Ayende Rahien <[email protected]> wrote:
> > Yes, it should
> >
> > On Mon, Jan 19, 2009 at 11:52 AM, chrisortman <[email protected]>
> wrote:
> >
> > > Should FlatQueueStrategy automatically create the sibling queues for
> > > discarded, errors, etc.?
> >
> > > In current trunk it seems that it does not...
> >
>

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Rhino Tools Dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/rhino-tools-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to