Author: rgreig
Date: Tue Feb 27 07:45:33 2007
New Revision: 512288

URL: http://svn.apache.org/viewvc?view=rev&rev=512288
Log:
(Patch submitted by Tomas Restrepo) QPID-354.
With the patch, blocking receive calls with and without timeouts work. Added 
new unit test class to support functionality added.

Added:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
    
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
Modified:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj
    incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj?view=diff&rev=512288&r1=512287&r2=512288
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj 
Tue Feb 27 07:45:33 2007
@@ -55,6 +55,7 @@
     <Compile Include="requestreply1\ServiceProvidingClient.cs" />
     <Compile Include="requestreply1\ServiceRequestingClient.cs" />
     <Compile Include="Security\CallbackHandlerRegistryTests.cs" />
+    <Compile Include="SimpleConsumer\TestSyncConsumer.cs" />
     <Compile Include="undeliverable\UndeliverableTest.cs" />
     <Compile Include="url\ConnectionUrlTest.cs" />
   </ItemGroup>

Added: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs?view=auto&rev=512288
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
 (added)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
 Tue Feb 27 07:45:33 2007
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests
+{
+   [TestFixture]
+   public class TestSyncConsumer : BaseMessagingTestFixture
+   {
+      private static readonly ILog _logger = 
LogManager.GetLogger(typeof(TestSyncConsumer));
+
+      private string _commandQueueName = "ServiceQ1";
+      private const int MESSAGE_COUNT = 1000;
+      private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf 
ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";
+
+      private static String GetData(int size)
+      {
+         StringBuilder buf = new StringBuilder(size);
+         int count = 0;
+         while ( count < size + MESSAGE_DATA_BYTES.Length )
+         {
+            buf.Append(MESSAGE_DATA_BYTES);
+            count += MESSAGE_DATA_BYTES.Length;
+         }
+         if ( count < size )
+         {
+            buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+         }
+
+         return buf.ToString();
+      }
+
+      private IMessageConsumer _consumer;
+      private IMessagePublisher _publisher;
+
+      [SetUp]
+      public override void Init()
+      {
+         base.Init();
+         _publisher = _channel.CreatePublisherBuilder()
+             .WithRoutingKey(_commandQueueName)
+             .WithExchangeName(ExchangeNameDefaults.TOPIC)
+             .Create();
+
+         _publisher.DisableMessageTimestamp = true;
+         _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+         string queueName = _channel.GenerateUniqueName();
+         _channel.DeclareQueue(queueName, false, true, true);
+
+         _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, 
_commandQueueName);
+
+         _consumer = _channel.CreateConsumerBuilder(queueName)
+             .WithPrefetchLow(100).Create();
+         _connection.Start();
+      }
+
+      [Test]
+      public void ReceiveWithInfiniteWait()
+      {
+         // send all messages
+         for ( int i = 0; i < MESSAGE_COUNT; i++ )
+         {
+            ITextMessage msg;
+            try
+            {
+               msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
+            } catch ( Exception e )
+            {
+               _logger.Error("Error creating message: " + e, e);
+               break;
+            }
+            _publisher.Send(msg);
+         }
+
+         _logger.Error("All messages sent");
+         // receive all messages
+         for ( int i = 0; i < MESSAGE_COUNT; i++ )
+         {
+            try
+            {
+               IMessage msg = _consumer.Receive();
+               Assert.IsNotNull(msg);
+            } catch ( Exception e )
+            {
+               _logger.Error("Error receiving message: " + e, e);
+               Assert.Fail(e.ToString());
+            }
+         }
+      }
+
+      [Test]
+      public void ReceiveWithTimeout()
+      {
+         ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));
+         _publisher.Send(msg);
+
+         IMessage recvMsg = _consumer.Receive();
+         Assert.IsNotNull(recvMsg);
+         // empty queue, should timeout
+         Assert.IsNull(_consumer.Receive(1000));
+      }
+   }
+}

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=512288&r1=512287&r2=512288
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs 
Tue Feb 27 07:45:33 2007
@@ -100,9 +100,9 @@
 
         /// <summary>
         /// Used in the blocking receive methods to receive a message from
