Author: ritchiem
Date: Thu Feb  1 08:01:14 2007
New Revision: 502253

URL: http://svn.apache.org/viewvc?view=rev&rev=502253
Log:
QPID-339 Java client hangs when starting up (intermittently)

Patched the problem where the dispatcher would hang. The previous logic was 
flawed.

Patch worked on by Robert Godfrey and Martin Ritchie.

Added test to ensure that the connection is not automatically started.

(The previous commit added only the test by mistake; this commit contains the actual fix.)

Includes DispatcherTest, which exercises the dispatcher across a connection stop and restart.

Added:
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=502253&r1=502252&r2=502253
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Feb  1 08:01:14 2007
@@ -93,8 +93,6 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
-    private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@
      */
     private long _nextProducerId;
 
-    /**
-     * Track the 'stopped' state of the dispatcher, a session starts in the 
stopped state.
-     */
-    private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
-    /**
-     * Used to signal 'pausing' the dispatcher when setting a message listener 
on a consumer
-     */
-    private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
-    /**
-     * Used to signal 'pausing' the dispatcher when setting a message listener 
on a consumer
-     */
-    private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
 
     /**
      * Set when recover is called. This is to handle the case where recover() 
is called by application code
@@ -157,14 +141,24 @@
      */
     private boolean _inRecovery;
 
+    private boolean _connectionStopped;
+
     private boolean _hasMessageListeners;
 
     /**
      * Responsible for decoding a message fragment and passing it to the 
appropriate message consumer.
      */
-    
+
     private class Dispatcher extends Thread
     {
+
+        /**
+         * Track the 'stopped' state of the dispatcher, a session starts in 
the stopped state.
+         */
+        private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+        private final Object _lock = new Object();
+
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@
         public void run()
         {
             UnprocessedMessage message;
-            _stopped.set(false);
+
             try
             {
-                while (!_stopped.get() && (message = (UnprocessedMessage) 
_queue.take()) != null)
+                while (!_closed.get() && (message = (UnprocessedMessage) 
_queue.take()) != null)
                 {
-                    dispatchMessage(message);
+                    synchronized (_lock)
+                    {
+
+                        while (connectionStopped())
+                        {
+                            _lock.wait();
+                        }
+
+                        dispatchMessage(message);
+
+                        while (connectionStopped())
+                        {
+                            _lock.wait();
+                        }
+
+                    }
+
                 }
             }
             catch (InterruptedException e)
@@ -189,6 +199,21 @@
             _logger.info("Dispatcher thread terminating for channel " + 
_channelId);
         }
 
+        // only call while holding lock
+        final boolean connectionStopped()
+        {
+            return _connectionStopped;
+        }
+
+        void setConnectionStopped(boolean connectionStopped)
+        {
+            synchronized (_lock)
+            {
+                _connectionStopped = connectionStopped;
+                _lock.notify();
+            }
+        }
+
         private void dispatchMessage(UnprocessedMessage message)
         {
             if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@
             }
         }
 
-        public void stopDispatcher()
+        public void close()
         {
-            _stopped.set(true);
+            _closed.set(true);
             interrupt();
+
+            //fixme awaitTermination
+
         }
     }
 
 
-
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry)
     {
@@ -285,8 +312,6 @@
         _defaultPrefetchHighMark = defaultPrefetchHighMark;
         _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
-        _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue = new 
FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@
         }
     }
 
-    
+
     public void rollback() throws JMSException
     {
         checkTransacted();
@@ -654,7 +679,8 @@
     {
         if (_dispatcher != null)
         {
-            _dispatcher.stopDispatcher();
+            _dispatcher.close();
+            _dispatcher = null;
         }
         // we need to clone the list of consumers since the close() method 
updates the _consumers collection
         // which would result in a concurrent modification exception
@@ -680,7 +706,8 @@
     {
         if (_dispatcher != null)
         {
-            _dispatcher.stopDispatcher();
+            _dispatcher.close();
+            _dispatcher = null;
         }
         // we need to clone the list of consumers since the close() method 
updates the _consumers collection
         // which would result in a concurrent modification exception
@@ -712,8 +739,8 @@
         }
         // TODO: Be aware of possible changes to parameter order as versions 
change.
         
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                               
     getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
-                                                                               
     false));    // requeue
+                                                                        
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
+                                                                        
false));    // requeue
     }
 
     boolean isInRecovery()
