Author: arnaudsimon
Date: Fri Aug  3 07:52:43 2007
New Revision: 562489

URL: http://svn.apache.org/viewvc?view=rev&rev=562489
Log:
implemented message dispatching thread 

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
 Fri Aug  3 07:52:43 2007
@@ -37,17 +37,23 @@
     /**
      * Indicates whether this MessageActor is closed.
      */
-    boolean _isClosed = false;
+    private boolean _isClosed = false;
 
     /**
      * This messageActor's session
      */
-    SessionImpl _session;
+    private SessionImpl _session;
 
     /**
      * The JMS destination this actor is set for.
      */
-    DestinationImpl _destination;
+    private DestinationImpl _destination;
+
+
+    /**
+     * The ID of this actor for the session.
+     */
+    private String _messageActorID;
 
     //-- Constructor
 
@@ -140,5 +146,16 @@
     {
         return _session;
     }
+
+    /**
+     * Get the ID of this actor within its session.
+     *  
+     * @return This actor ID.
+     */
+    protected String getMessageActorID()
+    {
+        return _messageActorID;
+    }
+
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
 Fri Aug  3 07:52:43 2007
@@ -28,9 +28,9 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Vector;
 import java.util.LinkedList;
+import java.util.HashMap;
 
 /**
  * Implementation of the JMS Session interface
@@ -54,7 +54,7 @@
     private boolean _hasStopped = false;
 
     /**
-     * lock for the sessionThread to wiat on when the session is stopped
+     * lock for the sessionThread to wait until the session is stopped
      */
     private Object _stoppingLock = new Object();
 
@@ -63,11 +63,16 @@
      */
     private Object _stoppingJoin = new Object();
 
+    /**
+     * thread to dispatch messages to async consumers
+     */
+    private MessageDispatcherThread _messageDispatcherThread = null;
+
 
     /**
      * The messageActors of this session.
      */
-    private ArrayList<MessageActor> _messageActors = new 
ArrayList<MessageActor>();
+    private HashMap<String, MessageActor> _messageActors = new HashMap<String, 
MessageActor>();
 
     /**
      * All the not yet acknoledged messages
@@ -151,6 +156,10 @@
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
+        // Create and start a MessageDispatcherThread
+        // This thread is dispatching messages to the async consumers
+        _messageDispatcherThread = new MessageDispatcherThread();
+        _messageDispatcherThread.start();
     }
 
     //--- javax.jms.Session API
@@ -362,10 +371,43 @@
     {
         if (!_isClosed)
         {
+            _messageDispatcherThread.interrupt();
+            if (!_isClosing)
+            {
+                _isClosing = true;
+                // if the session is stopped then restart it before notifying 
on the lock
+                // that will stop the sessionThread
+                if (_isStopped)
+                {
+                    start();
+                }
+
+                //stop the sessionThread
+                synchronized (_incomingAsynchronousMessages)
+                {
+                    _incomingAsynchronousMessages.notifyAll();
+                }
+
+                try
+                {
+                    _messageDispatcherThread.join();
+                    _messageDispatcherThread = null;
+                }
+                catch (InterruptedException ie)
+                {
+                    /* ignore */
+                }
+            }
             // from now all the session methods will throw a 
IllegalStateException
             _isClosed = true;
             // close all the actors
             closeAllActors();
+            _messageActors.clear();
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _incomingAsynchronousMessages.clear();
+                _incomingAsynchronousMessages.notifyAll();
+            }
             // close the underlaying QpidSession
             try
             {
@@ -375,6 +417,7 @@
             {
                 throw ExceptionHelper.convertQpidExceptionToJMSException(e);
             }
+
         }
     }
 
@@ -466,7 +509,7 @@
         checkNotClosed();
         MessageProducerImpl producer = new MessageProducerImpl(this, 
(DestinationImpl) destination);
         // register this actor with the session
