Merge all fixes from trunk in preparation for a v1.5.1 release.
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/c0368061 Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/c0368061 Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/c0368061 Branch: refs/heads/1.5.x Commit: c0368061e8b6bfa688406f27b2d55026832bdc51 Parents: 4d2a816 Author: Timothy A. Bish <tab...@apache.org> Authored: Wed Jan 26 21:25:30 2011 +0000 Committer: Timothy A. Bish <tab...@apache.org> Committed: Wed Jan 26 21:25:30 2011 +0000 ---------------------------------------------------------------------- src/main/csharp/Connection.cs | 145 ++++++++++--- src/main/csharp/Threads/ThreadPoolExecutor.cs | 167 +++++++++++++++ src/main/csharp/Transport/InactivityMonitor.cs | 49 +++-- src/main/csharp/Util/MessageDispatchChannel.cs | 21 +- .../csharp/Threads/ThreadPoolExecutorTest.cs | 202 +++++++++++++++++++ vs2008-stomp-test.csproj | 1 + vs2008-stomp.csproj | 1 + 7 files changed, 534 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Connection.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs index 6ce7003..ffaf180 100755 --- a/src/main/csharp/Connection.cs +++ b/src/main/csharp/Connection.cs @@ -20,6 +20,7 @@ using System.Collections; using System.Collections.Specialized; using System.Threading; using Apache.NMS.Stomp.Commands; +using Apache.NMS.Stomp.Threads; using Apache.NMS.Stomp.Transport; using Apache.NMS.Stomp.Util; using Apache.NMS.Util; @@ -52,9 +53,11 @@ namespace Apache.NMS.Stomp private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable()); private readonly object myLock = new object(); - private bool connected = false; - private bool closed = false; - private bool closing = false; + private readonly Atomic<bool> connected = new Atomic<bool>(false); + private readonly Atomic<bool> closed = new Atomic<bool>(false); + private readonly Atomic<bool> closing = new Atomic<bool>(false); + private readonly Atomic<bool> transportFailed = new Atomic<bool>(false); + private Exception firstFailureError = null; private int sessionCounter = 0; private int temporaryDestinationCounter = 0; private int localTransactionCounter; @@ -64,6 +67,7 @@ namespace Apache.NMS.Stomp private readonly IdGenerator clientIdGenerator; private CountDownLatch transportInterruptionProcessingComplete; private readonly MessageTransformation messageTransformation; + private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor(); public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator) { @@ -72,7 +76,7 @@ namespace Apache.NMS.Stomp this.transport = transport; this.transport.Command = new CommandHandler(OnCommand); - this.transport.Exception = new ExceptionHandler(OnException); + this.transport.Exception = new ExceptionHandler(OnTransportException); this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted); this.transport.Resumed = new ResumedHandler(OnTransportResumed); @@ -215,6 +219,16 @@ namespace Apache.NMS.Stomp set { this.transport = value; } } + public bool TransportFailed + { + get { return this.transportFailed.Value; } + } + + public Exception FirstFailureError + { + get { return this.firstFailureError; } + } + public TimeSpan RequestTimeout { get { return this.requestTimeout; } @@ -232,7 +246,7 @@ namespace Apache.NMS.Stomp get { return info.ClientId; } set { - if(this.connected) + if(this.connected.Value) { throw new NMSException("You cannot change the ClientId once the Connection is connected"); } @@ -384,7 +398,7 @@ namespace Apache.NMS.Stomp internal void RemoveSession(Session session) { - if(!this.closing) + if(!this.closing.Value) { sessions.Remove(session); } @@ -404,7 +418,7 @@ namespace Apache.NMS.Stomp { lock(myLock) { - if(this.closed) + if(this.closed.Value) { return; } @@ -412,7 +426,7 @@ namespace Apache.NMS.Stomp try { Tracer.Info("Closing Connection."); - this.closing = true; + this.closing.Value = true; lock(sessions.SyncRoot) { foreach(Session session in sessions) @@ -422,7 +436,7 @@ namespace Apache.NMS.Stomp } sessions.Clear(); - if(connected) + if(connected.Value) { ShutdownInfo shutdowninfo = new ShutdownInfo(); transport.Oneway(shutdowninfo); @@ -438,9 +452,9 @@ namespace Apache.NMS.Stomp finally { this.transport = null; - this.closed = true; - this.connected = false; - this.closing = false; + this.closed.Value = true; + this.connected.Value = false; + this.closing.Value = false; } } } @@ -534,24 +548,24 @@ namespace Apache.NMS.Stomp protected void CheckConnected() { - if(closed) + if(closed.Value) { throw new ConnectionClosedException(); } - if(!connected) + if(!connected.Value) { if(!this.userSpecifiedClientID) { this.info.ClientId = this.clientIdGenerator.GenerateId(); } - connected = true; + connected.Value = true; // now lets send the connection and see if we get an ack/nak if(null == SyncRequest(info)) { - closed = true; - connected = false; + closed.Value = true; + connected.Value = false; throw new ConnectionClosedException(); } } @@ -581,7 +595,7 @@ namespace Apache.NMS.Stomp } else if(command.IsErrorCommand) { - if(!closing && !closed) + if(!closing.Value && !closed.Value) { ConnectionError connectionError = (ConnectionError) command; BrokerError brokerError = connectionError.Exception; @@ -597,7 +611,7 @@ namespace Apache.NMS.Stomp } } - OnException(commandTransport, new NMSConnectionException(message, cause)); + OnException(new NMSConnectionException(message, cause)); } } else @@ -632,17 +646,85 @@ namespace Apache.NMS.Stomp Tracer.Error("No such consumer active: " + dispatch.ConsumerId); } - protected void OnException(ITransport sender, Exception exception) + protected void OnTransportException(ITransport sender, Exception exception) + { + this.OnException(exception); + } + + internal void OnAsyncException(Exception error) + { + if(!this.closed.Value && !this.closing.Value) + { + if(this.ExceptionListener != null) + { + if(!(error is NMSException)) + { + error = NMSExceptionSupport.Create(error); + } + NMSException e = (NMSException)error; + + // Called in another thread so that processing can continue + // here, ensures no lock contention. + executor.QueueUserWorkItem(AsyncCallExceptionListener, e); + } + else + { + Tracer.Debug("Async exception with no exception listener: " + error); + } + } + } + + private void AsyncCallExceptionListener(object error) + { + NMSException exception = error as NMSException; + this.ExceptionListener(exception); + } + + internal void OnException(Exception error) + { + // Will fire an exception listener callback if there's any set. + OnAsyncException(error); + + if(!this.closing.Value && !this.closed.Value) + { + // Perform the actual work in another thread to avoid lock contention + // and allow the caller to continue on in its error cleanup. + executor.QueueUserWorkItem(AsyncOnExceptionHandler, error); + } + } + + private void AsyncOnExceptionHandler(object error) { - if(ExceptionListener != null && !this.closing) + Exception cause = error as Exception; + + MarkTransportFailed(cause); + + try + { + this.transport.Dispose(); + } + catch(Exception ex) + { + Tracer.Debug("Caught Exception While disposing of Transport: " + ex); + } + + IList sessionsCopy = null; + lock(this.sessions.SyncRoot) + { + sessionsCopy = new ArrayList(this.sessions); + } + + // Use a copy so we don't concurrently modify the Sessions list if the + // client is closing at the same time. + foreach(Session session in sessionsCopy) { try { - ExceptionListener(exception); + session.Dispose(); } - catch + catch(Exception ex) { - sender.Dispose(); + Tracer.Debug("Caught Exception While disposing of Sessions: " + ex); } } } @@ -662,7 +744,7 @@ namespace Apache.NMS.Stomp session.ClearMessagesInProgress(); } - if(this.ConnectionInterruptedListener != null && !this.closing) + if(this.ConnectionInterruptedListener != null && !this.closing.Value) { try { @@ -678,7 +760,7 @@ namespace Apache.NMS.Stomp { Tracer.Debug("Transport has resumed normal operation."); - if(this.ConnectionResumedListener != null && !this.closing) + if(this.ConnectionResumedListener != null && !this.closing.Value) { try { @@ -705,6 +787,15 @@ namespace Apache.NMS.Stomp } } + private void MarkTransportFailed(Exception error) + { + this.transportFailed.Value = true; + if(this.firstFailureError == null) + { + this.firstFailureError = error; + } + } + /// <summary> /// Creates a new temporary destination name /// </summary> @@ -739,7 +830,7 @@ namespace Apache.NMS.Stomp CountDownLatch cdl = this.transportInterruptionProcessingComplete; if(cdl != null) { - if(!closed && cdl.Remaining > 0) + if(!closed.Value && cdl.Remaining > 0) { Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " + "processing (" + cdl.Remaining + ") to complete.."); http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Threads/ThreadPoolExecutor.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Threads/ThreadPoolExecutor.cs b/src/main/csharp/Threads/ThreadPoolExecutor.cs new file mode 100644 index 0000000..7072dfe --- /dev/null +++ b/src/main/csharp/Threads/ThreadPoolExecutor.cs @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Apache.NMS.Stomp.Threads +{ + /// <summary> + /// This class provides a wrapper around the ThreadPool mechanism in .NET + /// to allow for serial execution of jobs in the ThreadPool and provide + /// a means of shutting down the execution of jobs in a deterministic + /// way. + /// </summary> + public class ThreadPoolExecutor + { + private Queue<Future> workQueue = new Queue<Future>(); + private Mutex syncRoot = new Mutex(); + private bool running = false; + private bool closing = false; + private bool closed = false; + private ManualResetEvent executionComplete = new ManualResetEvent(true); + + /// <summary> + /// Represents an asynchronous task that is executed on the ThreadPool + /// at some point in the future. + /// </summary> + internal class Future + { + private WaitCallback callback; + private object callbackArg; + + public Future(WaitCallback callback, object arg) + { + this.callback = callback; + this.callbackArg = arg; + } + + public void Run() + { + if(this.callback == null) + { + throw new Exception("Future executed with null WaitCallback"); + } + + try + { + this.callback(callbackArg); + } + catch + { + } + } + } + + public void QueueUserWorkItem(WaitCallback worker) + { + this.QueueUserWorkItem(worker, null); + } + + public void QueueUserWorkItem(WaitCallback worker, object arg) + { + if(worker == null) + { + throw new ArgumentNullException("Invalid WaitCallback passed"); + } + + if(!this.closed) + { + lock(syncRoot) + { + if(!this.closed || !this.closing) + { + this.workQueue.Enqueue(new Future(worker, arg)); + + if(!this.running) + { + this.executionComplete.Reset(); + this.running = true; + ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null); + } + } + } + } + } + + public bool IsShutdown + { + get { return this.closed; } + } + + public void Shutdown() + { + if(!this.closed) + { + syncRoot.WaitOne(); + + if(!this.closed) + { + this.closing = true; + this.workQueue.Clear(); + + if(this.running) + { + syncRoot.ReleaseMutex(); + this.executionComplete.WaitOne(); + syncRoot.WaitOne(); + } + + this.closed = true; + } + + syncRoot.ReleaseMutex(); + } + } + + private void QueueProcessor(object unused) + { + Future theTask = null; + + lock(syncRoot) + { + if(this.workQueue.Count == 0 || this.closing) + { + this.running = false; + this.executionComplete.Set(); + return; + } + + theTask = this.workQueue.Dequeue(); + } + + try + { + theTask.Run(); + } + finally + { + if(this.closing) + { + this.running = false; + this.executionComplete.Set(); + } + else + { + ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null); + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Transport/InactivityMonitor.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Transport/InactivityMonitor.cs b/src/main/csharp/Transport/InactivityMonitor.cs index 8fb5c95..1614394 100644 --- a/src/main/csharp/Transport/InactivityMonitor.cs +++ b/src/main/csharp/Transport/InactivityMonitor.cs @@ -45,6 +45,9 @@ namespace Apache.NMS.Stomp.Transport private AsyncWriteTask asyncWriteTask; private readonly Mutex monitor = new Mutex(); + + private static int id = 0; + private readonly int instanceId = 0; private bool disposing = false; private Timer connectionCheckTimer; @@ -83,7 +86,8 @@ namespace Apache.NMS.Stomp.Transport public InactivityMonitor(ITransport next) : base(next) { - Tracer.Debug("Creating Inactivity Monitor"); + this.instanceId = ++id; + Tracer.Debug("Creating Inactivity Monitor: " + instanceId); } ~InactivityMonitor() @@ -98,8 +102,14 @@ namespace Apache.NMS.Stomp.Transport // get rid of unmanaged stuff } - this.disposing = true; - StopMonitorThreads(); + lock(monitor) + { + this.localWireFormatInfo = null; + this.remoteWireFormatInfo = null; + this.disposing = true; + StopMonitorThreads(); + } + base.Dispose(disposing); } @@ -123,19 +133,19 @@ namespace Apache.NMS.Stomp.Transport { if(this.inWrite.Value || this.failed.Value) { - Tracer.Debug("Inactivity Monitor is in write or already failed."); + Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId); return; } if(!commandSent.Value) { - //Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo"); + Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId); this.asyncWriteTask.IsPending = true; this.asyncTasks.Wakeup(); } else { - Tracer.Debug("Message sent since last write check. Resetting flag"); + Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId); } commandSent.Value = false; @@ -150,6 +160,7 @@ namespace Apache.NMS.Stomp.Transport if(!AllowReadCheck(elapsed)) { + Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed."); return; } @@ -157,12 +168,13 @@ namespace Apache.NMS.Stomp.Transport if(this.inRead.Value || this.failed.Value || this.asyncErrorTask == null) { + Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId); return; } if(!commandReceived.Value) { - Tracer.Debug("No message received since last read check! Sending an InactivityException!"); + Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId); this.asyncErrorTask.IsPending = true; this.asyncTasks.Wakeup(); } @@ -215,9 +227,9 @@ namespace Apache.NMS.Stomp.Transport { if(Tracer.IsDebugEnabled) { - Tracer.Debug("InactivityMonitor: New Keep Alive Received at -> " + - DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) + - "." + DateTime.Now.Millisecond); + Tracer.DebugFormat("InactivityMonitor[{0}]: New Keep Alive Received at -> " + + DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) + + "." + DateTime.Now.Millisecond, instanceId); } } @@ -325,27 +337,29 @@ namespace Apache.NMS.Stomp.Transport initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay; - Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime ); - Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime ); - Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime ); + Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}", + instanceId, readCheckTime ); + Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}", + instanceId, initialDelayTime ); this.asyncTasks = new CompositeTaskRunner(); if(this.asyncErrorTask != null) { - Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner."); + Tracer.DebugFormat("InactivityMonitor[{0}]: Adding the Async Read Check Task to the Runner.", instanceId); this.asyncTasks.AddTask(this.asyncErrorTask); } if(this.asyncWriteTask != null) { - Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner."); + Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}", + instanceId, writeCheckTime ); this.asyncTasks.AddTask(this.asyncWriteTask); } if(this.asyncErrorTask != null || this.asyncWriteTask != null) { - Tracer.Debug("Inactivity: Starting the Monitor Timer."); + Tracer.DebugFormat("InactivityMonitor[{0}]: Starting the Monitor Timer.", instanceId); monitorStarted.Value = true; this.connectionCheckTimer = new Timer( @@ -427,10 +441,13 @@ namespace Apache.NMS.Stomp.Transport public bool Iterate() { + Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId); if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value) { try { + Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.", + parent.instanceId); KeepAliveInfo info = new KeepAliveInfo(); this.parent.next.Oneway(info); } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Util/MessageDispatchChannel.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Util/MessageDispatchChannel.cs b/src/main/csharp/Util/MessageDispatchChannel.cs index a1d7cfc..64bd7aa 100644 --- a/src/main/csharp/Util/MessageDispatchChannel.cs +++ b/src/main/csharp/Util/MessageDispatchChannel.cs @@ -25,16 +25,13 @@ namespace Apache.NMS.Stomp.Util public class MessageDispatchChannel { private readonly Mutex mutex = new Mutex(); - private readonly ManualResetEvent wakeAll = new ManualResetEvent(false); - private readonly AutoResetEvent waiter = new AutoResetEvent(false); - private WaitHandle[] waiters; + private readonly ManualResetEvent waiter = new ManualResetEvent(false); private bool closed; private bool running; private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>(); public MessageDispatchChannel() { - this.waiters = new WaitHandle[] { this.waiter, this.wakeAll }; } #region Properties @@ -113,7 +110,7 @@ namespace Apache.NMS.Stomp.Util if(!Closed) { this.running = true; - this.wakeAll.Reset(); + this.waiter.Reset(); } } } @@ -123,7 +120,7 @@ namespace Apache.NMS.Stomp.Util lock(mutex) { this.running = false; - this.wakeAll.Set(); + this.waiter.Set(); } } @@ -137,7 +134,7 @@ namespace Apache.NMS.Stomp.Util this.closed = true; } - this.wakeAll.Set(); + this.waiter.Set(); } } @@ -168,9 +165,15 @@ namespace Apache.NMS.Stomp.Util // Wait until the channel is ready to deliver messages. if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running ) ) { - this.mutex.ReleaseMutex(); + // This isn't the greatest way to do this but to work on the + // .NETCF its the only solution I could find so far. This + // code will only really work for one Thread using the event + // channel to wait as all waiters are going to drop out of + // here regardless of the fact that only one message could + // be on the Queue. this.waiter.Reset(); - ThreadUtil.WaitAny(this.waiters, (int)timeout.TotalMilliseconds, false); + this.mutex.ReleaseMutex(); + this.waiter.WaitOne((int)timeout.TotalMilliseconds, false); this.mutex.WaitOne(); } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/test/csharp/Threads/ThreadPoolExecutorTest.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/Threads/ThreadPoolExecutorTest.cs b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs new file mode 100644 index 0000000..1da7241 --- /dev/null +++ b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; +using Apache.NMS.Util; +using Apache.NMS.Stomp.Threads; +using NUnit.Framework; + +namespace Apache.NMS.Stomp.Test +{ + [TestFixture] + public class ThreadPoolExecutorTest + { + private const int JOB_COUNT = 100; + private ManualResetEvent complete = new ManualResetEvent(false); + private bool waitingTaskCompleted = false; + private CountDownLatch doneLatch; + private int count = 0; + + internal class DummyClass + { + private int data; + + public DummyClass(int data) + { + this.data = data; + } + + public int Data + { + get { return data; } + } + } + + public ThreadPoolExecutorTest() + { + } + + private void TaskThatSignalsWhenItsComplete(object unused) + { + waitingTaskCompleted = true; + complete.Set(); + } + + private void TaskThatCountsDown(object unused) + { + doneLatch.countDown(); + } + + private void TaskThatSleeps(object unused) + { + Thread.Sleep(5000); + } + + private void TaskThatIncrementsCount(object unused) + { + count++; + } + + private void TaskThatThrowsAnException(object unused) + { + throw new Exception("Throwing an Exception just because"); + } + + private void TaskThatValidatesTheArg(object arg) + { + DummyClass state = arg as DummyClass; + if(state != null && state.Data == 10 ) + { + waitingTaskCompleted = true; + } + complete.Set(); + } + + [SetUp] + public void SetUp() + { + this.complete.Reset(); + this.waitingTaskCompleted = false; + this.doneLatch = new CountDownLatch(JOB_COUNT); + this.count = 0; + } + + [Test] + public void TestConstructor() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + executor.Shutdown(); + Assert.IsTrue(executor.IsShutdown); + } + + [Test] + public void TestSingleTaskExecuted() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + + executor.QueueUserWorkItem(TaskThatSignalsWhenItsComplete); + + this.complete.WaitOne(); + Assert.IsTrue(this.waitingTaskCompleted); + + executor.Shutdown(); + Assert.IsTrue(executor.IsShutdown); + } + + [Test] + public void TestTaskParamIsPropagated() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + + executor.QueueUserWorkItem(TaskThatValidatesTheArg, new DummyClass(10)); + + this.complete.WaitOne(); + Assert.IsTrue(this.waitingTaskCompleted); + + executor.Shutdown(); + Assert.IsTrue(executor.IsShutdown); + } + + [Test] + public void TestAllTasksComplete() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + + for(int i = 0; i < JOB_COUNT; ++i) + { + executor.QueueUserWorkItem(TaskThatCountsDown); + } + + Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000))); + + executor.Shutdown(); + Assert.IsTrue(executor.IsShutdown); + } + + [Test] + public void TestAllTasksCompleteAfterException() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + + executor.QueueUserWorkItem(TaskThatThrowsAnException); + + for(int i = 0; i < JOB_COUNT; ++i) + { + executor.QueueUserWorkItem(TaskThatCountsDown); + } + + Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000))); + + executor.Shutdown(); + Assert.IsTrue(executor.IsShutdown); + } + + [Test] + public void TestThatShutdownPurgesTasks() + { + ThreadPoolExecutor executor = new ThreadPoolExecutor(); + Assert.IsNotNull(executor); + Assert.IsFalse(executor.IsShutdown); + + executor.QueueUserWorkItem(TaskThatSleeps); + + for(int i = 0; i < JOB_COUNT; ++i) + { + executor.QueueUserWorkItem(TaskThatIncrementsCount); + } + + Thread.Sleep(100); + + executor.Shutdown(); + Assert.AreEqual(0, count); + Assert.IsTrue(executor.IsShutdown); + } + + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp-test.csproj ---------------------------------------------------------------------- diff --git a/vs2008-stomp-test.csproj b/vs2008-stomp-test.csproj index b093a81..ffb03ae 100644 --- a/vs2008-stomp-test.csproj +++ b/vs2008-stomp-test.csproj @@ -101,6 +101,7 @@ <Compile Include="src\test\csharp\StompHelperTest.cs" /> <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" /> <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" /> + <Compile Include="src\test\csharp\Threads\ThreadPoolExecutorTest.cs" /> <Compile Include="src\test\csharp\Util\MessageDispatchChannelTest.cs" /> <Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" /> <Compile Include="src\test\csharp\StompTopicTransactionTest.cs" /> http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp.csproj ---------------------------------------------------------------------- diff --git a/vs2008-stomp.csproj b/vs2008-stomp.csproj index 0b332d4..6921324 100644 --- a/vs2008-stomp.csproj +++ b/vs2008-stomp.csproj @@ -110,6 +110,7 @@ <Compile Include="src\main\csharp\RequestTimedOutException.cs" /> <Compile Include="src\main\csharp\Threads\CompositeTask.cs" /> <Compile Include="src\main\csharp\Threads\CompositeTaskRunner.cs" /> + <Compile Include="src\main\csharp\Threads\ThreadPoolExecutor.cs" /> <Compile Include="src\main\csharp\Transport\Tcp\TcpTransport.cs" /> <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs" /> <Compile Include="src\main\csharp\Transport\FutureResponse.cs" />