Here's a patch for that

>From 5dc1b09401ae6e9c5cd719025c57121dd293aa34 Mon Sep 17 00:00:00 2001
From: Chris Ortman <[email protected]>
Date: Mon, 19 Jan 2009 13:00:33 -0600
Subject: [PATCH] Allows the FlatQueueStrategy to create its subqueues
on demand.

---
 .../MsmqFlatQueueTestBase.cs                       |   67 +++++++++
+----------
 ...rializableMessageWillBeForwardedToErrorQueue.cs |    1 +
 .../Impl/RhinoServiceBusFacility.cs                |    8 +++
 .../Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs     |   17 ++++-
 .../Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs  |   33 ++++++++++
 .../Rhino.ServiceBus/Msmq/IQueueStrategy.cs        |    1 +
 .../Rhino.ServiceBus/Msmq/MsmqExtensions.cs        |    8 ++-
 .../Rhino.ServiceBus/Msmq/MsmqTransport.cs         |   14 ++++
 .../Rhino.ServiceBus/Msmq/MsmqUtil.cs              |    4 +-
 .../Rhino.ServiceBus/Msmq/SubQueueStrategy.cs      |    5 ++
 .../Rhino.ServiceBus/Rhino.ServiceBus.csproj       |    1 +
 11 files changed, 119 insertions(+), 40 deletions(-)
 create mode 100644 rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs

diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs b/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
index a08b167..0f0f89a 100644
--- a/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
+++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
@@ -64,46 +64,47 @@ namespace Rhino.ServiceBus.Tests
             if (MessageQueue.Exists(transactionalTestQueuePath) ==
false)
                 MessageQueue.Create(transactionalTestQueuePath,
true);

-            if (MessageQueue.Exists(testQueuePath + errorsPathSuffix)
== false)
-                MessageQueue.Create(testQueuePath +
errorsPathSuffix);
-
-            if (MessageQueue.Exists(testQueuePath2 +
errorsPathSuffix) == false)
-                MessageQueue.Create(testQueuePath2 +
errorsPathSuffix);
-
-            if (MessageQueue.Exists(transactionalTestQueuePath +
errorsPathSuffix) == false)
-                MessageQueue.Create(transactionalTestQueuePath +
errorsPathSuffix, true);
-
-            if (MessageQueue.Exists(testQueuePath +
discardedPathSuffix) == false)
-                MessageQueue.Create(testQueuePath +
discardedPathSuffix);

             queue = new MessageQueue(testQueuePath);
             queue.Purge();

-            using (var errQueue = new MessageQueue(testQueuePath +
errorsPathSuffix))
-            {
-                errQueue.Purge();
-            }
-
-            using (var discardedQueue = new MessageQueue
(testQueuePath + discardedPathSuffix))
-            {
-                discardedQueue.Purge();
-            }
-
-            testQueue2 = new MessageQueue(testQueuePath2);
+                                               
if(MessageQueue.Exists(testQueuePath+ errorsPathSuffix))
+                                               {
+                                                       using (var errQueue = 
new MessageQueue(testQueuePath +
errorsPathSuffix))
+                                                       {
+                                                               
errQueue.Purge();
+                                                       }
+                                               }
+
+                                       
if(MessageQueue.Exists(testQueuePath+discardedPathSuffix))
+                                       {
+                                               using (var discardedQueue = new 
MessageQueue(testQueuePath +
discardedPathSuffix))
+                                               {
+                                                       discardedQueue.Purge();
+                                               }
+                                       }
+
+               testQueue2 = new MessageQueue(testQueuePath2);
             testQueue2.Purge();

-            using (var errQueue2 = new MessageQueue(testQueuePath2 +
errorsPathSuffix))
-            {
-                errQueue2.Purge();
-            }
+                                       
if(MessageQueue.Exists(testQueuePath2+errorsPathSuffix))
+                                       {
+                                               using (var errQueue2 = new 
MessageQueue(testQueuePath2 +
errorsPathSuffix))
+                                               {
+                                                       errQueue2.Purge();
+                                               }
+                                       }

