Author: gsim
Date: Thu Oct 12 09:32:47 2006
New Revision: 463309
URL: http://svn.apache.org/viewvc?view=rev&rev=463309
Log:
Fixed ack.RecoverTest and ack.DisconnectAndRedeliverTest. These were failing
due to a race condition
where the consumers queue was not bound by the time the publisher sent messages.
This is a result of the use of nowait=true for the declare/bind/consume cycle
for a BasicMessageConsumer.
To work around this in tests like these that have two connections, one
consuming & one publishing, I
added a declareExchangeSynch() method to AMQSession which allows a thread to
block until the session it
invokes that method on has processed all the commands up to that point.
Modified:
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
Thu Oct 12 09:32:47 2006
@@ -785,6 +785,12 @@
declareExchange(name, type, _connection.getProtocolHandler());
}
+ public void declareExchangeSynch(String name, String type) throws
AMQException
+ {
+ AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0,
name, type, false, false, false, false, false, null);
+ _connection.getProtocolHandler().syncWrite(frame,
ExchangeDeclareOkBody.class);
+ }
+
private void declareExchange(AMQDestination amqd, AMQProtocolHandler
protocolHandler)
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(),
protocolHandler);
Modified:
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
Thu Oct 12 09:32:47 2006
@@ -22,6 +22,7 @@
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -68,9 +69,11 @@
TestableMemoryMessageStore store = (TestableMemoryMessageStore)
ApplicationRegistry.getInstance().getMessageStore();
- Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = (AMQSession) con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct",
"direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
@@ -103,15 +106,15 @@
con.start();
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg2");
+ Assert.assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg3");
+ Assert.assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg4");
+ Assert.assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging
last message");
tm.acknowledge();
@@ -157,6 +160,8 @@
Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct",
"direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Modified:
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
Thu Oct 12 09:32:47 2006
@@ -20,6 +20,7 @@
import junit.framework.JUnit4TestAdapter;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@
Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = new AMQQueue("someQ", "someQ", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct",
"direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
@@ -80,13 +83,13 @@
// no ack for last three messages so when I call recover I expect to
get three messages back
consumerSession.recover();
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg2");
+ Assert.assertEquals("msg2", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg3");
+ Assert.assertEquals("msg3", tm.getText());
tm = (TextMessage) consumer.receive(3000);
- Assert.assertEquals(tm.getText(), "msg4");
+ Assert.assertEquals("msg4", tm.getText());
_logger.info("Received redelivery of three messages. Acknowledging
last message");
tm.acknowledge();