Author: rgreig
Date: Mon Jan 29 03:02:57 2007
New Revision: 501004

URL: http://svn.apache.org/viewvc?view=rev&rev=501004
Log:
QPID-320 : Patch supplied by Rob Godfrey - Simplify logic to deal with setting 
MessageListener only after connection start has been called

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=501004&r1=501003&r2=501004
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Jan 29 03:02:57 2007
@@ -69,6 +69,8 @@
 
     private MessageListener _messageListener = null;
 
+    private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
     /**
      * Used to reference durable subscribers so they requests for unsubscribe 
can be handled
      * correctly.  Note this only keeps a record of subscriptions which have 
been created
@@ -155,27 +157,14 @@
      */
     private boolean _inRecovery;
 
-    public void doDispatcherTask(DispatcherCallback dispatcherCallback)
-    {
-        synchronized (this)
-        {
-            _dispatcher.pause();
-
-            dispatcherCallback.whilePaused(_reprocessQueue);
-
-            _dispatcher.reprocess();
-        }
-    }
-
+    private boolean _hasMessageListeners;
 
     /**
      * Responsible for decoding a message fragment and passing it to the 
appropriate message consumer.
      */
+    
     private class Dispatcher extends Thread
     {
-        private final Logger _logger = Logger.getLogger(Dispatcher.class);
-        private boolean _reDispatching = true;
-
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -183,121 +172,32 @@
 
         public void run()
         {
-            _stopped.set(false);
-
-            while (!_stopped.get())
-            {
-                synchronized (_pausingDispatcher)
-                {
-                    if (_pausingDispatcher.get())
-                    {
-                        try
-                        {
-
-                            _pausingDispatcher.set(false);
-
-                            //Wait to continue with pause code.
-                            synchronized (_pausedDispatcher)
-                            {
-                                _pausedDispatcher.notify();
-                            }
-
-                            _reDispatching = true;
-
-                            _logger.info("Dispatcher paused");
-                            _pausingDispatcher.wait();
-                            _logger.info("Dispatcher notified");
-
-                        }
-                        catch (InterruptedException e)
-                        {
-                            _logger.info("dispacher interrupted");
-                        }
-                    }
-                }
-
-                if (_reDispatching)
-                {
-                    doReDispatch();
-                }
-                else
-                {
-                    doNormalDispatch();
-                }
-
-            }
-
-
-            _logger.info("Dispatcher thread terminating for channel " + 
_channelId);
-        }
-
-        private void doNormalDispatch()
-        {
             UnprocessedMessage message;
+            _stopped.set(false);
             try
             {
-                while (!_stopped.get() && !_pausingDispatcher.get() && 
(message = (UnprocessedMessage) _queue.take()) != null)
+                while (!_stopped.get() && (message = (UnprocessedMessage) 
_queue.take()) != null)
                 {
                     dispatchMessage(message);
                 }
             }
             catch (InterruptedException e)
             {
-                _logger.info("dispatcher normal dispatch interrupted");
-            }
-
-        }
-
-        private void doReDispatch()
-        {
-            _logger.info("doRedispatching");
-
-            MessageConsumerPair messageConsumerPair;
-
-            if (_reprocessQueue != null)
-            {
-                _logger.info("Reprocess Queue has size:" + 
_reprocessQueue.size());
-                while (!_stopped.get() && ((messageConsumerPair = 
_reprocessQueue.poll()) != null))
-                {
-                    reDispatchMessage(messageConsumerPair);
-                }
-            }
-
-            if (_reprocessQueue == null || _reprocessQueue.isEmpty())
-            {
-                _logger.info("Reprocess Queue emptied");
-
-                _reDispatching = false;
-            }
-            else
-            {
-                _logger.info("Reprocess Queue still contains contains:" + 
_reprocessQueue.size());
-            }
-
-        }
-
-        private void reDispatchMessage(MessageConsumerPair consumerPair)
-        {
-            if (consumerPair.getItem() instanceof AbstractJMSMessage)
-            {
-                _logger.info("do renotify:" + consumerPair.getItem());
-                consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) 
consumerPair.getItem(), _channelId);
+                ;
             }
 
-            //    BasicMessageConsumer.notifyError(Throwable cause)
-            // will put the cause in to the list which could come out here... 
need to watch this.
+            _logger.info("Dispatcher thread terminating for channel " + 
_channelId);
         }
 
-
         private void dispatchMessage(UnprocessedMessage message)
         {
-            if (message.deliverBody != null)
+            if (message.getDeliverBody() != null)
             {
-                final BasicMessageConsumer consumer = (BasicMessageConsumer) 
_consumers.get(message.deliverBody.consumerTag);
+                final BasicMessageConsumer consumer = (BasicMessageConsumer) 
_consumers.get(message.getDeliverBody().consumerTag);
 
                 if (consumer == null)
                 {
-                    _logger.warn("Received a message from queue " + 
message.deliverBody.consumerTag + " without a handler - ignoring...");
+                    _logger.warn("Received a message from queue " + 
message.getDeliverBody().consumerTag + " without a handler - ignoring...");
                     _logger.warn("Consumers that exist: " + _consumers);
                     _logger.warn("Session hashcode: " + 
System.identityHashCode(this));
                 }
@@ -315,11 +215,13 @@
                     // Bounced message is processed here, away from the mina 
thread
                     AbstractJMSMessage bouncedMessage = 
_messageFactoryRegistry.createMessage(0,
                                                                                
               false,
-                                                                               
               message.contentHeader,
-                                                                               
               message.bodies);
+                                                                               
               message.getBounceBody().exchange,
+                                                                               
               message.getBounceBody().routingKey,
+                                                                               
               message.getContentHeader(),
+                                                                               
               message.getBodies());
 
-                    int errorCode = message.bounceBody.replyCode;
-                    AMQShortString reason = message.bounceBody.replyText;
+                    int errorCode = message.getBounceBody().replyCode;
+                    AMQShortString reason = message.getBounceBody().replyText;
                     _logger.debug("Message returned with error code " + 
errorCode + " (" + reason + ")");
 
                     //@TODO should this be moved to an exception handler of 
sorts. Somewhere errors are converted to correct execeptions.
@@ -349,37 +251,9 @@
             _stopped.set(true);
             interrupt();
         }