-        _messageActors.add(producer);
+        _messageActors.put(producer.getMessageActorID(), producer);
         return producer;
     }
 
@@ -523,7 +566,7 @@
         checkDestination(destination);
         MessageConsumerImpl consumer = new MessageConsumerImpl(this, 
(DestinationImpl) destination, messageSelector, noLocal, null);
         // register this actor with the session
-        _messageActors.add(consumer);
+        _messageActors.put(consumer.getMessageActorID(), consumer);
         return consumer;
     }
 
@@ -610,7 +653,7 @@
         checkNotClosed();
         checkDestination(topic);
         TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, 
messageSelector, noLocal, _connection.getClientID() + ":" + name);
-        _messageActors.add(subscriber);
+        _messageActors.put(subscriber.getMessageActorID(), subscriber);
         return subscriber;
     }
 
@@ -643,7 +686,7 @@
         checkDestination(queue);
         QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, 
messageSelector);
         // register this actor with the session
-        _messageActors.add(browser);
+        _messageActors.put(browser.getMessageActorID(), browser);
         return browser;
     }
 
@@ -710,7 +753,18 @@
      */
     protected void start() throws JMSException
     {
-        // TODO: make sure that the correct options are used
+        if (_isStopped)
+        {
+            synchronized (_stoppingLock)
+            {
+                _isStopped = false;
+                _stoppingLock.notify();
+            }
+            synchronized (_stoppingJoin)
+            {
+                _hasStopped = false;
+            }
+        }
     }
 
     /**
@@ -720,7 +774,30 @@
      */
     protected void stop() throws JMSException
     {
-        // TODO: make sure that the correct options are used
+        if (!_isClosing && !_isStopped)
+        {
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _isStopped = true;
+                // unlock the sessionThread that will then wait on 
_stoppingLock
+                _incomingAsynchronousMessages.notifyAll();
+            }
+            // wait for the sessionThread to stop processing messages
+            synchronized (_stoppingJoin)
+            {
+                while (!_hasStopped)
+                {
+                    try
+                    {
+                        _stoppingJoin.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        /* ignore */
+                    }
+                }
+            }
+        }
     }
 
     /**
@@ -847,7 +924,7 @@
      */
     private void closeAllActors() throws JMSException
     {
-        for (MessageActor messageActor : _messageActors)
+        for (MessageActor messageActor : _messageActors.values())
         {
             messageActor.closeMessageActor();
         }
@@ -861,8 +938,6 @@
      * This thread is responsible for removing messages from 
m_incomingMessages and
      * dispatching them to the appropriate MessageConsumer.
      * <p> Messages have to be dispatched serially.
-     *
-     * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous 
message consumer {0} from session {1} has thrown a RunTimeException "{2}".
      */
     private class MessageDispatcherThread extends Thread
     {
@@ -932,27 +1007,27 @@
                     }
                 }
 
-              /*  if (message != null)
+                if (message != null)
                 {
                     MessageConsumerImpl mc;
-                    synchronized (_actors)
+                    synchronized (_messageActors)
                     {
-                        mc = (MessageConsumerImpl) 
m_actors.get(actorMessage.consumerID);
+                        mc = null; // todo 
_messageActors.get(message.consumerID);
                     }
                     boolean consumed = false;
                     if (mc != null)
                     {
                         try
                         {
-                            consumed = 
mc.onMessage(actorMessage.genericMessage);
+                            // todo call onMessage
                         }
                         catch (RuntimeException t)
                         {
                             // the JMS specification tells us to flag that to 
the client!
-                            log.errorb(SessionThread.class.getName(), 
"runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
+                            _logger.error("Warning! Asynchronous message 
consumer" + mc + " from session " + this + " has thrown a RunTimeException " + 
t);
                         }
                     }
-                } */
+                }
                 message = null;
             }
             while (!_isClosing);   // repeat as long as this session is not 
closing


Reply via email to