@@ -743,37 +770,36 @@
 
     public MessageListener getMessageListener() throws JMSException
     {
-        checkNotClosed();
+//        checkNotClosed();
         return _messageListener;
     }
 
     public void setMessageListener(MessageListener listener) throws 
JMSException
     {
-        checkNotClosed();
-
-        if (!isStopped())
-        {
-            throw new javax.jms.IllegalStateException("Attempt to set listener 
while session is started.");
-        }
-
-        // We are stopped         
-        for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
-        {
-            BasicMessageConsumer consumer = i.next();
-
-            if (consumer.isReceiving())
-            {
-                throw new javax.jms.IllegalStateException("Another thread is 
already receiving synchronously.");
-            }
-        }
-
-        _messageListener = listener;
-        
-        for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
-        {
-            i.next().setMessageListener(_messageListener);
-        }
-
+//        checkNotClosed();
+//
+//        if (_dispatcher != null && !_dispatcher.connectionStopped())
+//        {
+//            throw new javax.jms.IllegalStateException("Attempt to set 
listener while session is started.");
+//        }
+//
+//        // We are stopped
+//        for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
+//        {
+//            BasicMessageConsumer consumer = i.next();
+//
+//            if (consumer.isReceiving())
+//            {
+//                throw new javax.jms.IllegalStateException("Another thread is 
already receiving synchronously.");
+//            }
+//        }
+//
+//        _messageListener = listener;
+//
+//        for (Iterator<BasicMessageConsumer> i = 
_consumers.values().iterator(); i.hasNext();)
+//        {
+//            i.next().setMessageListener(_messageListener);
+//        }
 
     }
 
@@ -1582,13 +1608,17 @@
 
     void start()
     {
+        //fixme This should be controlled by _stopped as it pairs with the 
stop method
+        //fixme or check the FlowControlledBlockingQueue _queue to see if we 
have flow controlled.
+        //will result in sending Flow messages for each subsequent call to 
flow.. only need to do this
+        // if we have called stop.
         if (_startedAtLeastOnce.getAndSet(true))
         {
             //then we stopped this and are restarting, so signal server to 
resume delivery
             unsuspendChannel();
         }
 
-        if(hasMessageListeners() && _dispatcher == null)
+        if (hasMessageListeners())
         {
             startDistpatcherIfNecessary();
         }
@@ -1606,7 +1636,7 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
-        if(_dispatcher == null)
+        if (_dispatcher == null)
         {
             _dispatcher = new Dispatcher();
             _dispatcher.setDaemon(true);
@@ -1618,14 +1648,7 @@
     {
         //stop the server delivering messages to this session
         suspendChannel();
-
-        //stop the dispatcher thread
-        _stopped.set(true);
-    }
-
-    boolean isStopped()
-    {
-        return _stopped.get();
+        _dispatcher.setConnectionStopped(true);
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=502253&r1=502252&r2=502253
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Thu Feb  1 08:01:14 2007
@@ -212,16 +212,15 @@
 
         //i.e. it is only valid to call this method if
         //
-        //    (a) the session is stopped, in which case the dispatcher is not 
running
+        //    (a) the connection is stopped, in which case the dispatcher is 
not running
         //    OR
         //    (b) the listener is null AND we are not receiving synchronously 
at present
         //
 
-        if (_session.isStopped())
+        if (!_session.getAMQConnection().started())
         {
             _messageListener.set(messageListener);
             _session.setHasMessageListeners();
-            _session.startDistpatcherIfNecessary();
 
             if (_logger.isDebugEnabled())
             {
@@ -248,7 +247,6 @@
 
                 synchronized (_session)
                 {
-                    
                     _messageListener.set(messageListener);
                     _session.setHasMessageListeners();
                     _session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@
 
     public Message receive(long l) throws JMSException
     {
-        _session.startDistpatcherIfNecessary();
 
         checkPreConditions();
 
         acquireReceiving();
 
+        _session.startDistpatcherIfNecessary();
+
         try
         {
             if (closeOnAutoClose())
@@ -385,12 +384,12 @@
 
     public Message receiveNoWait() throws JMSException
     {
-        _session.startDistpatcherIfNecessary();
-
         checkPreConditions();
 
         acquireReceiving();
 
+        _session.startDistpatcherIfNecessary();
+
         try
         {
             if (closeOnAutoClose())
@@ -560,7 +559,6 @@
             }
             else
             {
-                //This shouldn't be possible.
                 _synchronousQueue.put(jmsMessage);
             }
         }

Added: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java?view=auto&rev=502253
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
 Thu Feb  1 08:01:14 2007
@@ -0,0 +1,253 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause 
messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread 
take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a 
message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to 
pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the 
dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just 
poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent 
messages on the session will arrive first.
+ */
+public class DispatcherTest extends TestCase
+{
+    private static final Logger _logger = 
Logger.getLogger(DispatcherTest.class);
+
+    Context _context;
+
+    private static final int MSG_COUNT = 6;
+    private int _receivedCount = 0;
+    private int _receivedCountWhileStopped = 0;
+    private Connection _clientConnection, _producerConnection;
+    private MessageConsumer _consumer;
+    MessageProducer _producer;
+    Session _clientSession, _producerSession;
+
+    private final CountDownLatch _allFirstMessagesSent = new 
CountDownLatch(1); //all messages Sent Lock
+    private final CountDownLatch _allSecondMessagesSent = new 
CountDownLatch(1); //all messages Sent Lock
+
+    private volatile boolean _connectionStopped = false;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://client:[EMAIL 
PROTECTED]/test?brokerlist='vm://:1'");
+        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client 1
+        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        _consumer = _clientSession.createConsumer(queue);
+
+        //Create Producer
+        _producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+
+        _producerConnection.start();
+
+        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        _producer = _producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            _producer.send(_producerSession.createTextMessage("Message " + 
msg));
+        }
+
+    }
+
+    protected void tearDown() throws Exception
+    {
+        assertEquals("Messages not received correctly", 0, 
_allFirstMessagesSent.getCount());
+        assertEquals("Messages not received correctly", 0, 
_allSecondMessagesSent.getCount());
+        assertEquals("Client didn't get all messages", MSG_COUNT * 2, 
_receivedCount);
+        assertEquals("Messages received while stopped is not 0", 0, 
_receivedCountWhileStopped);
+
+        _clientConnection.close();
+
+        _producerConnection.close();
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    public void testAsynchronousRecieve()
+    {
+
+        _logger.info("Test Start");
+
+        //Set default Message Listener
+        try
+        {
+            _consumer.setMessageListener(new MessageListener()
+            {
+                public void onMessage(Message message)
+                {
+                    _logger.info("Client 1 ML 1 Received Message(" + 
_receivedCount + "):" + message);
+
+                    _receivedCount++;
+
+                    if (_receivedCount == MSG_COUNT)
+                    {
+                        _allFirstMessagesSent.countDown();
+                    }
+
+                    if (_connectionStopped)
+                    {
+                        _logger.info("Running with Message:" + _receivedCount);
+                    }
+
+                    if (_connectionStopped && _allFirstMessagesSent.getCount() 
== 0)
+                    {
+                        _receivedCountWhileStopped++;
+                    }
+
+                    if (_allFirstMessagesSent.getCount() == 0)
+                    {
+                        if (_receivedCount == MSG_COUNT * 2)
+                        {
+                            _allSecondMessagesSent.countDown();
+                        }
+                    }
+                }
+            });
+
+
+            // FIXME Note : Should we need to call start to be able to call 
stop?
+            //_clientConnection.start();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error Setting Default ML on consumer1");
+        }
+
+
+        try
+        {
+            _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+
+        try
+        {
+            _clientConnection.stop();
+            _connectionStopped = true;
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error stopping connection");
+        }
+
+
+        try
+        {
+            _logger.error("Send additional messages");
+
+            for (int msg = 0; msg < MSG_COUNT; msg++)
+            {
+                _producer.send(_producerSession.createTextMessage("Message " + 
msg));
+            }
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Unable to send additional messages", e);
+        }
+
+
+        try
+        {
+            Thread.sleep(1000);
+        }
+        catch (InterruptedException e)
+        {
+            //ignore
+        }
+
+        try
+        {
+            _logger.info("Restarting connection");
+
+            _connectionStopped = false;
+            _clientConnection.start();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error Setting Better ML on consumer1", e);
+        }
+
+
+        _logger.info("Waiting upto 2 seconds for messages");
+
+        try
+        {
+            _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+    }
+
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(DispatcherTest.class);
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to