Author: gsim
Date: Thu Oct 12 09:32:47 2006
New Revision: 463309

URL: http://svn.apache.org/viewvc?view=rev&rev=463309
Log:
Fixed ack.RecoverTest and ack.DisconnectAndRedeliverTest. These were failing 
due to a race condition
where the consumers queue was not bound by the time the publisher sent messages.

This is a result of the use of nowait=true for the declare/bind/consume cycle 
for a BasicMessageConsumer.
To work around this in tests like these that have two connections, one 
consuming & one publishing, I 
added a declareExchangeSynch() method to AMQSession which allows a thread to 
block until the session it
invokes that method on has processed all the commands up to that point.


Modified:
    
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
    
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/AMQSession.java
 Thu Oct 12 09:32:47 2006
@@ -785,6 +785,12 @@
         declareExchange(name, type, _connection.getProtocolHandler());
     }
 
+    public void declareExchangeSynch(String name, String type) throws 
AMQException
+    {
+        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, 
name, type, false, false, false, false, false, null);
+        _connection.getProtocolHandler().syncWrite(frame, 
ExchangeDeclareOkBody.class);
+    }
+
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler 
protocolHandler)
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), 
protocolHandler);

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/DisconnectAndRedeliverTest.java
 Thu Oct 12 09:32:47 2006
@@ -22,6 +22,7 @@
 import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -68,9 +69,11 @@
 
         TestableMemoryMessageStore store = (TestableMemoryMessageStore) 
ApplicationRegistry.getInstance().getMessageStore();
 
-        Session consumerSession = con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Session consumerSession = (AMQSession) con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", 
"direct");
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
         Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
@@ -103,15 +106,15 @@
         con.start();
 
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg2");
+        Assert.assertEquals("msg2", tm.getText());
 
 
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg3");
+        Assert.assertEquals("msg3", tm.getText());
 
 
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg4");
+        Assert.assertEquals("msg4", tm.getText());
 
         _logger.info("Received redelivery of three messages. Acknowledging 
last message");
         tm.acknowledge();
@@ -157,6 +160,8 @@
         Session consumerSession = con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         Queue queue = new AMQQueue("someQ", "someQ", false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", 
"direct");
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
         Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);

Modified: 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java?view=diff&rev=463309&r1=463308&r2=463309
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/test/src/org/apache/qpid/ack/RecoverTest.java
 Thu Oct 12 09:32:47 2006
@@ -20,6 +20,7 @@
 import junit.framework.JUnit4TestAdapter;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
 import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@
         Session consumerSession = con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
         Queue queue = new AMQQueue("someQ", "someQ", false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", 
"direct");
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
         Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
@@ -80,13 +83,13 @@
         // no ack for last three messages so when I call recover I expect to 
get three messages back
         consumerSession.recover();
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg2");
+        Assert.assertEquals("msg2", tm.getText());
 
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg3");
+        Assert.assertEquals("msg3", tm.getText());
 
         tm = (TextMessage) consumer.receive(3000);
-        Assert.assertEquals(tm.getText(), "msg4");
+        Assert.assertEquals("msg4", tm.getText());
 
         _logger.info("Received redelivery of three messages. Acknowledging 
last message");
         tm.acknowledge();


Reply via email to