+    }
 
-        public void pause()
-        {
-            _logger.info("pausing");
-
-            synchronized (_pausedDispatcher)
-            {
-                _pausingDispatcher.set(true);
-
-                interrupt();
-
-                try
-                {
-                    _pausedDispatcher.wait();
-                }
-                catch (InterruptedException e)
-                {
-                    //do nothing
-                }
-            }
-        }
 
-        public void reprocess()
-        {
-            synchronized (_pausingDispatcher)
-            {
-                _logger.info("reprocessing");
-                _pausingDispatcher.notify();
-            }
-        }
-    }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry)
@@ -1708,14 +1582,36 @@
 
     void start()
     {
-        if (_dispatcher != null)
+        if (_startedAtLeastOnce.getAndSet(true))
         {
             //then we stopped this and are restarting, so signal server to 
resume delivery
             unsuspendChannel();
         }
-        _dispatcher = new Dispatcher();
-        _dispatcher.setDaemon(true);
-        _dispatcher.start();
+
+        if(hasMessageListeners() && _dispatcher == null)
+        {
+            startDistpatcherIfNecessary();
+        }
+    }
+
+    private boolean hasMessageListeners()
+    {
+        return _hasMessageListeners;
+    }
+
+    void setHasMessageListeners()
+    {
+        _hasMessageListeners = true;
+    }
+
+    synchronized void startDistpatcherIfNecessary()
+    {
+        if(_dispatcher == null)
+        {
+            _dispatcher = new Dispatcher();
+            _dispatcher.setDaemon(true);
+            _dispatcher.start();
+        }
     }
 
     void stop()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=501004&r1=501003&r2=501004
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Mon Jan 29 03:02:57 2007
@@ -37,7 +37,6 @@
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import javax.jms.Destination;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -221,6 +220,9 @@
         if (_session.isStopped())
         {
             _messageListener.set(messageListener);
+            _session.setHasMessageListeners();
+            _session.startDistpatcherIfNecessary();
+
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Session stopped : Message listener(" + 
messageListener + ") set for destination " + _destination);
@@ -246,25 +248,10 @@
 
                 synchronized (_session)
                 {
-                    //Pause Dispatcher
-                    _session.doDispatcherTask(new DispatcherCallback(this)
-                    {
-                        public void whilePaused(Queue<MessageConsumerPair> 
reprocessQueue)
-                        {
-                            // Prepend messages in _synchronousQueue to 
dispatcher queue
-                            _logger.debug("ReprocessQueue current size:" + 
reprocessQueue.size());
-                            for (Object item : _synchronousQueue)
-                            {
-                                reprocessQueue.offer(new 
MessageConsumerPair(_consumer, item));
-                            }
-                            _logger.debug("Added items to reprocessQueue:" + 
reprocessQueue.size());
-
-                            // Set Message Listener
-                            _logger.debug("Set Message Listener");
-                            _messageListener.set(messageListener);
-                        }
-                    }
-                    );
+                    
+                    _messageListener.set(messageListener);
+                    _session.setHasMessageListeners();
+                    _session.startDistpatcherIfNecessary();
                 }
             }
         }
@@ -272,9 +259,6 @@
 
     private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws 
JMSException
     {
-        byte[] url = 
jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
-        Destination dest = AMQDestination.createDestination(url);
-        jmsMsg.setJMSDestination(dest);
 
         if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
         {
@@ -345,6 +329,8 @@
 
     public Message receive(long l) throws JMSException
     {
+        _session.startDistpatcherIfNecessary();
+
         checkPreConditions();
 
         acquireReceiving();
@@ -399,6 +385,8 @@
 
     public Message receiveNoWait() throws JMSException
     {
+        _session.startDistpatcherIfNecessary();
+
         checkPreConditions();
 
         acquireReceiving();
@@ -520,14 +508,16 @@
 
         if (debug)
         {
-            _logger.debug("notifyMessage called with message number " + 
messageFrame.deliverBody.deliveryTag);
+            _logger.debug("notifyMessage called with message number " + 
messageFrame.getDeliverBody().deliveryTag);
         }
         try
         {
-            AbstractJMSMessage jmsMessage = 
_messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
-                                                                          
messageFrame.deliverBody.redelivered,
-                                                                          
messageFrame.contentHeader,
-                                                                          
messageFrame.bodies);
+            AbstractJMSMessage jmsMessage = 
_messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+                                                                          
messageFrame.getDeliverBody().redelivered,
+                                                                          
messageFrame.getDeliverBody().exchange,
+                                                                          
messageFrame.getDeliverBody().routingKey,
+                                                                          
messageFrame.getContentHeader(),
+                                                                          
messageFrame.getBodies());
 
             if (debug)
             {


Reply via email to