I figured that part the hard way
On 8/3/07, Rafael Schloming <[EMAIL PROTECTED]> wrote: > > This change broke the build. Changing the _destination field in > MessageActor to private caused compilation errors in other classes that > depend on package access to that field. > > I've changed it back to package access for the moment, but you may want > to add a getter or alter the other classes if you intended to make it > private. > > Please remember that maven doesn't understand java dependencies, it just > rebuilds based on timestamp. That means when you change an interface or > alter the non private signature of any class or method you MUST do an > mvn clean; mvn install in order to be sure your change really does > compile. > > --Rafael > > [EMAIL PROTECTED] wrote: > > Author: arnaudsimon > > Date: Fri Aug 3 07:52:43 2007 > > New Revision: 562489 > > > > URL: http://svn.apache.org/viewvc?view=rev&rev=562489 > > Log: > > implemented message dispatching thread > > > > Modified: > > > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java > > > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java > > > > Modified: > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java > > URL: > http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489 > > > ============================================================================== > > --- > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java > (original) > > +++ > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java > Fri Aug 3 07:52:43 2007 > > @@ -37,17 +37,23 @@ > > /** > > * Indicates whether this MessageActor is closed. > > */ > > - boolean _isClosed = false; > > + private boolean _isClosed = false; > > > > /** > > * This messageActor's session > > */ > > - SessionImpl _session; > > + private SessionImpl _session; > > > > /** > > * The JMS destination this actor is set for. > > */ > > - DestinationImpl _destination; > > + private DestinationImpl _destination; > > + > > + > > + /** > > + * The ID of this actor for the session. > > + */ > > + private String _messageActorID; > > > > //-- Constructor > > > > @@ -140,5 +146,16 @@ > > { > > return _session; > > } > > + > > + /** > > + * Get the ID of this actor within its session. > > + * > > + * @return This actor ID. > > + */ > > + protected String getMessageActorID() > > + { > > + return _messageActorID; > > + } > > + > > > > } > > > > Modified: > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java > > URL: > http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489 > > > ============================================================================== > > --- > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java > (original) > > +++ > incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java > Fri Aug 3 07:52:43 2007 > > @@ -28,9 +28,9 @@ > > import javax.jms.Message; > > import javax.jms.MessageListener; > > import java.io.Serializable; > > -import java.util.ArrayList; > > import java.util.Vector; > > import java.util.LinkedList; > > +import java.util.HashMap; > > > > /** > > * Implementation of the JMS Session interface > > @@ -54,7 +54,7 @@ > > private boolean _hasStopped = false; > > > > /** > > - * lock for the sessionThread to wiat on when the session is > stopped > > + * lock for the sessionThread to wait until the session is stopped > > */ > > private Object _stoppingLock = new Object(); > > > > @@ -63,11 +63,16 @@ > > */ > > private Object _stoppingJoin = new Object(); > > > > + /** > > + * thread to dispatch messages to async consumers > > + */ > > + private MessageDispatcherThread _messageDispatcherThread = null; > > + > > > > /** > > * The messageActors of this session. > > */ > > - private ArrayList<MessageActor> _messageActors = new > ArrayList<MessageActor>(); > > + private HashMap<String, MessageActor> _messageActors = new > HashMap<String, MessageActor>(); > > > > /** > > * All the not yet acknoledged messages > > @@ -151,6 +156,10 @@ > > { > > throw ExceptionHelper.convertQpidExceptionToJMSException > (e); > > } > > + // Create and start a MessageDispatcherThread > > + // This thread is dispatching messages to the async consumers > > + _messageDispatcherThread = new MessageDispatcherThread(); > > + _messageDispatcherThread.start(); > > } > > > > //--- javax.jms.Session API > > @@ -362,10 +371,43 @@ > > { > > if (!_isClosed) > > { > > + _messageDispatcherThread.interrupt(); > > + if (!_isClosing) > > + { > > + _isClosing = true; > > + // if the session is stopped then restart it before > notifying on the lock > > + // that will stop the sessionThread > > + if (_isStopped) > > + { > > + start(); > > + } > > + > > + //stop the sessionThread > > + synchronized (_incomingAsynchronousMessages) > > + { > > + _incomingAsynchronousMessages.notifyAll(); > > + } > > + > > + try > > + { > > + _messageDispatcherThread.join(); > > + _messageDispatcherThread = null; > > + } > > + catch (InterruptedException ie) > > + { > > + /* ignore */ > > + } > > + } > > // from now all the session methods will throw a > IllegalStateException > > _isClosed = true; > > // close all the actors > > closeAllActors(); > > + _messageActors.clear(); > > + synchronized (_incomingAsynchronousMessages) > > + { > > + _incomingAsynchronousMessages.clear(); > > + _incomingAsynchronousMessages.notifyAll(); > > + } > > // close the underlaying QpidSession > > try > > { > > @@ -375,6 +417,7 @@ > > { > > throw > ExceptionHelper.convertQpidExceptionToJMSException(e); > > } > > + > > } > > } > > > > @@ -466,7 +509,7 @@ > > checkNotClosed(); > > MessageProducerImpl producer = new MessageProducerImpl(this, > (DestinationImpl) destination); > > // register this actor with the session > > - _messageActors.add(producer); > > + _messageActors.put(producer.getMessageActorID(), producer); > > return producer; > > } > > > > @@ -523,7 +566,7 @@ > > checkDestination(destination); > > MessageConsumerImpl consumer = new MessageConsumerImpl(this, > (DestinationImpl) destination, messageSelector, noLocal, null); > > // register this actor with the session > > - _messageActors.add(consumer); > > + _messageActors.put(consumer.getMessageActorID(), consumer); > > return consumer; > > } > > > > @@ -610,7 +653,7 @@ > > checkNotClosed(); > > checkDestination(topic); > > TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, > topic, messageSelector, noLocal, _connection.getClientID() + ":" + name); > > - _messageActors.add(subscriber); > > + _messageActors.put(subscriber.getMessageActorID(), subscriber); > > return subscriber; > > } > > > > @@ -643,7 +686,7 @@ > > checkDestination(queue); > > QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, > messageSelector); > > // register this actor with the session > > - _messageActors.add(browser); > > + _messageActors.put(browser.getMessageActorID(), browser); > > return browser; > > } > > > > @@ -710,7 +753,18 @@ > > */ > > protected void start() throws JMSException > > { > > - // TODO: make sure that the correct options are used > > + if (_isStopped) > > + { > > + synchronized (_stoppingLock) > > + { > > + _isStopped = false; > > + _stoppingLock.notify(); > > + } > > + synchronized (_stoppingJoin) > > + { > > + _hasStopped = false; > > + } > > + } > > } > > > > /** > > @@ -720,7 +774,30 @@ > > */ > > protected void stop() throws JMSException > > { > > - // TODO: make sure that the correct options are used > > + if (!_isClosing && !_isStopped) > > + { > > + synchronized (_incomingAsynchronousMessages) > > + { > > + _isStopped = true; > > + // unlock the sessionThread that will then wait on > _stoppingLock > > + _incomingAsynchronousMessages.notifyAll(); > > + } > > + // wait for the sessionThread to stop processing messages > > + synchronized (_stoppingJoin) > > + { > > + while (!_hasStopped) > > + { > > + try > > + { > > + _stoppingJoin.wait(); > > + } > > + catch (InterruptedException e) > > + { > > + /* ignore */ > > + } > > + } > > + } > > + } > > } > > > > /** > > @@ -847,7 +924,7 @@ > > */ > > private void closeAllActors() throws JMSException > > { > > - for (MessageActor messageActor : _messageActors) > > + for (MessageActor messageActor : _messageActors.values()) > > { > > messageActor.closeMessageActor(); > > } > > @@ -861,8 +938,6 @@ > > * This thread is responsible for removing messages from > m_incomingMessages and > > * dispatching them to the appropriate MessageConsumer. > > * <p> Messages have to be dispatched serially. > > - * > > - * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous > message consumer {0} from session {1} has thrown a RunTimeException "{2}". > > */ > > private class MessageDispatcherThread extends Thread > > { > > @@ -932,27 +1007,27 @@ > > } > > } > > > > - /* if (message != null) > > + if (message != null) > > { > > MessageConsumerImpl mc; > > - synchronized (_actors) > > + synchronized (_messageActors) > > { > > - mc = (MessageConsumerImpl) m_actors.get( > actorMessage.consumerID); > > + mc = null; // todo _messageActors.get( > message.consumerID); > > } > > boolean consumed = false; > > if (mc != null) > > { > > try > > { > > - consumed = mc.onMessage( > actorMessage.genericMessage); > > + // todo call onMessage > > } > > catch (RuntimeException t) > > { > > // the JMS specification tells us to flag > that to the client! > > - log.errorb(SessionThread.class.getName(), > "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t); > > + _logger.error("Warning! Asynchronous > message consumer" + mc + " from session " + this + " has thrown a > RunTimeException " + t); > > } > > } > > - } */ > > + } > > message = null; > > } > > while (!_isClosing); // repeat as long as this session is > not closing > > > > >
