Author: arnaudsimon
Date: Fri Aug 3 07:52:43 2007
New Revision: 562489
URL: http://svn.apache.org/viewvc?view=rev&rev=562489
Log:
implemented message dispatching thread
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java Fri Aug 3 07:52:43 2007
@@ -37,17 +37,23 @@
/**
* Indicates whether this MessageActor is closed.
*/
- boolean _isClosed = false;
+ private boolean _isClosed = false;
/**
* This messageActor's session
*/
- SessionImpl _session;
+ private SessionImpl _session;
/**
* The JMS destination this actor is set for.
*/
- DestinationImpl _destination;
+ private DestinationImpl _destination;
+
+
+ /**
+ * The ID of this actor for the session.
+ */
+ private String _messageActorID;
//-- Constructor
@@ -140,5 +146,16 @@
{
return _session;
}
+
+ /**
+ * Get the ID of this actor within its session.
+ *
+ * @return This actor ID.
+ */
+ protected String getMessageActorID()
+ {
+ return _messageActorID;
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Fri Aug 3 07:52:43 2007
@@ -28,9 +28,9 @@
import javax.jms.Message;
import javax.jms.MessageListener;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Vector;
import java.util.LinkedList;
+import java.util.HashMap;
/**
* Implementation of the JMS Session interface
@@ -54,7 +54,7 @@
private boolean _hasStopped = false;
/**
- * lock for the sessionThread to wiat on when the session is stopped
+ * lock for the sessionThread to wait until the session is stopped
*/
private Object _stoppingLock = new Object();
@@ -63,11 +63,16 @@
*/
private Object _stoppingJoin = new Object();
+ /**
+ * thread to dispatch messages to async consumers
+ */
+ private MessageDispatcherThread _messageDispatcherThread = null;
+
/**
* The messageActors of this session.
*/
- private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
+ private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
/**
* All the not yet acknoledged messages
@@ -151,6 +156,10 @@
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
+ // Create and start a MessageDispatcherThread
+ // This thread is dispatching messages to the async consumers
+ _messageDispatcherThread = new MessageDispatcherThread();
+ _messageDispatcherThread.start();
}
//--- javax.jms.Session API
@@ -362,10 +371,43 @@
{
if (!_isClosed)
{
+ _messageDispatcherThread.interrupt();
+ if (!_isClosing)
+ {
+ _isClosing = true;
+ // if the session is stopped then restart it before notifying on the lock
+ // that will stop the sessionThread
+ if (_isStopped)
+ {
+ start();
+ }
+
+ //stop the sessionThread
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _incomingAsynchronousMessages.notifyAll();
+ }
+
+ try
+ {
+ _messageDispatcherThread.join();
+ _messageDispatcherThread = null;
+ }
+ catch (InterruptedException ie)
+ {
+ /* ignore */
+ }
+ }
// from now all the session methods will throw a IllegalStateException
_isClosed = true;
// close all the actors
closeAllActors();
+ _messageActors.clear();
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _incomingAsynchronousMessages.clear();
+ _incomingAsynchronousMessages.notifyAll();
+ }
// close the underlaying QpidSession
try
{
@@ -375,6 +417,7 @@
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
+
}
}
@@ -466,7 +509,7 @@
checkNotClosed();
MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
// register this actor with the session
- _messageActors.add(producer);
+ _messageActors.put(producer.getMessageActorID(), producer);
return producer;
}
@@ -523,7 +566,7 @@
checkDestination(destination);
MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
// register this actor with the session
- _messageActors.add(consumer);
+ _messageActors.put(consumer.getMessageActorID(), consumer);
return consumer;
}
@@ -610,7 +653,7 @@
checkNotClosed();
checkDestination(topic);
TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
- _messageActors.add(subscriber);
+ _messageActors.put(subscriber.getMessageActorID(), subscriber);
return subscriber;
}
@@ -643,7 +686,7 @@
checkDestination(queue);
QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
// register this actor with the session
- _messageActors.add(browser);
+ _messageActors.put(browser.getMessageActorID(), browser);
return browser;
}
@@ -710,7 +753,18 @@
*/
protected void start() throws JMSException
{
- // TODO: make sure that the correct options are used
+ if (_isStopped)
+ {
+ synchronized (_stoppingLock)
+ {
+ _isStopped = false;
+ _stoppingLock.notify();
+ }
+ synchronized (_stoppingJoin)
+ {
+ _hasStopped = false;
+ }
+ }
}
/**
@@ -720,7 +774,30 @@
*/
protected void stop() throws JMSException
{
- // TODO: make sure that the correct options are used
+ if (!_isClosing && !_isStopped)
+ {
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _isStopped = true;
+ // unlock the sessionThread that will then wait on _stoppingLock
+ _incomingAsynchronousMessages.notifyAll();
+ }
+ // wait for the sessionThread to stop processing messages
+ synchronized (_stoppingJoin)
+ {
+ while (!_hasStopped)
+ {
+ try
+ {
+ _stoppingJoin.wait();
+ }
+ catch (InterruptedException e)
+ {
+ /* ignore */
+ }
+ }
+ }
+ }
}
/**
@@ -847,7 +924,7 @@
*/
private void closeAllActors() throws JMSException
{
- for (MessageActor messageActor : _messageActors)
+ for (MessageActor messageActor : _messageActors.values())
{
messageActor.closeMessageActor();
}
@@ -861,8 +938,6 @@
* This thread is responsible for removing messages from m_incomingMessages and
* dispatching them to the appropriate MessageConsumer.
* <p> Messages have to be dispatched serially.
- *
- * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}".
*/
private class MessageDispatcherThread extends Thread
{
@@ -932,27 +1007,27 @@
}
}
- /* if (message != null)
+ if (message != null)
{
MessageConsumerImpl mc;
- synchronized (_actors)
+ synchronized (_messageActors)
{
- mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID);
+ mc = null; // todo _messageActors.get(message.consumerID);
}
boolean consumed = false;
if (mc != null)
{
try
{
- consumed = mc.onMessage(actorMessage.genericMessage);
+ // todo call onMessage
}
catch (RuntimeException t)
{
// the JMS specification tells us to flag that to the client!
- log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
+ _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
}
}
- } */
+ }
message = null;
}
while (!_isClosing); // repeat as long as this session is not closing