-            transactionalQueue = new MessageQueue
(transactionalTestQueuePath);
+               transactionalQueue = new MessageQueue
(transactionalTestQueuePath);
             transactionalQueue.Purge();

-            using (var errQueue3 = new MessageQueue
(transactionalTestQueuePath + errorsPathSuffix))
-            {
-                errQueue3.Purge();
-            }
+               if (MessageQueue.Exists(transactionalTestQueuePath +
errorsPathSuffix))
+               {
+                       using (var errQueue3 = new MessageQueue
(transactionalTestQueuePath + errorsPathSuffix))
+                       {
+                               errQueue3.Purge();
+                       }
+               }

             subscriptions = new MessageQueue(subbscriptionQueuePath)
             {
@@ -122,7 +123,7 @@ namespace Rhino.ServiceBus.Tests
                         new XmlMessageSerializer(
                             new DefaultReflection(),
                             new DefaultKernel()), TestQueueUri, 1,
-                        DefaultMessageActions(TestQueueUri));
+                        DefaultMessageActions(TestQueueUri))
{ SubQueueInitializer = new FlatQueueSubQueueInitializer()};
                     transport.Start();
                 }
                 return transport;
@@ -150,7 +151,7 @@ namespace Rhino.ServiceBus.Tests
                 if (transactionalTransport == null)
                 {
                     transactionalTransport = new MsmqTransport(new
XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()),
-
TransactionalTestQueueUri, 1, DefaultMessageActions
(TransactionalTestQueueUri));
+
TransactionalTestQueueUri, 1, DefaultMessageActions
(TransactionalTestQueueUri) ){ SubQueueInitializer = new
FlatQueueSubQueueInitializer()};
                     transactionalTransport.Start();
                 }
                 return transactionalTransport;
diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs b/rhino-
service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
index 60110b7..04db6a9 100644
--- a/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
+++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
@@ -36,6 +36,7 @@ namespace Rhino.ServiceBus.Tests
             Assert.True(wasCalled);
         }
     }
+
     public class
UnserializableMessageWillBeForwardedToFlattenedErrorQueue :
MsmqFlatQueueTestBase
     {
         [Fact]
diff --git a/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs b/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
index 30a791a..b39ba30 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
@@ -109,6 +109,14 @@ namespace Rhino.ServiceBus.Impl
                     .ImplementedBy(serializerImpl)
                 );

+                                               if(queueStrategyImpl == 
typeof(FlatQueueStrategy))
+                                               {
+                                                       Kernel.Register(
+                                                               
Component.For<IInitializeSubQueues>()
+                                                                       
.ImplementedBy<FlatQueueSubQueueInitializer>()
+                                                               );
+                                               }
+
             Kernel.Register(
                 AllTypes.Of<IMessageAction>()
                     .FromAssembly(typeof(IMessageAction).Assembly)
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
FlatQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
FlatQueueStrategy.cs
index 6872c60..7e1a35f 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
@@ -62,7 +62,8 @@ namespace Rhino.ServiceBus.Msmq
                /// <param name="message">The message.</param>
                public void MoveToErrorsQueue(MessageQueue queue, Message 
message)
                {
-                       using (var destinationQueue = new 
MessageQueue(GetErrorsQueuePath
(), QueueAccessMode.Send))
+
+                       using (var destinationQueue = new 
MessageQueue(GetErrorsQueuePath
(),QueueAccessMode.Send))
                        {
                                
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
                                                                          
queue.GetTransactionType());
@@ -77,7 +78,7 @@ namespace Rhino.ServiceBus.Msmq
                /// <param name="message">The message.</param>
                public void MoveToDiscardedQueue(MessageQueue queue, Message
message)
                {
-                       using (var destinationQueue = new MessageQueue
(GetDiscardedQueuePath(), QueueAccessMode.Send))
+                       using (var destinationQueue = new MessageQueue
(GetDiscardedQueuePath(),QueueAccessMode.Send))
                        {
                                
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
destinationQueue.GetTransactionType());
                        }
@@ -90,7 +91,7 @@ namespace Rhino.ServiceBus.Msmq
                /// <param name="message">The message.</param>
                public void MoveToTimeoutQueue(MessageQueue queue, Message 
message)
                {
-                       using (var destinationQueue = new 
MessageQueue(GetTimeoutQueuePath
(), QueueAccessMode.Send))
+                       using (var destinationQueue = new 
MessageQueue(GetTimeoutQueuePath
(),QueueAccessMode.Send))
                        {
                                
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
destinationQueue.GetTransactionType());
                        }
@@ -120,6 +121,15 @@ namespace Rhino.ServiceBus.Msmq
                        }
                }

