Please zip it here. Mike, can you review that?

On Mon, Jan 19, 2009 at 2:02 PM, chrisortman <[email protected]> wrote:
> > Here's a patch for that > > From 5dc1b09401ae6e9c5cd719025c57121dd293aa34 Mon Sep 17 00:00:00 2001 > From: Chris Ortman <[email protected]> > Date: Mon, 19 Jan 2009 13:00:33 -0600 > Subject: [PATCH] Allows the FlatQueueStrategy to create its subqueues > on demand. > > --- > .../MsmqFlatQueueTestBase.cs | 67 +++++++++ > +---------- > ...rializableMessageWillBeForwardedToErrorQueue.cs | 1 + > .../Impl/RhinoServiceBusFacility.cs | 8 +++ > .../Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs | 17 ++++- > .../Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs | 33 ++++++++++ > .../Rhino.ServiceBus/Msmq/IQueueStrategy.cs | 1 + > .../Rhino.ServiceBus/Msmq/MsmqExtensions.cs | 8 ++- > .../Rhino.ServiceBus/Msmq/MsmqTransport.cs | 14 ++++ > .../Rhino.ServiceBus/Msmq/MsmqUtil.cs | 4 +- > .../Rhino.ServiceBus/Msmq/SubQueueStrategy.cs | 5 ++ > .../Rhino.ServiceBus/Rhino.ServiceBus.csproj | 1 + > 11 files changed, 119 insertions(+), 40 deletions(-) > create mode 100644 rhino-service.bus/Rhino.ServiceBus/Msmq/ > IInitializeSubQueues.cs > > diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/ > MsmqFlatQueueTestBase.cs b/rhino-service.bus/Rhino.ServiceBus.Tests/ > MsmqFlatQueueTestBase.cs > index a08b167..0f0f89a 100644 > --- a/rhino-service.bus/Rhino.ServiceBus.Tests/ > MsmqFlatQueueTestBase.cs > +++ b/rhino-service.bus/Rhino.ServiceBus.Tests/ > MsmqFlatQueueTestBase.cs > @@ -64,46 +64,47 @@ namespace Rhino.ServiceBus.Tests > if (MessageQueue.Exists(transactionalTestQueuePath) == > false) > MessageQueue.Create(transactionalTestQueuePath, > true); > > - if (MessageQueue.Exists(testQueuePath + errorsPathSuffix) > == false) > - MessageQueue.Create(testQueuePath + > errorsPathSuffix); > - > - if (MessageQueue.Exists(testQueuePath2 + > errorsPathSuffix) == false) > - MessageQueue.Create(testQueuePath2 + > errorsPathSuffix); > - > - if (MessageQueue.Exists(transactionalTestQueuePath + > errorsPathSuffix) == false) > - MessageQueue.Create(transactionalTestQueuePath + > errorsPathSuffix, 
true); > - > - if (MessageQueue.Exists(testQueuePath + > discardedPathSuffix) == false) > - MessageQueue.Create(testQueuePath + > discardedPathSuffix); > > queue = new MessageQueue(testQueuePath); > queue.Purge(); > > - using (var errQueue = new MessageQueue(testQueuePath + > errorsPathSuffix)) > - { > - errQueue.Purge(); > - } > - > - using (var discardedQueue = new MessageQueue > (testQueuePath + discardedPathSuffix)) > - { > - discardedQueue.Purge(); > - } > - > - testQueue2 = new MessageQueue(testQueuePath2); > + > if(MessageQueue.Exists(testQueuePath+ errorsPathSuffix)) > + { > + using (var errQueue > = new MessageQueue(testQueuePath + > errorsPathSuffix)) > + { > + > errQueue.Purge(); > + } > + } > + > + > if(MessageQueue.Exists(testQueuePath+discardedPathSuffix)) > + { > + using (var discardedQueue = > new MessageQueue(testQueuePath + > discardedPathSuffix)) > + { > + > discardedQueue.Purge(); > + } > + } > + > + testQueue2 = new MessageQueue(testQueuePath2); > testQueue2.Purge(); > > - using (var errQueue2 = new MessageQueue(testQueuePath2 + > errorsPathSuffix)) > - { > - errQueue2.Purge(); > - } > + > if(MessageQueue.Exists(testQueuePath2+errorsPathSuffix)) > + { > + using (var errQueue2 = new > MessageQueue(testQueuePath2 + > errorsPathSuffix)) > + { > + errQueue2.Purge(); > + } > + } > > - transactionalQueue = new MessageQueue > (transactionalTestQueuePath); > + transactionalQueue = new MessageQueue > (transactionalTestQueuePath); > transactionalQueue.Purge(); > > - using (var errQueue3 = new MessageQueue > (transactionalTestQueuePath + errorsPathSuffix)) > - { > - errQueue3.Purge(); > - } > + if (MessageQueue.Exists(transactionalTestQueuePath + > errorsPathSuffix)) > + { > + using (var errQueue3 = new MessageQueue > (transactionalTestQueuePath + errorsPathSuffix)) > + { > + errQueue3.Purge(); > + } > + } > > subscriptions = new MessageQueue(subbscriptionQueuePath) > { > @@ -122,7 +123,7 @@ namespace Rhino.ServiceBus.Tests > new XmlMessageSerializer( > 
new DefaultReflection(), > new DefaultKernel()), TestQueueUri, 1, > - DefaultMessageActions(TestQueueUri)); > + DefaultMessageActions(TestQueueUri)) > { SubQueueInitializer = new FlatQueueSubQueueInitializer()}; > transport.Start(); > } > return transport; > @@ -150,7 +151,7 @@ namespace Rhino.ServiceBus.Tests > if (transactionalTransport == null) > { > transactionalTransport = new MsmqTransport(new > XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()), > - > TransactionalTestQueueUri, 1, DefaultMessageActions > (TransactionalTestQueueUri)); > + > TransactionalTestQueueUri, 1, DefaultMessageActions > (TransactionalTestQueueUri) ){ SubQueueInitializer = new > FlatQueueSubQueueInitializer()}; > transactionalTransport.Start(); > } > return transactionalTransport; > diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/ > UnserializableMessageWillBeForwardedToErrorQueue.cs b/rhino- > service.bus/Rhino.ServiceBus.Tests/ > UnserializableMessageWillBeForwardedToErrorQueue.cs > index 60110b7..04db6a9 100644 > --- a/rhino-service.bus/Rhino.ServiceBus.Tests/ > UnserializableMessageWillBeForwardedToErrorQueue.cs > +++ b/rhino-service.bus/Rhino.ServiceBus.Tests/ > UnserializableMessageWillBeForwardedToErrorQueue.cs > @@ -36,6 +36,7 @@ namespace Rhino.ServiceBus.Tests > Assert.True(wasCalled); > } > } > + > public class > UnserializableMessageWillBeForwardedToFlattenedErrorQueue : > MsmqFlatQueueTestBase > { > [Fact] > diff --git a/rhino-service.bus/Rhino.ServiceBus/Impl/ > RhinoServiceBusFacility.cs b/rhino-service.bus/Rhino.ServiceBus/Impl/ > RhinoServiceBusFacility.cs > index 30a791a..b39ba30 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Impl/ > RhinoServiceBusFacility.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Impl/ > RhinoServiceBusFacility.cs > @@ -109,6 +109,14 @@ namespace Rhino.ServiceBus.Impl > .ImplementedBy(serializerImpl) > ); > > + if(queueStrategyImpl == > typeof(FlatQueueStrategy)) > + { > + Kernel.Register( > + > 
Component.For<IInitializeSubQueues>() > + > .ImplementedBy<FlatQueueSubQueueInitializer>() > + ); > + } > + > Kernel.Register( > AllTypes.Of<IMessageAction>() > .FromAssembly(typeof(IMessageAction).Assembly) > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/ > FlatQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/ > FlatQueueStrategy.cs > index 6872c60..7e1a35f 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs > @@ -62,7 +62,8 @@ namespace Rhino.ServiceBus.Msmq > /// <param name="message">The message.</param> > public void MoveToErrorsQueue(MessageQueue queue, Message > message) > { > - using (var destinationQueue = new > MessageQueue(GetErrorsQueuePath > (), QueueAccessMode.Send)) > + > + using (var destinationQueue = new > MessageQueue(GetErrorsQueuePath > (),QueueAccessMode.Send)) > { > > destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId), > > queue.GetTransactionType()); > @@ -77,7 +78,7 @@ namespace Rhino.ServiceBus.Msmq > /// <param name="message">The message.</param> > public void MoveToDiscardedQueue(MessageQueue queue, Message > message) > { > - using (var destinationQueue = new MessageQueue > (GetDiscardedQueuePath(), QueueAccessMode.Send)) > + using (var destinationQueue = new MessageQueue > (GetDiscardedQueuePath(),QueueAccessMode.Send)) > { > > destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId), > destinationQueue.GetTransactionType()); > } > @@ -90,7 +91,7 @@ namespace Rhino.ServiceBus.Msmq > /// <param name="message">The message.</param> > public void MoveToTimeoutQueue(MessageQueue queue, Message > message) > { > - using (var destinationQueue = new > MessageQueue(GetTimeoutQueuePath > (), QueueAccessMode.Send)) > + using (var destinationQueue = new > MessageQueue(GetTimeoutQueuePath > (),QueueAccessMode.Send)) > { > > destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId), > 
destinationQueue.GetTransactionType()); > } > @@ -120,6 +121,15 @@ namespace Rhino.ServiceBus.Msmq > } > } > > + internal static void InitializeSubQueues(Uri endpoint, bool > transactional) > + { > + var siblingQueues = new[] {"#errors", "#discarded", > "#timeout"}; > + foreach(var sibling in siblingQueues) > + { > + new Uri(endpoint.ToString() + > sibling).CreateQueue > (transactional,QueueAccessMode.Peek).Close(); > + } > + } > + > /// <summary> > /// Gets the errors queue path. > /// </summary> > @@ -149,5 +159,6 @@ namespace Rhino.ServiceBus.Msmq > var path = MsmqUtil.GetQueuePath(endpoint); > return path + "#timeout"; > } > + > } > } > \ No newline at end of file > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/ > IInitializeSubQueues.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/ > IInitializeSubQueues.cs > new file mode 100644 > index 0000000..93f8f34 > --- /dev/null > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs > @@ -0,0 +1,33 @@ > +namespace Rhino.ServiceBus.Msmq > +{ > + using System; > + > + /// <summary> > + /// Provides an interface for the registering creating sub queues > + /// </summary> > + /// <remarks> > + /// Since <see cref="FlatQueueStrategy"/> uses sibling queues in > place > + /// of MSMQ 4.0 subqueues we need a way to create these when the > app > + /// starts up. > + /// </remarks> > + public interface IInitializeSubQueues > + { > + /// <summary> > + /// Creates subqueues for an endpoint (#timeouts, #errors, > #discarded) > + /// </summary> > + /// <param name="endpoint">Used to figure out the path of > the > subqueue. > + /// The subqueue will append #something to your queue > path</param> > + /// <param name="transactional">If the queues should be > created > transactional > + /// or not. 
> + /// </param> > + void InitializeSubQueues(Uri endpoint, bool transactional); > + } > + > + public class FlatQueueSubQueueInitializer : IInitializeSubQueues > + { > + public void InitializeSubQueues(Uri endpoint, bool > transactional) > + { > + > FlatQueueStrategy.InitializeSubQueues(endpoint,transactional); > + } > + } > +} > \ No newline at end of file > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs > b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs > index 465d805..4c344eb 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs > @@ -54,5 +54,6 @@ namespace Rhino.ServiceBus.Msmq > /// <param name="queue">The queue.</param> > /// <param name="messageId">The message id.</param> > void MoveTimeoutToMainQueue(MessageQueue queue, string > messageId); > + > } > } > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs > b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs > index 6b36b5a..350b97a 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs > @@ -98,8 +98,12 @@ namespace Rhino.ServiceBus.Msmq > > } > } > + public static MessageQueue CreateQueue(this Uri > queueUri, > QueueAccessMode accessMode) > + { > + return CreateQueue(queueUri, true, > accessMode); > + } > > - public static MessageQueue CreateQueue(this Uri queueUri, > QueueAccessMode accessMode) > + public static MessageQueue CreateQueue(this Uri queueUri,bool > transactional, QueueAccessMode accessMode) > { > var queuePath = MsmqUtil.GetQueuePath(queueUri); > > @@ -109,7 +113,7 @@ namespace Rhino.ServiceBus.Msmq > { > try > { > - MessageQueue.Create(queuePath, true); > + MessageQueue.Create(queuePath, > transactional); > } > catch (Exception e) > { > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs > 
b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs > index 128849d..3077fab 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs > @@ -22,7 +22,10 @@ namespace Rhino.ServiceBus.Msmq > private readonly IMessageSerializer serializer; > private readonly int threadCount; > private readonly WaitHandle[] waitHandles; > + private IInitializeSubQueues subQueueInitializer; > + > private bool haveStarted; > + > private MessageQueue queue; > private readonly IMessageAction[] messageActions; > > @@ -39,6 +42,11 @@ namespace Rhino.ServiceBus.Msmq > waitHandles = new WaitHandle[threadCount]; > } > > + public IInitializeSubQueues SubQueueInitializer > + { > + get { return subQueueInitializer; } > + set { this.subQueueInitializer = value;} > + } > > public volatile bool ShouldStop; > > @@ -56,6 +64,11 @@ namespace Rhino.ServiceBus.Msmq > > logger.DebugFormat("Starting msmq transport on: > {0}", Endpoint); > queue = InitalizeQueue(endpoint); > + > + if(subQueueInitializer != null) > + { > + subQueueInitializer.InitializeSubQueues > (Endpoint,queue.Transactional); > + } > > foreach (var messageAction in messageActions) > { > @@ -232,6 +245,7 @@ namespace Rhino.ServiceBus.Msmq > { > try > { > + > var messageQueue = endpoint.CreateQueue > (QueueAccessMode.SendAndReceive); > var filter = new MessagePropertyFilter(); > filter.SetAll(); > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs b/ > rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs > index 337bc47..e40358f 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs > @@ -30,10 +30,10 @@ namespace Rhino.ServiceBus.Msmq > string.Compare(hostName, "localhost", true) == 0) > { > hostName = localhost; > - uri = new Uri("msmq://" + localhost + > uri.AbsolutePath); > + uri = new Uri("msmq://" + localhost + > uri.AbsolutePath + uri.Fragment); > } > > - return 
string.Format(hostName + "\\private$\\" + > uri.AbsolutePath.Substring(1)); > + return string.Format(hostName + "\\private$\\" + > uri.AbsolutePath.Substring(1) + uri.Fragment); > } > > public static Uri GetQueueUri(MessageQueue queue) > diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/ > SubQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/ > SubQueueStrategy.cs > index 9d8d5e9..d852a41 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs > +++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs > @@ -102,5 +102,10 @@ namespace Rhino.ServiceBus.Msmq > queue.Send(message, > queue.GetTransactionType()); > } > } > + > + public void InitializeQueue(Uri endpoint) > + { > + > + } > } > } > \ No newline at end of file > diff --git a/rhino-service.bus/Rhino.ServiceBus/ > Rhino.ServiceBus.csproj b/rhino-service.bus/Rhino.ServiceBus/ > Rhino.ServiceBus.csproj > index 0df3e73..d773b7a 100644 > --- a/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj > +++ b/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj > @@ -87,6 +87,7 @@ > <Compile Include="Exceptions\SubscriptionException.cs" /> > <Compile Include="Exceptions\TransportException.cs" /> > <Compile Include="Messages\MergeSagaState.cs" /> > + <Compile Include="Msmq\IInitializeSubQueues.cs" /> > <Compile Include="Sagas\ISagaStateMerger.cs" /> > <Compile Include="Sagas\IVersionedSagaState.cs" /> > <Compile Include="Sagas\Persisters\InMemorySagaPersister.cs" /> > -- > 1.6.0.2.1172.ga5ed0 > > > > > > > On Jan 19, 10:54 am, Ayende Rahien <[email protected]> wrote: > > Yes, it should > > > > On Mon, Jan 19, 2009 at 11:52 AM, chrisortman <[email protected]> > wrote: > > > > > Should flatqueuestrategy automatically create the sibling queues for > > > discarded, errors etc? > > > > > In current trunk it seems that it does not... 
> > >
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups "Rhino Tools Dev" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to [email protected].
For more options, visit this group at http://groups.google.com/group/rhino-tools-dev?hl=en
-~----------~----~----~----~------~----~------~--~---
