Author: rupertlssmith
Date: Thu Jan 24 08:32:53 2008
New Revision: 614918
URL: http://svn.apache.org/viewvc?rev=614918&view=rev
Log:
Qpid-727, Commit rollback test adjusted as some tests were wrong. Now passes.
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
Thu Jan 24 08:32:53 2008
@@ -40,18 +40,15 @@
/// <summary> Used to build dummy data to fill test messages with.
</summary>
private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test
Message -- Test Message -- Test Message -- Test Message ";
+ /// <summary> The default timeout in milliseconds to use on receives.
</summary>
+ private const long RECEIVE_WAIT = 500;
+
/// <summary> The default AMQ connection URL to use for tests.
</summary>
- const string connectionUri = "amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='tcp://localhost:5672'";
+ const string connectionUri = "amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='tcp://localhost:5672'";
/// <summary> The default AMQ connection URL parsed as a connection
info. </summary>
protected IConnectionInfo connectionInfo =
QpidConnectionInfo.FromUrl(connectionUri);
- /// <summary> Holds the test connection. </summary>
- protected IConnection _connection;
-
- /// <summary> Holds the test channel. </summary>
- protected IChannel _channel;
-
/// <summary> Holds an array of connections for building mutiple test
end-points. </summary>
protected IConnection[] testConnection = new IConnection[10];
@@ -65,14 +62,12 @@
protected IMessageConsumer[] testConsumer = new IMessageConsumer[10];
/// <summary> A counter used to supply unique ids. </summary>
- private int uniqueId = 0;
+ private static int uniqueId = 0;
/// <summary> Used to hold unique ids per test. </summary>
protected int testId;
- /// <summary>
- /// Creates the test connection and channel.
- /// </summary>
+ /// <summary> Creates the test connection and channel. </summary>
[SetUp]
public virtual void Init()
{
@@ -80,9 +75,6 @@
// Set up a unique id for this test.
testId = uniqueId++;
-
- _connection = new AMQConnection(connectionInfo);
- _channel = _connection.CreateChannel(false,
AcknowledgeMode.AutoAcknowledge, 500, 300);
}
/// <summary>
@@ -93,14 +85,6 @@
public virtual void Shutdown()
{
log.Debug("public virtual void Shutdown(): called");
-
- if (_connection != null)
- {
- log.Debug("Disposing connection.");
- _connection.Close();
- _connection.Dispose();
- log.Debug("Connection disposed.");
- }
}
/// <summary> Sets up the nth test end-point. </summary>
@@ -112,12 +96,13 @@
/// <param name="ackMode">The ack mode for the end-points
channel.</param>
/// <param name="transacted"><tt>true</tt> to use transactions on the
end-points channel.</param>
/// <param name="exchangeName">The exchange to produce or consume
on.</param>
+ /// <param name="declareBind"><tt>true</tt> if the consumers queue
should be declared and bound, <tt>false</tt> if it has already been.</param>
/// <param name="durable"><tt>true</tt> to declare the consumers queue
as durable.</param>
/// <param name="subscriptionName">If durable is true, the fixed
unique queue name to use.</param>
- public void SetUpEndPoint(int n, bool producer, bool consumer, string
routingKey, AcknowledgeMode ackMode, bool transacted,
- string exchangeName, bool durable, string
subscriptionName)
+ public void SetUpEndPoint(int n, bool producer, bool consumer, string
routingKey, AcknowledgeMode ackMode, bool transacted,
+ string exchangeName, bool declareBind, bool
durable, string subscriptionName)
{
- testConnection[n] = new AMQConnection(connectionInfo);
+ testConnection[n] = new AMQConnection(connectionInfo);
testConnection[n].Start();
testChannel[n] = testConnection[n].CreateChannel(transacted,
ackMode, 1);
@@ -136,15 +121,25 @@
// Use the subscription name as the queue name if the
subscription is durable, otherwise use a generated name.
if (durable)
{
+ // The durable queue is declared without auto-delete, and
passively, in case it has already been declared.
queueName = subscriptionName;
+
+ if (declareBind)
+ {
+ testChannel[n].DeclareQueue(queueName, durable, true,
false);
+ testChannel[n].Bind(queueName, exchangeName,
routingKey);
+ }
}
else
{
queueName = testChannel[n].GenerateUniqueName();
- }
- testChannel[n].DeclareQueue(queueName, durable, true, true);
- testChannel[n].Bind(queueName, exchangeName, routingKey);
+ if (declareBind)
+ {
+ testChannel[n].DeclareQueue(queueName, durable, true,
true);
+ testChannel[n].Bind(queueName, exchangeName,
routingKey);
+ }
+ }
testConsumer[n] =
testChannel[n].CreateConsumerBuilder(queueName).Create();
}
@@ -169,10 +164,13 @@
testConsumer[n] = null;
}
- testConnection[n].Stop();
- testConnection[n].Close();
- testConnection[n].Dispose();
- testConnection[n] = null;
+ if (testConnection[n] != null)
+ {
+ testConnection[n].Stop();
+ testConnection[n].Close();
+ testConnection[n].Dispose();
+ testConnection[n] = null;
+ }
}
/// <summary>
@@ -188,7 +186,7 @@
ConsumeNMessages(n, body, consumer);
// Check that one more than n cannot be received.
- IMessage msg = consumer.Receive(500);
+ IMessage msg = consumer.Receive(RECEIVE_WAIT);
Assert.IsNull(msg, "Consumer got more messages than the number
requested (" + n + ").");
}
@@ -207,7 +205,7 @@
// Try to receive n messages.
for (int i = 0; i < n; i++)
{
- msg = consumer.Receive(500);
+ msg = consumer.Receive(RECEIVE_WAIT);
Assert.IsNotNull(msg, "Consumer did not receive message
number: " + i);
Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect
Message recevied on consumer1.");
}
@@ -239,6 +237,6 @@
}
return buf.ToString();
- }
+ }
}
}
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
Thu Jan 24 08:32:53 2008
@@ -56,8 +56,10 @@
base.Init();
// Create one producer and one consumer, p2p, tx, consumer with
queue bound to producers routing key.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, false,
null);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, false,
null);
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
}
[TearDown]
@@ -117,6 +119,10 @@
[Test]
public void TestUncommittedReceiveCanBeRereceived()
{
+ // Create a third end-point as an alternative delivery route for
the message.
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
// Send messages.
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
testChannel[0].Commit();
@@ -126,10 +132,11 @@
// Close end-point 1 without committing the message, then re-open
to consume again.
CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, false,
null);
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ // Check that the message was released from the rolled back
end-point and can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+
+ CloseEndPoint(2);
}
/// <summary> Check that a committed receive cannot be re-received.
</summary>
@@ -144,10 +151,6 @@
ConsumeNMessagesOnly(1, "A", testConsumer[1]);
testChannel[1].Commit();
- // Close end-point 1 without committing the message, then re-open
to consume again.
- CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, false,
null);
-
// Try to receive messages.
ConsumeNMessagesOnly(0, "A", testConsumer[1]);
}
@@ -156,20 +159,23 @@
[Test]
public void TestRolledBackReceiveCanBeRereceived()
{
+ // Create a third end-point as an alternative delivery route for
the message.
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
// Send messages.
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
testChannel[0].Commit();
-
+
// Try to receive messages.
ConsumeNMessagesOnly(1, "A", testConsumer[1]);
- testChannel[1].Rollback();
- // Close end-point 1 without committing the message, then re-open
to consume again.
- CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, false,
null);
+ testChannel[1].Rollback();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+
+ CloseEndPoint(2);
}
}
}
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
Thu Jan 24 08:32:53 2008
@@ -50,21 +50,12 @@
public override void Init()
{
base.Init();
-
- _connection.Start();
}
[TearDown]
public override void Shutdown()
{
- try
- {
- _connection.Stop();
- }
- finally
- {
- base.Shutdown();
- }
+ base.Shutdown();
}
[Test]
@@ -82,9 +73,9 @@
private void TestDurableSubscription(AcknowledgeMode ackMode)
{
// Create a topic with one producer and two consumers.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, false, null);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, false, null);
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC,
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, true, false, null);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, true,
true, "TestSubscription" + testId);
// Send messages and receive on both consumers.
@@ -102,7 +93,7 @@
ConsumeNMessagesOnly(1, "B", testConsumer[1]);
// Re-attach consumer, check that it gets the messages that it
missed.
- SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC,
+ SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode,
false, ExchangeNameDefaults.TOPIC, true,
true, "TestSubscription" + testId);
ConsumeNMessagesOnly(1, "B", testConsumer[3]);
@@ -111,6 +102,63 @@
CloseEndPoint(0);
CloseEndPoint(1);
CloseEndPoint(3);
+ }
+
+ /// <summary> Check that an uncommitted receive can be re-received, on
re-consume from the same durable subscription. </summary>
+ [Test]
+ public void TestUncommittedReceiveCanBeRereceivedNewConnection()
+ {
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testChannel[0].Commit();
+
+ // Try to receive messages, but don't commit them.
+ ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+
+ // Close end-point 1 without committing the message, then re-open
the subscription to consume again.
+ CloseEndPoint(1);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+ // Check that the message was released from the rolled back
end-point and can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+
+ CloseEndPoint(2);
+ CloseEndPoint(0);
+ }
+
+ /// <summary> Check that a rolled back receive can be re-received, on
re-consume from the same durable subscription. </summary>
+ [Test]
+ public void TestRolledBackReceiveCanBeRereceivedNewConnection()
+ {
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testChannel[0].Commit();
+
+ // Try to receive messages, but roll them back.
+ ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ testChannel[1].Rollback();
+
+ // Close end-point 1 without committing the message, then re-open
the subscription to consume again.
+ CloseEndPoint(1);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+ // Check that the message was released from the rolled back
end-point and can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+
+ CloseEndPoint(2);
+ CloseEndPoint(0);
}
}
}
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
Thu Jan 24 08:32:53 2008
@@ -24,6 +24,8 @@
using NUnit.Framework;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
namespace Apache.Qpid.Integration.Tests.testcases
{
@@ -42,7 +44,7 @@
///
/// <todo>Consider not using a delegate to callback the OnMessage method.
Easier to just call receive on the consumer but using the
/// callback does demonstrate how to do so.</todo>
- [TestFixture, Category("Integration")]
+ [TestFixture, Category("Integration")]
public class HeadersExchangeTest : BaseMessagingTestFixture
{
private static ILog _logger =
LogManager.GetLogger(typeof(HeadersExchangeTest));
@@ -69,11 +71,20 @@
private MessageReceivedDelegate _msgRecDelegate;
private ExceptionListenerDelegate _exceptionDelegate;
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
+
[SetUp]
public override void Init()
{
// Ensure that the base init method is called. It establishes a
connection with the broker.
- base.Init();
+ base.Init();
+
+ _connection = new AMQConnection(connectionInfo);
+ _channel = _connection.CreateChannel(false,
AcknowledgeMode.AutoAcknowledge, 500, 300);
_logger.Info("Starting...");
_logger.Info("Exchange name is '" + _exchangeName + "'...");
@@ -130,6 +141,8 @@
//_connection.ExceptionListener -= _exceptionDelegate;
_connection.Stop();
+ _connection.Close();
+ _connection.Dispose();
base.Shutdown();
}
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
Thu Jan 24 08:32:53 2008
@@ -23,6 +23,8 @@
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
namespace Apache.Qpid.Integration.Tests.testcases
{
@@ -52,12 +54,21 @@
/// <summary>Holds the last received error condition, for examination
by the tests sending thread.</summary>
private Exception lastErrorException;
+
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
[SetUp]
public override void Init()
{
base.Init();
+ _connection = new AMQConnection(connectionInfo);
+ _channel = _connection.CreateChannel(false,
AcknowledgeMode.AutoAcknowledge, 500, 300);
+
errorEvent = new ManualResetEvent(false);
lastErrorException = null;
_connection.ExceptionListener = new
ExceptionListenerDelegate(OnException);
@@ -70,7 +81,9 @@
{
try
{
- _connection.Stop();
+ _connection.Stop();
+ _connection.Close();
+ _connection.Dispose();
}
finally
{
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
Thu Jan 24 08:32:53 2008
@@ -24,92 +24,106 @@
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
namespace Apache.Qpid.Integration.Tests.testcases
{
+ /// <summary> ProducerMultiConsumerTest provides some simple topic exchange based
fan-out testing.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Check that all consumers on a topic each receive all messages
on it.
+ /// </table>
+ /// </summary>
[TestFixture, Category("Integration")]
public class ProducerMultiConsumerTest : BaseMessagingTestFixture
{
private static readonly ILog _logger =
LogManager.GetLogger(typeof(ProducerMultiConsumerTest));
- private string _commandQueueName = "ServiceQ1";
+ /// <summary>Base name for the routing key used for this test (made
unique by adding in test id).</summary>
+ private const string TEST_ROUTING_KEY = "ProducerMultiConsumerTest";
+ /// <summary>The number of consumers to test.</summary>
private const int CONSUMER_COUNT = 5;
- private const int MESSAGE_COUNT = 1000;
+ /// <summary>The number of test messages to send.</summary>
+ private const int MESSAGE_COUNT = 10;
+ /// <summary>Monitor used to signal successful receipt of all test
messages.</summary>
AutoResetEvent _finishedEvent = new AutoResetEvent(false);
- private IMessagePublisher _publisher;
-
- private IMessageConsumer[] _consumers = new
IMessageConsumer[CONSUMER_COUNT];
-
+ /// <summary>Used to count test messages received so far.</summary>
private int _messageReceivedCount = 0;
- //[SetUp]
+ /// <summary>Flag used to indicate that all messages really were
received, and that the test did not just time out. </summary>
+ private bool allReceived = false;
+
+ /// <summary> Creates one producing end-point and many consuming
end-points connected on a topic. </summary>
+ [SetUp]
public override void Init()
{
base.Init();
- _publisher = _channel.CreatePublisherBuilder()
- .WithRoutingKey(_commandQueueName)
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .Create();
- _publisher.DisableMessageTimestamp = true;
- _publisher.DeliveryMode = DeliveryMode.NonPersistent;
-
- for (int i = 0; i < CONSUMER_COUNT; i++)
+ // Create end-points for all the consumers in the test.
+ for (int i = 1; i <= CONSUMER_COUNT; i++)
{
- string queueName = _channel.GenerateUniqueName();
- _channel.DeclareQueue(queueName, false, true, true);
-
- _channel.Bind(queueName, ExchangeNameDefaults.TOPIC,
_commandQueueName);
-
- _consumers[i] = _channel.CreateConsumerBuilder(queueName)
- .WithPrefetchLow(100).Create();
- _consumers[i].OnMessage = new
MessageReceivedDelegate(OnMessage);
+ SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ testConsumer[i].OnMessage = new
MessageReceivedDelegate(OnMessage);
}
- _connection.Start();
+
+ // Create an end-point to publish to the test topic.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId,
AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
+ true, false, null);
}
- //[TearDown]
+ /// <summary> Cleans up all test end-points. </summary>
+ [TearDown]
public override void Shutdown()
{
- _connection.Stop();
- base.Shutdown();
+ try
+ {
+ CloseEndPoint(0);
+
+ for (int i = 1; i <= CONSUMER_COUNT; i++)
+ {
+ CloseEndPoint(i);
+ }
+ }
+ finally
+ {
+ base.Shutdown();
+ }
}
- //[Test]
- public void RunTest()
+ /// <summary> Check that all consumers on a topic each receive all
messages on it. </summary>
+ [Test]
+ public void AllConsumerReceiveAllMessagesOnTopic()
{
+ Thread.Sleep(500);
+
for (int i = 0; i < MESSAGE_COUNT; i++)
{
- ITextMessage msg;
- try
- {
- msg = _channel.CreateTextMessage(GetData(512 + 8*i));
- }
- catch (Exception e)
- {
- _logger.Error("Error creating message: " + e, e);
- break;
- }
- _publisher.Send(msg);
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
}
- _finishedEvent.WaitOne();
+
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+
+ // Check that all messages really were received.
+ Assert.IsTrue(allReceived, "All messages were not received, only
got: " + _messageReceivedCount);
}
+ /// <summary> Atomically increments the message count on every
message, and signals once all messages in the test are received. </summary>
public void OnMessage(IMessage m)
{
int newCount = Interlocked.Increment(ref _messageReceivedCount);
- if (newCount % 1000 == 0) _logger.Info("Received count=" +
newCount);
- if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT))
+
+ if (newCount > (MESSAGE_COUNT * CONSUMER_COUNT))
{
- _logger.Info("All messages received");
+ allReceived = true;
_finishedEvent.Set();
}
- if ( newCount % 100 == 0 )
- System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
}
}
}
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
Thu Jan 24 08:32:53 2008
@@ -39,7 +39,7 @@
/// Make a test TLS connection to the broker
/// without using client-certificates
/// </summary>
- //[Test]
+ [Test]
public void DoSslConnection()
{
// because for tests we don't usually trust the server certificate
Modified:
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs?rev=614918&r1=614917&r2=614918&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs
(original)
+++
incubator/qpid/branches/M2.1/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs
Thu Jan 24 08:32:53 2008
@@ -24,6 +24,8 @@
using log4net;
using NUnit.Framework;
using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
namespace Apache.Qpid.Integration.Tests.testcases
{
@@ -37,11 +39,21 @@
private IMessageConsumer _consumer;
private IMessagePublisher _publisher;
+
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
- //[SetUp]
+ [SetUp]
public override void Init()
{
base.Init();
+
+ _connection = new AMQConnection(connectionInfo);
+ _channel = _connection.CreateChannel(false,
AcknowledgeMode.AutoAcknowledge, 500, 300);
+
_publisher = _channel.CreatePublisherBuilder()
.WithRoutingKey(_commandQueueName)
.WithExchangeName(ExchangeNameDefaults.TOPIC)
@@ -60,7 +72,25 @@
_connection.Start();
}
- //[Test]
+ /// <summary>
+ /// Deregisters the on message delegate before closing the connection.
+ /// </summary>
+ [TearDown]
+ public override void Shutdown()
+ {
+ _logger.Info("public void Shutdown(): called");
+
+ //_consumer.OnMessage -= _msgRecDelegate;
+ //_connection.ExceptionListener -= _exceptionDelegate;
+
+ _connection.Stop();
+ _connection.Close();
+ _connection.Dispose();
+
+ base.Shutdown();
+ }
+
+ [Test]
public void ReceiveWithInfiniteWait()
{
// send all messages
@@ -94,7 +124,7 @@
}
}
- //[Test]
+ [Test]
public void ReceiveWithTimeout()
{
ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));