+               internal static void InitializeSubQueues(Uri endpoint, bool
transactional)
+               {
+                       var siblingQueues = new[] {"#errors", "#discarded", 
"#timeout"};
+                       foreach(var sibling in siblingQueues)
+                       {
+                               new Uri(endpoint.ToString() + 
sibling).CreateQueue
(transactional,QueueAccessMode.Peek).Close();
+                       }
+               }
+
                /// <summary>
                /// Gets the errors queue path.
                /// </summary>
@@ -149,5 +159,6 @@ namespace Rhino.ServiceBus.Msmq
                        var path = MsmqUtil.GetQueuePath(endpoint);
                        return path + "#timeout";
                }
+
        }
 }
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs
new file mode 100644
index 0000000..93f8f34
--- /dev/null
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs
@@ -0,0 +1,33 @@
+namespace Rhino.ServiceBus.Msmq
+{
+       using System;
+
+       /// <summary>
+       /// Provides an interface for the registering creating sub queues
+       /// </summary>
+       /// <remarks>
+       /// Since <see cref="FlatQueueStrategy"/> uses sibling queues in
place
+       /// of MSMQ 4.0 subqueues we need a way to create these when the app
+       /// starts up.
+       /// </remarks>
+       public interface IInitializeSubQueues
+       {
+               /// <summary>
+               /// Creates subqueues for an endpoint (#timeouts, #errors,
#discarded)
+               /// </summary>
+               /// <param name="endpoint">Used to figure out the path of the
subqueue.
+               /// The subqueue will append #something to your queue 
path</param>
+               /// <param name="transactional">If the queues should be created
transactional
+               /// or not.
+               /// </param>
+               void InitializeSubQueues(Uri endpoint, bool transactional);
+       }
+
+       public class FlatQueueSubQueueInitializer : IInitializeSubQueues
+       {
+               public void InitializeSubQueues(Uri endpoint, bool 
transactional)
+               {
+                       
FlatQueueStrategy.InitializeSubQueues(endpoint,transactional);
+               }
+       }
+}
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
index 465d805..4c344eb 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
@@ -54,5 +54,6 @@ namespace Rhino.ServiceBus.Msmq
                /// <param name="queue">The queue.</param>
                /// <param name="messageId">The message id.</param>
                void MoveTimeoutToMainQueue(MessageQueue queue, string 
messageId);
+
     }
 }
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
index 6b36b5a..350b97a 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
@@ -98,8 +98,12 @@ namespace Rhino.ServiceBus.Msmq

             }
         }
+                        public static MessageQueue CreateQueue(this Uri 
queueUri,
QueueAccessMode accessMode)
+        {
+                               return CreateQueue(queueUri, true, accessMode);
+        }