-        /// the Channel thread. Argument true indicates we want strict FIFO 
semantics
+        /// the Channel thread. 
         /// </summary>
-        private readonly SynchronousQueue _synchronousQueue = new 
SynchronousQueue(true);
+        private readonly ConsumerProducerQueue _messageQueue = new 
ConsumerProducerQueue();
 
         private MessageFactoryRegistry _messageFactory;
 
@@ -188,17 +188,8 @@
 
             try
             {
-                object o;
-                if (delay > 0)
-                {
-                    //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS);
-                    throw new NotImplementedException("Need to implement 
synchronousQueue.Poll(timeout");
-                }
-                else
-                {
-                    o = _synchronousQueue.DequeueBlocking();
-                }
-
+                object o = _messageQueue.Dequeue(delay);
+                
                 return ReturnMessageOrThrowAndPostDeliver(o);
             }
             finally
@@ -222,42 +213,12 @@
 
         public IMessage Receive()
         {
-            return Receive(0);
+            return Receive(Timeout.Infinite);
         }
 
         public IMessage ReceiveNoWait()
         {
-            CheckNotClosed();
-
-            lock (_syncLock)
-            {
-                // If someone is already receiving
-                if (_receiving)
-                {
-                    throw new InvalidOperationException("Another thread is 
already receiving (possibly asynchronously)...");
-                }
-
-                _receiving = true;
-            }
-
-            try
-            {
-                if (_synchronousQueue.Count > 0)
-                {
-                    return 
ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
-                }
-                else
-                {
-                    return null;
-                }
-            }
-            finally
-            {
-                lock (_syncLock)
-                {
-                    _receiving = false;
-                }
-            }
+           return Receive(0);
         }
 
         #endregion
@@ -359,7 +320,7 @@
                 }
                 else
                 {
-                    _synchronousQueue.Enqueue(jmsMessage);
+                    _messageQueue.Enqueue(jmsMessage);
                 }
             }
             catch (Exception e)
@@ -380,10 +341,8 @@
                 if (_messageListener == null)
                 {
                     // offer only succeeds if there is a thread waiting for an 
item from the queue
-                    if (_synchronousQueue.EnqueueNoThrow(cause))
-                    {
-                        _logger.Debug("Passed exception to synchronous queue 
for propagation to receive()");
-                    }
+                   _messageQueue.Enqueue(cause);
+                    _logger.Debug("Passed exception to synchronous queue for 
propagation to receive()");
                 }
                 DeregisterConsumer();
             }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj?view=diff&rev=512288&r1=512287&r2=512288
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj 
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj 
Tue Feb 27 07:45:33 2007
@@ -39,6 +39,7 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Qpid\Collections\TestConsumerProducerQueue.cs" />
     <Compile Include="Qpid\Collections\TestLinkedHashtable.cs" />
     <Compile Include="Qpid\Framing\TestEncodingUtils.cs" />
     <Compile Include="Qpid\Framing\TestAMQType.cs" />

Added: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs?view=auto&rev=512288
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
 (added)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
 Tue Feb 27 07:45:33 2007
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using NUnit.Framework;
+using Qpid.Collections;
+
+namespace Qpid.Collections.Tests
+{
+    [TestFixture]
+    public class TestConsumerProducerQueue
+    {
+       private ConsumerProducerQueue _queue;
+
+       [SetUp]
+       public void SetUp()
+       {
+          _queue = new ConsumerProducerQueue();
+       }
+
+       [Test]
+       public void CanDequeueWithInifiniteWait()
+       {
+          Thread producer = new Thread(new ThreadStart(ProduceFive));
+          producer.Start();
+          for ( int i = 0; i < 5; i++ )
+          {
+             object item = _queue.Dequeue();
+             Assert.IsNotNull(item);
+          }
+       }
+
+       [Test]
+       public void ReturnsNullOnDequeueTimeout()
+       {
+          // queue is empty
+          Assert.IsNull(_queue.Dequeue(500));
+       }
+
+       [Test]
+       public void DequeueTillEmpty()
+       {
+          _queue.Enqueue(1);
+          _queue.Enqueue(2);
+          _queue.Enqueue(3);
+          Assert.AreEqual(1, _queue.Dequeue());
+          Assert.AreEqual(2, _queue.Dequeue());
+          Assert.AreEqual(3, _queue.Dequeue());
+          // no messages in queue, will timeout
+          Assert.IsNull(_queue.Dequeue(500));
+       }
+
+
+       private void ProduceFive()
+       {
+          Thread.Sleep(1000);
+          _queue.Enqueue("test item 1");
+          _queue.Enqueue("test item 2");
+          _queue.Enqueue("test item 3");
+          Thread.Sleep(0);
+          _queue.Enqueue("test item 4");
+          _queue.Enqueue("test item 5");
+       }
+    }
+}

Added: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs?view=auto&rev=512288
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
 (added)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
 Tue Feb 27 07:45:33 2007
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Qpid.Collections
+{
+   /// <summary>
+   /// Simple FIFO queue to support multi-threaded consumer
+   /// and producers. It supports timeouts in dequeue operations.
+   /// </summary>
+   public sealed class ConsumerProducerQueue 
+   {
+      private Queue _queue = new Queue();
+      private WaitSemaphore _semaphore = new WaitSemaphore();
+
+      /// <summary>
+      /// Put an item into the tail of the queue
+      /// </summary>
+      /// <param name="item"></param>
+      public void Enqueue(object item)
+      {
+         lock ( _queue.SyncRoot )
+         {
+            _queue.Enqueue(item);
+            _semaphore.Increment();
+         }
+      }
+
+      /// <summary>
+      /// Wait indefinitely for an item to be available
+      /// on the queue.
+      /// </summary>
+      /// <returns>The object at the head of the queue</returns>
+      public object Dequeue()
+      {
+         return Dequeue(Timeout.Infinite);
+      }
+
+      /// <summary>
+      /// Wait up to the number of milliseconds specified
+      /// for an item to be available on the queue
+      /// </summary>
+      /// <param name="timeout">Number of milliseconds to wait</param>
+      /// <returns>The object at the head of the queue, or null 
+      /// if the timeout expires</returns>
+      public object Dequeue(long timeout)
+      {
+         if ( _semaphore.Decrement(timeout) )
+         {
+            lock ( _queue.SyncRoot )
+            {
+               return _queue.Dequeue();
+            }
+         }
+         return null;
+      }
+
+      #region Simple Semaphore
+      //
+      // Simple Semaphore
+      //
+
+      class WaitSemaphore
+      {
+         private int _count;
+         private AutoResetEvent _event = new AutoResetEvent(false);
+
+         public void Increment()
+         {
+            Interlocked.Increment(ref _count);
+            _event.Set();
+         }
+
+         public bool Decrement(long timeout)
+         {
+            if ( timeout > int.MaxValue )
+               throw new ArgumentOutOfRangeException("timeout", timeout, "Must 
be <= Int32.MaxValue");
+
+            int millis = (int) (timeout & 0x7FFFFFFF);
+            if ( Interlocked.Decrement(ref _count) > 0 )
+            {
+               // there are messages in queue, so no need to wait
+               return true;
+            } else
+            {
+               return _event.WaitOne(millis, false);
+            }
+         }
+      }
+      #endregion // Simple Semaphore
+   }
+}

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj?view=diff&rev=512288&r1=512287&r2=512288
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj Tue Feb 27 
07:45:33 2007
@@ -47,6 +47,7 @@
     <Compile Include="AMQUndeliveredException.cs" />
     <Compile Include="AssemblySettings.cs" />
     <Compile Include="Collections\LinkedHashtable.cs" />
+    <Compile Include="Collections\ConsumerProducerQueue.cs" />
     <Compile Include="Framing\AMQDataBlockDecoder.cs" />
     <Compile Include="Framing\AMQDataBlockEncoder.cs" />
     <Compile Include="Framing\AMQFrame.cs" />


Reply via email to