Here's a patch for that
>From 5dc1b09401ae6e9c5cd719025c57121dd293aa34 Mon Sep 17 00:00:00 2001
From: Chris Ortman <[email protected]>
Date: Mon, 19 Jan 2009 13:00:33 -0600
Subject: [PATCH] Allows the FlatQueueStrategy to create its subqueues
on demand.
---
.../MsmqFlatQueueTestBase.cs | 67 +++++++++
+----------
...rializableMessageWillBeForwardedToErrorQueue.cs | 1 +
.../Impl/RhinoServiceBusFacility.cs | 8 +++
.../Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs | 17 ++++-
.../Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs | 33 ++++++++++
.../Rhino.ServiceBus/Msmq/IQueueStrategy.cs | 1 +
.../Rhino.ServiceBus/Msmq/MsmqExtensions.cs | 8 ++-
.../Rhino.ServiceBus/Msmq/MsmqTransport.cs | 14 ++++
.../Rhino.ServiceBus/Msmq/MsmqUtil.cs | 4 +-
.../Rhino.ServiceBus/Msmq/SubQueueStrategy.cs | 5 ++
.../Rhino.ServiceBus/Rhino.ServiceBus.csproj | 1 +
11 files changed, 119 insertions(+), 40 deletions(-)
create mode 100644 rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs
diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs b/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
index a08b167..0f0f89a 100644
--- a/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
+++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
MsmqFlatQueueTestBase.cs
@@ -64,46 +64,47 @@ namespace Rhino.ServiceBus.Tests
if (MessageQueue.Exists(transactionalTestQueuePath) ==
false)
MessageQueue.Create(transactionalTestQueuePath,
true);
- if (MessageQueue.Exists(testQueuePath + errorsPathSuffix)
== false)
- MessageQueue.Create(testQueuePath +
errorsPathSuffix);
-
- if (MessageQueue.Exists(testQueuePath2 +
errorsPathSuffix) == false)
- MessageQueue.Create(testQueuePath2 +
errorsPathSuffix);
-
- if (MessageQueue.Exists(transactionalTestQueuePath +
errorsPathSuffix) == false)
- MessageQueue.Create(transactionalTestQueuePath +
errorsPathSuffix, true);
-
- if (MessageQueue.Exists(testQueuePath +
discardedPathSuffix) == false)
- MessageQueue.Create(testQueuePath +
discardedPathSuffix);
queue = new MessageQueue(testQueuePath);
queue.Purge();
- using (var errQueue = new MessageQueue(testQueuePath +
errorsPathSuffix))
- {
- errQueue.Purge();
- }
-
- using (var discardedQueue = new MessageQueue
(testQueuePath + discardedPathSuffix))
- {
- discardedQueue.Purge();
- }
-
- testQueue2 = new MessageQueue(testQueuePath2);
+
if(MessageQueue.Exists(testQueuePath+ errorsPathSuffix))
+ {
+ using (var errQueue =
new MessageQueue(testQueuePath +
errorsPathSuffix))
+ {
+
errQueue.Purge();
+ }
+ }
+
+
if(MessageQueue.Exists(testQueuePath+discardedPathSuffix))
+ {
+ using (var discardedQueue = new
MessageQueue(testQueuePath +
discardedPathSuffix))
+ {
+ discardedQueue.Purge();
+ }
+ }
+
+ testQueue2 = new MessageQueue(testQueuePath2);
testQueue2.Purge();
- using (var errQueue2 = new MessageQueue(testQueuePath2 +
errorsPathSuffix))
- {
- errQueue2.Purge();
- }
+
if(MessageQueue.Exists(testQueuePath2+errorsPathSuffix))
+ {
+ using (var errQueue2 = new
MessageQueue(testQueuePath2 +
errorsPathSuffix))
+ {
+ errQueue2.Purge();
+ }
+ }
- transactionalQueue = new MessageQueue
(transactionalTestQueuePath);
+ transactionalQueue = new MessageQueue
(transactionalTestQueuePath);
transactionalQueue.Purge();
- using (var errQueue3 = new MessageQueue
(transactionalTestQueuePath + errorsPathSuffix))
- {
- errQueue3.Purge();
- }
+ if (MessageQueue.Exists(transactionalTestQueuePath +
errorsPathSuffix))
+ {
+ using (var errQueue3 = new MessageQueue
(transactionalTestQueuePath + errorsPathSuffix))
+ {
+ errQueue3.Purge();
+ }
+ }
subscriptions = new MessageQueue(subbscriptionQueuePath)
{
@@ -122,7 +123,7 @@ namespace Rhino.ServiceBus.Tests
new XmlMessageSerializer(
new DefaultReflection(),
new DefaultKernel()), TestQueueUri, 1,
- DefaultMessageActions(TestQueueUri));
+ DefaultMessageActions(TestQueueUri))
{ SubQueueInitializer = new FlatQueueSubQueueInitializer()};
transport.Start();
}
return transport;
@@ -150,7 +151,7 @@ namespace Rhino.ServiceBus.Tests
if (transactionalTransport == null)
{
transactionalTransport = new MsmqTransport(new
XmlMessageSerializer(new DefaultReflection(), new DefaultKernel()),
-
TransactionalTestQueueUri, 1, DefaultMessageActions
(TransactionalTestQueueUri));
+
TransactionalTestQueueUri, 1, DefaultMessageActions
(TransactionalTestQueueUri) ){ SubQueueInitializer = new
FlatQueueSubQueueInitializer()};
transactionalTransport.Start();
}
return transactionalTransport;
diff --git a/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs b/rhino-
service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
index 60110b7..04db6a9 100644
--- a/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
+++ b/rhino-service.bus/Rhino.ServiceBus.Tests/
UnserializableMessageWillBeForwardedToErrorQueue.cs
@@ -36,6 +36,7 @@ namespace Rhino.ServiceBus.Tests
Assert.True(wasCalled);
}
}
+
public class
UnserializableMessageWillBeForwardedToFlattenedErrorQueue :
MsmqFlatQueueTestBase
{
[Fact]
diff --git a/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs b/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
index 30a791a..b39ba30 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Impl/
RhinoServiceBusFacility.cs
@@ -109,6 +109,14 @@ namespace Rhino.ServiceBus.Impl
.ImplementedBy(serializerImpl)
);
+ if(queueStrategyImpl ==
typeof(FlatQueueStrategy))
+ {
+ Kernel.Register(
+
Component.For<IInitializeSubQueues>()
+
.ImplementedBy<FlatQueueSubQueueInitializer>()
+ );
+ }
+
Kernel.Register(
AllTypes.Of<IMessageAction>()
.FromAssembly(typeof(IMessageAction).Assembly)
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
FlatQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
FlatQueueStrategy.cs
index 6872c60..7e1a35f 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/FlatQueueStrategy.cs
@@ -62,7 +62,8 @@ namespace Rhino.ServiceBus.Msmq
/// <param name="message">The message.</param>
public void MoveToErrorsQueue(MessageQueue queue, Message
message)
{
- using (var destinationQueue = new
MessageQueue(GetErrorsQueuePath
(), QueueAccessMode.Send))
+
+ using (var destinationQueue = new
MessageQueue(GetErrorsQueuePath
(),QueueAccessMode.Send))
{
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
queue.GetTransactionType());
@@ -77,7 +78,7 @@ namespace Rhino.ServiceBus.Msmq
/// <param name="message">The message.</param>
public void MoveToDiscardedQueue(MessageQueue queue, Message
message)
{
- using (var destinationQueue = new MessageQueue
(GetDiscardedQueuePath(), QueueAccessMode.Send))
+ using (var destinationQueue = new MessageQueue
(GetDiscardedQueuePath(),QueueAccessMode.Send))
{
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
destinationQueue.GetTransactionType());
}
@@ -90,7 +91,7 @@ namespace Rhino.ServiceBus.Msmq
/// <param name="message">The message.</param>
public void MoveToTimeoutQueue(MessageQueue queue, Message
message)
{
- using (var destinationQueue = new
MessageQueue(GetTimeoutQueuePath
(), QueueAccessMode.Send))
+ using (var destinationQueue = new
MessageQueue(GetTimeoutQueuePath
(),QueueAccessMode.Send))
{
destinationQueue.Send(queue.ReceiveByLookupId(message.LookupId),
destinationQueue.GetTransactionType());
}
@@ -120,6 +121,15 @@ namespace Rhino.ServiceBus.Msmq
}
}
+ internal static void InitializeSubQueues(Uri endpoint, bool
transactional)
+ {
+ var siblingQueues = new[] {"#errors", "#discarded",
"#timeout"};
+ foreach(var sibling in siblingQueues)
+ {
+ new Uri(endpoint.ToString() +
sibling).CreateQueue
(transactional,QueueAccessMode.Peek).Close();
+ }
+ }
+
/// <summary>
/// Gets the errors queue path.
/// </summary>
@@ -149,5 +159,6 @@ namespace Rhino.ServiceBus.Msmq
var path = MsmqUtil.GetQueuePath(endpoint);
return path + "#timeout";
}
+
}
}
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
IInitializeSubQueues.cs
new file mode 100644
index 0000000..93f8f34
--- /dev/null
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IInitializeSubQueues.cs
@@ -0,0 +1,33 @@
+namespace Rhino.ServiceBus.Msmq
+{
+ using System;
+
+ /// <summary>
+ /// Provides an interface for the registering creating sub queues
+ /// </summary>
+ /// <remarks>
+ /// Since <see cref="FlatQueueStrategy"/> uses sibling queues in
place
+ /// of MSMQ 4.0 subqueues we need a way to create these when the app
+ /// starts up.
+ /// </remarks>
+ public interface IInitializeSubQueues
+ {
+ /// <summary>
+ /// Creates subqueues for an endpoint (#timeouts, #errors,
#discarded)
+ /// </summary>
+ /// <param name="endpoint">Used to figure out the path of the
subqueue.
+ /// The subqueue will append #something to your queue
path</param>
+ /// <param name="transactional">If the queues should be created
transactional
+ /// or not.
+ /// </param>
+ void InitializeSubQueues(Uri endpoint, bool transactional);
+ }
+
+ public class FlatQueueSubQueueInitializer : IInitializeSubQueues
+ {
+ public void InitializeSubQueues(Uri endpoint, bool
transactional)
+ {
+
FlatQueueStrategy.InitializeSubQueues(endpoint,transactional);
+ }
+ }
+}
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
index 465d805..4c344eb 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/IQueueStrategy.cs
@@ -54,5 +54,6 @@ namespace Rhino.ServiceBus.Msmq
/// <param name="queue">The queue.</param>
/// <param name="messageId">The message id.</param>
void MoveTimeoutToMainQueue(MessageQueue queue, string
messageId);
+
}
}
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
index 6b36b5a..350b97a 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqExtensions.cs
@@ -98,8 +98,12 @@ namespace Rhino.ServiceBus.Msmq
}
}
+ public static MessageQueue CreateQueue(this Uri
queueUri,
QueueAccessMode accessMode)
+ {
+ return CreateQueue(queueUri, true, accessMode);
+ }
- public static MessageQueue CreateQueue(this Uri queueUri,
QueueAccessMode accessMode)
+ public static MessageQueue CreateQueue(this Uri queueUri,bool
transactional, QueueAccessMode accessMode)
{
var queuePath = MsmqUtil.GetQueuePath(queueUri);
@@ -109,7 +113,7 @@ namespace Rhino.ServiceBus.Msmq
{
try
{
- MessageQueue.Create(queuePath, true);
+ MessageQueue.Create(queuePath,
transactional);
}
catch (Exception e)
{
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
index 128849d..3077fab 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqTransport.cs
@@ -22,7 +22,10 @@ namespace Rhino.ServiceBus.Msmq
private readonly IMessageSerializer serializer;
private readonly int threadCount;
private readonly WaitHandle[] waitHandles;
+ private IInitializeSubQueues subQueueInitializer;
+
private bool haveStarted;
+
private MessageQueue queue;
private readonly IMessageAction[] messageActions;
@@ -39,6 +42,11 @@ namespace Rhino.ServiceBus.Msmq
waitHandles = new WaitHandle[threadCount];
}
+ public IInitializeSubQueues SubQueueInitializer
+ {
+ get { return subQueueInitializer; }
+ set { this.subQueueInitializer = value;}
+ }
public volatile bool ShouldStop;
@@ -56,6 +64,11 @@ namespace Rhino.ServiceBus.Msmq
logger.DebugFormat("Starting msmq transport on: {0}",
Endpoint);
queue = InitalizeQueue(endpoint);
+
+ if(subQueueInitializer != null)
+ {
+ subQueueInitializer.InitializeSubQueues
(Endpoint,queue.Transactional);
+ }
foreach (var messageAction in messageActions)
{
@@ -232,6 +245,7 @@ namespace Rhino.ServiceBus.Msmq
{
try
{
+
var messageQueue = endpoint.CreateQueue
(QueueAccessMode.SendAndReceive);
var filter = new MessagePropertyFilter();
filter.SetAll();
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs b/
rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
index 337bc47..e40358f 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/MsmqUtil.cs
@@ -30,10 +30,10 @@ namespace Rhino.ServiceBus.Msmq
string.Compare(hostName, "localhost", true) == 0)
{
hostName = localhost;
- uri = new Uri("msmq://" + localhost +
uri.AbsolutePath);
+ uri = new Uri("msmq://" + localhost +
uri.AbsolutePath + uri.Fragment);
}
- return string.Format(hostName + "\\private$\\" +
uri.AbsolutePath.Substring(1));
+ return string.Format(hostName + "\\private$\\" +
uri.AbsolutePath.Substring(1) + uri.Fragment);
}
public static Uri GetQueueUri(MessageQueue queue)
diff --git a/rhino-service.bus/Rhino.ServiceBus/Msmq/
SubQueueStrategy.cs b/rhino-service.bus/Rhino.ServiceBus/Msmq/
SubQueueStrategy.cs
index 9d8d5e9..d852a41 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
+++ b/rhino-service.bus/Rhino.ServiceBus/Msmq/SubQueueStrategy.cs
@@ -102,5 +102,10 @@ namespace Rhino.ServiceBus.Msmq
queue.Send(message, queue.GetTransactionType());
}
}
+
+ public void InitializeQueue(Uri endpoint)
+ {
+
+ }
}
}
\ No newline at end of file
diff --git a/rhino-service.bus/Rhino.ServiceBus/
Rhino.ServiceBus.csproj b/rhino-service.bus/Rhino.ServiceBus/
Rhino.ServiceBus.csproj
index 0df3e73..d773b7a 100644
--- a/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
+++ b/rhino-service.bus/Rhino.ServiceBus/Rhino.ServiceBus.csproj
@@ -87,6 +87,7 @@
<Compile Include="Exceptions\SubscriptionException.cs" />
<Compile Include="Exceptions\TransportException.cs" />
<Compile Include="Messages\MergeSagaState.cs" />
+ <Compile Include="Msmq\IInitializeSubQueues.cs" />
<Compile Include="Sagas\ISagaStateMerger.cs" />
<Compile Include="Sagas\IVersionedSagaState.cs" />
<Compile Include="Sagas\Persisters\InMemorySagaPersister.cs" />
--
1.6.0.2.1172.ga5ed0
On Jan 19, 10:54 am, Ayende Rahien <[email protected]> wrote:
> Yes, it should
>
> On Mon, Jan 19, 2009 at 11:52 AM, chrisortman <[email protected]> wrote:
>
> > Should flatqueuestrategy automatically create the sibling queues for
> > discarded, errors etc?
>
> > In current trunk it seems that it does not...
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"Rhino Tools Dev" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/rhino-tools-dev?hl=en
-~----------~----~----~----~------~----~------~--~---