Author: ritchiem
Date: Thu Feb  1 08:25:57 2007
New Revision: 502261

URL: http://svn.apache.org/viewvc?view=rev&rev=502261
Log:
QPID-339 DispatcherTest.java was broken; now it actually tests correctly.
Added a test to check changing message listeners.

Added:
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=502261&r1=502260&r2=502261
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Feb  1 08:25:57 2007
@@ -1642,6 +1642,10 @@
             _dispatcher.setDaemon(true);
             _dispatcher.start();
         }
+        else
+        {
+            _dispatcher.setConnectionStopped(false);
+        }
     }
 
     void stop()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java?view=diff&rev=502261&r1=502260&r2=502261
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
 Thu Feb  1 08:25:57 2007
@@ -112,10 +112,6 @@
 
     protected void tearDown() throws Exception
     {
-        assertEquals("Messages not received correctly", 0, 
_allFirstMessagesSent.getCount());
-        assertEquals("Messages not received correctly", 0, 
_allSecondMessagesSent.getCount());
-        assertEquals("Client didn't get all messages", MSG_COUNT * 2, 
_receivedCount);
-        assertEquals("Messages received while stopped is not 0", 0, 
_receivedCountWhileStopped);
 
         _clientConnection.close();
 
@@ -130,6 +126,9 @@
 
         _logger.info("Test Start");
 
+
+        assertTrue(!((AMQConnection) _clientConnection).started());
+
         //Set default Message Listener
         try
         {
@@ -165,10 +164,9 @@
                     }
                 }
             });
-
-
-            // FIXME Note : Should we need to call start to be able to call 
stop?
-            //_clientConnection.start();
+            
+            assertTrue("Connecion should not be started", !((AMQConnection) 
_clientConnection).started());
+            _clientConnection.start();            
         }
         catch (JMSException e)
         {
@@ -187,6 +185,7 @@
 
         try
         {
+            assertTrue("Connecion should be started", ((AMQConnection) 
_clientConnection).started());
             _clientConnection.stop();
             _connectionStopped = true;
         }
@@ -243,6 +242,12 @@
         {
             //do nothing
         }
+
+        assertEquals("Messages not received correctly", 0, 
_allFirstMessagesSent.getCount());
+        assertEquals("Messages not received correctly", 0, 
_allSecondMessagesSent.getCount());
+        assertEquals("Client didn't get all messages", MSG_COUNT * 2, 
_receivedCount);
+        assertEquals("Messages received while stopped is not 0", 0, 
_receivedCountWhileStopped);
+
     }
 
 

Added: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java?view=auto&rev=502261
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
 Thu Feb  1 08:25:57 2007
@@ -0,0 +1,271 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on an internal delivery queue.
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered;
+ * the remaining messages will be left on the queue and lost, and subsequent messages on the session will arrive first.
+ */
+public class ResetMessageListenerTest extends TestCase
+{
+    private static final Logger _logger = 
Logger.getLogger(ResetMessageListenerTest.class);
+
+    Context _context;
+
+    private static final int MSG_COUNT = 6;
+    private int receivedCount1ML1 = 0;
+    private int receivedCount1ML2 = 0;
+    private int receivedCount2 = 0;
+    private Connection _clientConnection, _producerConnection;
+    private MessageConsumer _consumer1;
+    private MessageConsumer _consumer2;
+    MessageProducer _producer;
+    Session _clientSession, _producerSession;
+
+    private final CountDownLatch _allFirstMessagesSent = new 
CountDownLatch(2); //all messages Sent Lock
+    private final CountDownLatch _allSecondMessagesSent = new 
CountDownLatch(2); //all messages Sent Lock
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://client:[EMAIL 
PROTECTED]/test?brokerlist='vm://:1'");
+        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client 1
+        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        _consumer1 = _clientSession.createConsumer(queue);
+
+        //Create Client 2 on same session
+        _consumer2 = _clientSession.createConsumer(queue);
+
+        //Create Producer
+        _producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _producerConnection.start();
+
+        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        _producer = _producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            _producer.send(_producerSession.createTextMessage("Message " + 
msg));
+        }
+
+    }
+
+    protected void tearDown() throws Exception
+    {
+        assertEquals("First batch of messages not received correctly", 0, 
_allFirstMessagesSent.getCount());
+        assertEquals("Second batch of messages not received correctly", 0, 
_allSecondMessagesSent.getCount());
+        assertEquals("Client 1 ML1 didn't get all messages", MSG_COUNT / 2, 
receivedCount1ML1);
+        assertEquals("Client 2 didn't get all messages", MSG_COUNT, 
receivedCount2);
+        assertEquals("Client 1 ML2 didn't get all messages", MSG_COUNT / 2, 
receivedCount1ML2);
+
+        _clientConnection.close();
+
+        _producerConnection.close();
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    public void testAsynchronousRecieve()
+    {
+
+        _logger.info("Test Start");
+
+        //Set default Message Listener
+        try
+        {
+            _consumer1.setMessageListener(new MessageListener()
+            {
+                public void onMessage(Message message)
+                {
+                    _logger.info("Client 1 ML 1 Received Message(" + 
receivedCount1ML1 + "):" + message);
+
+                    receivedCount1ML1++;
+                    if (receivedCount1ML1 == MSG_COUNT / 2)
+                    {
+                        _allFirstMessagesSent.countDown();
+                    }
+                }
+            });
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error Setting Default ML on consumer1");
+        }
+
+
+        try
+        {
+            _consumer2.setMessageListener(new MessageListener()
+            {
+                public void onMessage(Message message)
+                {
+                    _logger.info("Client 2 Received Message(" + receivedCount2 
+ "):" + message);
+
+                    receivedCount2++;
+                    if (receivedCount2 == MSG_COUNT / 2)
+                    {
+                        _logger.info("Client 2 received all its messages1");
+                        _allFirstMessagesSent.countDown();
+                    }
+
+                    if (receivedCount2 == MSG_COUNT)
+                    {
+                        _logger.info("Client 2 received all its messages2");
+                        _allSecondMessagesSent.countDown();
+                    }
+                }
+            });
+
+            _clientConnection.start();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error Setting Default ML on consumer2");
+
+        }
+
+
+        try
+        {
+            _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+            _logger.info("Received first batch of messages");
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+
+        try
+        {
+            _clientConnection.stop();                        
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error stopping connection");
+        }
+
+        _logger.info("Reset Message Listener to better listener while 
connection stopped, will restart session");
+        try
+        {
+            _consumer1.setMessageListener(new MessageListener()
+            {
+                public void onMessage(Message message)
+                {
+                    _logger.info("Client 1 ML2 Received Message(" + 
receivedCount1ML1 + "):" + message);
+
+                    receivedCount1ML2++;
+                    if (receivedCount1ML2 == MSG_COUNT / 2)
+                    {
+                        _allSecondMessagesSent.countDown();
+                    }
+                }
+            });
+            
+            _clientConnection.start();
+        }
+        catch (javax.jms.IllegalStateException e)
+        {
+            _logger.error("Connection not stopped while setting ML", e);
+            fail("Unable to change message listener:" + e.getCause());
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error Setting Better ML on consumer1", e);
+        }
+
+        try
+        {
+            _logger.error("Send additional messages");
+
+            for (int msg = 0; msg < MSG_COUNT; msg++)
+            {
+                _producer.send(_producerSession.createTextMessage("Message " + 
msg));
+            }
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Unable to send additional messages", e);
+        }
+
+        _logger.info("Waiting upto 2 seconds for messages");
+
+        try
+        {
+            _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+    }
+
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(ResetMessageListenerTest.class);
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to