Author: rgodfrey
Date: Wed Oct 10 01:51:27 2007
New Revision: 583389
URL: http://svn.apache.org/viewvc?rev=583389&view=rev
Log:
Merged revisions 583105 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r583105 | rgodfrey | 2007-10-09 11:48:25 +0100 (Tue, 09 Oct 2007) | 1 line
QPID-625 : Fix commit rollback test to prevent failures caused by incorrect
assertions in the test
........
Modified:
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
Modified:
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=583389&r1=583388&r2=583389&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Wed Oct 10 01:51:27 2007
@@ -32,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -40,6 +39,7 @@
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.UUID;
public class MessageRequeueTest extends TestCase
{
@@ -50,7 +50,7 @@
protected final int consumeTimeout = 3000;
- protected final String queue = "direct://amq.direct//queue";
+
protected String payload = "Message:";
protected final String BROKER = "vm://:1";
@@ -64,32 +64,13 @@
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection(BROKER);
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
- // load test data
- _logger.info("creating test data, " + numTestMessages + " messages");
- conn.put(queue, payload, numTestMessages);
- // close this connection
- conn.disconnect();
}
protected void tearDown() throws Exception
{
super.tearDown();
- if (!passed) // clean up
- {
- QpidClientConnection conn = new QpidClientConnection(BROKER);
-
- conn.connect();
- // clear queue
- conn.consume(queue, consumeTimeout);
-
- conn.disconnect();
- }
TransportConnection.killVMBroker(1);
}
@@ -102,12 +83,25 @@
*/
public void testDrain() throws JMSException, InterruptedException
{
+ final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
+ // clear queue
+ conn.consume(queueName, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queueName, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
+ conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
- _logger.info("consuming queue " + queue);
- Queue q = conn.getSession().createQueue(queue);
+ _logger.info("consuming queue " + queueName);
+ Queue q = conn.getSession().createQueue(queueName);
final MessageConsumer consumer = conn.getSession().createConsumer(q);
int messagesReceived = 0;
@@ -173,17 +167,33 @@
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
passed = true;
+
}
+
+
/** multiple consumers
* Based on code subbmitted by client FT-304
*/
- public void testTwoCompetingConsumers()
+ public void testCompetingConsumers() throws JMSException, InterruptedException
{
- Consumer c1 = new Consumer();
- Consumer c2 = new Consumer();
- Consumer c3 = new Consumer();
- Consumer c4 = new Consumer();
+ final String queueName = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queueName, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queueName, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
+ Consumer c1 = new Consumer(queueName);
+ Consumer c2 = new Consumer(queueName);
+ Consumer c3 = new Consumer(queueName);
+ Consumer c4 = new Consumer(queueName);
Thread t1 = new Thread(c1);
Thread t2 = new Thread(c2);
@@ -193,7 +203,7 @@
t1.start();
t2.start();
t3.start();
- // t4.start();
+ t4.start();
try
{
@@ -237,16 +247,18 @@
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed = true;
+
}
class Consumer implements Runnable
{
private Integer count = 0;
private Integer id;
+ private final String _queueName;
- public Consumer()
+ public Consumer(String queueName)
{
+ _queueName = queueName;
id = consumerIds.addAndGet(1);
}
@@ -263,7 +275,7 @@
Message result;
do
{
- result = conn.getNextMessage(queue, consumeTimeout);
+ result = conn.getNextMessage(_queueName, consumeTimeout);
if (result != null)
{
@@ -322,8 +334,21 @@
}
}
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException, InterruptedException
{
+ final String queue = "direct://amq.direct//queue" + UUID.randomUUID().toString();
+
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+
int run = 0;
// while (run < 10)
{
@@ -338,14 +363,14 @@
String brokerlist = BROKER;
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ AMQConnection amqConn = new AMQConnection(brokerUrl);
+ Session session = amqConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue q = session.createQueue(queue);
_logger.debug("Create Consumer");
MessageConsumer consumer = session.createConsumer(q);
- conn.start();
+ amqConn.start();
_logger.debug("Receiving msg");
Message msg = consumer.receive(2000);
@@ -357,7 +382,7 @@
consumer.close();
_logger.debug("Close Connection");
- conn.close();
+ amqConn.close();
}
}