-        public static MessageQueue CreateQueue(this Uri queueUri,
QueueAccessMode accessMode)
+        public static MessageQueue CreateQueue(this Uri queueUri,bool
transactional, QueueAccessMode accessMode)
         {
             var queuePath = MsmqUtil.GetQueuePath(queueUri);

@@ -109,7 +113,7 @@ namespace Rhino.ServiceBus.Msmq
                 {
                     try
                     {
-                        MessageQueue.Create(queuePath, true);
+                        MessageQueue.Create(queuePath,
transactional);
                     }
                     catch (Exception e)
                     {
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
index 128849d..3077fab 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
@@ -22,7 +22,10 @@ namespace Rhino.ServiceBus.Msmq
                private readonly IMessageSerializer serializer;
                private readonly int threadCount;
                private readonly WaitHandle[] waitHandles;
+               private IInitializeSubQueues subQueueInitializer;
+
                private bool haveStarted;
+
                private MessageQueue queue;
            private readonly IMessageAction[] messageActions;

@@ -39,6 +42,11 @@ namespace Rhino.ServiceBus.Msmq
                        waitHandles = new WaitHandle[threadCount];
                }

+               public IInitializeSubQueues SubQueueInitializer
+               {
+                       get { return subQueueInitializer; }
+                       set { this.subQueueInitializer = value;}
+               }

                public volatile bool ShouldStop;

@@ -56,6 +64,11 @@ namespace Rhino.ServiceBus.Msmq

                        logger.DebugFormat("Starting msmq transport on: {0}", 
Endpoint);
                        queue = InitalizeQueue(endpoint);
+
+                       if(subQueueInitializer != null)
+                       {
+                               subQueueInitializer.InitializeSubQueues
(Endpoint,queue.Transactional);
+                       }

                    foreach (var messageAction in messageActions)
                    {
@@ -232,6 +245,7 @@ namespace Rhino.ServiceBus.Msmq
                {
                    try
                        {
+
                                var messageQueue = endpoint.CreateQueue
(QueueAccessMode.SendAndReceive);
                                var filter = new MessagePropertyFilter();
                                filter.SetAll();
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs b/
rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
index 337bc47..e40358f 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
@@ -30,10 +30,10 @@ namespace Rhino.ServiceBus.Msmq
                 string.Compare(hostName, "localhost", true) == 0)
             {
                 hostName = localhost;
-                uri = new Uri("msmq://" + localhost +
uri.AbsolutePath);
+                uri = new Uri("msmq://" + localhost +
uri.AbsolutePath + uri.Fragment);
             }

-            return string.Format(hostName + "\\private$\\" +
uri.AbsolutePath.Substring(1));
+            return string.Format(hostName + "\\private$\\" +
uri.AbsolutePath.Substring(1) + uri.Fragment);
         }

         public static Uri GetQueueUri(MessageQueue queue)
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
SubQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
SubQueueStrategy.cs
index 9d8d5e9..d852a41 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
@@ -102,5 +102,10 @@ namespace Rhino.ServiceBus.Msmq
                                queue.Send(message, queue.GetTransactionType());
                        }
                }
+
+               public void InitializeQueue(Uri endpoint)
+               {
+
+               }
        }
 }
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/
Rhino.ServiceBus.csproj b/rhino-service.bus/Rhino.ServiceBus/
Rhino.ServiceBus.csproj
index 0df3e73..d773b7a 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
+++ b/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
@@ -87,6 +87,7 @@
     <Compile Include="Exceptions\SubscriptionException.cs" />
     <Compile Include="Exceptions\TransportException.cs" />
     <Compile Include="Messages\MergeSagaState.cs" />
+    <Compile Include="Msmq\IInitializeSubQueues.cs" />
     <Compile Include="Sagas\ISagaStateMerger.cs" />
     <Compile Include="Sagas\IVersionedSagaState.cs" />
     <Compile Include="Sagas\Persisters\InMemorySagaPersister.cs" />
--
1.6.0.2.1172.ga5ed0






On Jan 19, 10:54 am, Ayende Rahien <[email protected]> wrote:
> Yes, it should
>
> On Mon, Jan 19, 2009 at 11:52 AM, chrisortman <[email protected]> wrote:
>
> > Should flatqueuestrategy automatically create the sibling queues for
> > discarded, errors etc?
>
> > In current trunk it seems that it does not...
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"Rhino Tools Dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/rhino-tools-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to