Author: ritchiem
Date: Thu Oct 18 03:10:19 2007
New Revision: 585912

URL: http://svn.apache.org/viewvc?rev=585912&view=rev
Log:
QPID-637 : Patch provided by Aidan Skinner to ensure correct behaviour of 
session closure.

Modified:
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=585912&r1=585911&r2=585912&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Oct 18 03:10:19 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -112,22 +113,20 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td>
  * </table>
  *
  * @todo Different FailoverSupport implementation are needed on the same 
method call, in different situations. For
- *       example, when failing-over and reestablishing the bindings, the bind 
cannot be interrupted by a second
- *       fail-over, if it fails with an exception, the fail-over process 
should also fail. When binding outside of
- *       the fail-over process, the retry handler could be used to 
automatically retry the operation once the connection
- *       has been reestablished. All fail-over protected operations should be 
placed in private methods, with
- *       FailoverSupport passed in by the caller to provide the correct 
support for the calling context. Sometimes the
- *       fail-over process sets a nowait flag and uses an async method call 
instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot 
be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also 
fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically 
retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed 
in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for 
the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
  * @todo Two new objects created on every failover supported method call. 
Consider more efficient ways of doing this,
- *       after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
  */
 public class AMQSession extends Closeable implements Session, QueueSession, 
TopicSession
 {
@@ -222,9 +221,7 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
-    /**
-     * Holds the highest received delivery tag.
-     */
+    /** Holds the highest received delivery tag. */
     private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
 
     /** Holds the dispatcher thread for this session. */
@@ -236,9 +233,7 @@
     /** Holds all of the producers created by this session, keyed by their 
unique identifiers. */
     private Map<Long, MessageProducer> _producers = new 
ConcurrentHashMap<Long, MessageProducer>();
 
-    /**
-     * Used as a source of unique identifiers so that the consumers can be 
tagged to match them to BasicConsume methods.
-     */
+    /** Used as a source of unique identifiers so that the consumers can be 
tagged to match them to BasicConsume methods. */
     private int _nextTag = 1;
 
     /**
@@ -374,12 +369,12 @@
     /**
      * Creates a new session on a connection with the default message factory 
factory.
      *
-     * @param con                     The connection on which to create the 
session.
-     * @param channelId               The unique identifier for the session.
-     * @param transacted              Indicates whether or not the session is 
transactional.
-     * @param acknowledgeMode         The acknoledgement mode for the session.
-     * @param defaultPrefetchHigh     The maximum number of messages to 
prefetched before suspending the session.
-     * @param defaultPrefetchLow      The number of prefetched messages at 
which to resume the session.
+     * @param con                 The connection on which to create the 
session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is 
transactional.
+     * @param acknowledgeMode     The acknowledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetch 
before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which 
to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetchHigh,
                int defaultPrefetchLow)
@@ -402,12 +397,8 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSBytesMessage();
-        }
+        checkNotClosed();
+        return new JMSBytesMessage();
     }
 
     /**
@@ -462,9 +453,7 @@
      * @param exchangeName The exchange to bind the queue on.
      *
      * @throws AMQException If the queue cannot be bound for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
-     *
      * @todo Document the additional arguments that may be passed in the field 
table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString 
routingKey, final FieldTable arguments,
@@ -501,14 +490,11 @@
      * @param timeout The timeout in milliseconds to wait for the session 
close acknoledgement from the broker.
      *
      * @throws JMSException If the JMS provider fails to close the session due 
to some internal error.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
-     *
      * @todo Not certain about the logic of ignoring the failover exception, 
because the channel won't be
-     *       re-opened. May need to examine this more carefully.
-     *
+     * re-opened. May need to examine this more carefully.
      * @todo Note that taking the failover mutex doesn't prevent this 
operation being interrupted by a failover,
-     *       because the failover process sends the failover event before 
acquiring the mutex itself.
+     * because the failover process sends the failover event before acquiring 
the mutex itself.
      */
     public void close(long timeout) throws JMSException
     {
@@ -579,6 +565,14 @@
     {
         synchronized (_connection.getFailoverMutex())
         {
+            if (e instanceof AMQDisconnectedException)
+            {
+                if (_dispatcher != null)
+                {
+                    // Failover failed and ain't coming back. Knife the 
dispatcher.
+                    _dispatcher.interrupt();
+                }
+            }
             synchronized (_messageDeliveryLock)
             {
                 // An AMQException has an error code and message already and 
will be passed in when closure occurs as a
@@ -610,7 +604,6 @@
      * @throws JMSException If the JMS provider fails to commit the 
transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, 
merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     public void commit() throws JMSException
@@ -861,12 +854,8 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSMapMessage();
-        }
+        checkNotClosed();
+        return new JMSMapMessage();
     }
 
     public javax.jms.Message createMessage() throws JMSException
@@ -876,12 +865,8 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return (ObjectMessage) new JMSObjectMessage();
-        }
+        checkNotClosed();
+        return (ObjectMessage) new JMSObjectMessage();
     }
 
     public ObjectMessage createObjectMessage(Serializable object) throws 
JMSException
@@ -955,7 +940,6 @@
      * @param exclusive  Flag to indicate that the queue is exclusive to this 
client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     public void createQueue(final AMQShortString name, final boolean 
autoDelete, final boolean durable,
@@ -1301,7 +1285,7 @@
      * <li>Stop message delivery.</li>
      * <li>Mark all messages that might have been delivered but not 
acknowledged as "redelivered".
      * <li>Restart the delivery sequence including all unacknowledged messages 
that had been previously delivered.
-     *     Redelivered messages do not have to be delivered in exactly their 
original delivery order.</li>
+     * Redelivered messages do not have to be delivered in exactly their 
original delivery order.</li>
      * </ul>
      *
      * <p/>If the recover operation is interrupted by a fail-over, between 
asking that the broker begin recovery and
@@ -1311,7 +1295,6 @@
      * @throws JMSException If the JMS provider fails to stop and restart 
message delivery due to some internal error.
      *                      Not that this does not necessarily mean that the 
recovery has failed, but simply that it
      *                      is not possible to tell if it has or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     public void recover() throws JMSException
@@ -1402,7 +1385,7 @@
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting delivery tag:" + deliveryTag);
+                _logger.debug("Rejecting delivery tag:" + deliveryTag + 
":SessionHC:" + this.hashCode());
             }
 
             AMQFrame basicRejectBody =
@@ -1423,7 +1406,6 @@
      * @throws JMSException If the JMS provider fails to rollback the 
transaction due to some internal error. This does
      *                      not mean that the rollback is known to have 
failed, merely that it is not known whether it
      *                      failed or not.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     public void rollback() throws JMSException
@@ -1695,7 +1677,6 @@
      * @return <tt>true</tt> if the queue is bound to the exchange and routing 
key, <tt>false</tt> if not.
      *
      * @throws JMSException If the query fails for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     boolean isQueueBound(final AMQShortString exchangeName, final 
AMQShortString queueName, final AMQShortString routingKey)
@@ -1768,10 +1749,9 @@
      * Starts the session, which ensures that it is not suspended and that its 
event dispatcher is running.
      *
      * @throws AMQException If the session cannot be started for any reason.
-     *
      * @todo This should be controlled by _stopped as it pairs with the stop 
method fixme or check the
-     *       FlowControlledBlockingQueue _queue to see if we have flow 
controlled. will result in sending Flow messages
-     *       for each subsequent call to flow.. only need to do this if we 
have called stop.
+     * FlowControlledBlockingQueue _queue to see if we have flow controlled. 
will result in sending Flow messages
+     * for each subsequent call to flow.. only need to do this if we have 
called stop.
      */
     void start() throws AMQException
     {
@@ -2138,7 +2118,6 @@
      * @param nowait
      *
      * @throws AMQException If the exchange cannot be declared for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     private void declareExchange(final AMQShortString name, final 
AMQShortString type,
@@ -2183,9 +2162,7 @@
      *         the client.
      *
      * @throws AMQException If the queue cannot be declared for any reason.
-     *
      * @todo Verify the destiation is valid or throw an exception.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     private AMQShortString declareQueue(final AMQDestination amqd, final 
AMQProtocolHandler protocolHandler)
@@ -2229,7 +2206,6 @@
      * @param queueName The name of the queue to delete.
      *
      * @throws JMSException If the queue could not be deleted for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     private void deleteQueue(final AMQShortString queueName) throws 
JMSException
@@ -2404,11 +2380,6 @@
         _producers.put(new Long(producerId), producer);
     }
 
-    private void rejectAllMessages(boolean requeue)
-    {
-        rejectMessagesForConsumerTag(null, requeue);
-    }
-
     /**
      * @param consumerTag The consumerTag to prune from queue or all if null
      * @param requeue     Should the removed messages be requeued (or 
discarded. Possibly to DLQ)
@@ -2528,7 +2499,6 @@
      *                should be unsuspended.
      *
      * @throws AMQException If the session cannot be suspended for any reason.
-     *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
     private void suspendChannel(boolean suspend) throws AMQException // , 
FailoverException
@@ -2571,6 +2541,7 @@
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
+        private String dispatcherID = "" + System.identityHashCode(this);
 
         public Dispatcher()
         {
@@ -2606,6 +2577,7 @@
 
                 // Reject messages on pre-dispatch queue
                 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+                //Let the dispatcher deal with this when it gets to them.
 
                 // closeConsumer
                 consumer.markClosed();
@@ -2639,8 +2611,7 @@
                     }
                     else
                     {
-                        // should perhaps clear the _SQ here.
-                        // consumer._synchronousQueue.clear();
+                        // Clear the _SQ here.
                         consumer.clearReceiveQueue();
                     }
 
@@ -2756,14 +2727,14 @@
                     {
                         if (consumer == null)
                         {
-                            _dispatcherLogger.info("Received a message(" + 
System.identityHashCode(message) + ")" + "["
+                            _dispatcherLogger.info("Dispatcher(" + 
dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" 
+ "["
                                                    + 
message.getDeliverBody().deliveryTag + "] from queue "
                                                    + 
message.getDeliverBody().consumerTag + " )without a handler - 
rejecting(requeue)...");
                         }
                         else
                         {
-                            _dispatcherLogger.info("Received a message(" + 
System.identityHashCode(message) + ")" + "["
-                                                   + 
message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+                            _dispatcherLogger.info("Dispatcher(" + 
dispatcherID + ")Received a message(" + System.identityHashCode(message) + ") ["
+                                                   + 
message.getDeliverBody().deliveryTag + "] from queue consumer("
                                                    + consumer.debugIdentity() 
+ ") is closed rejecting(requeue)...");
                         }
                     }

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=585912&r1=585911&r2=585912&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Thu Oct 18 03:10:19 2007
@@ -142,9 +142,9 @@
     private List<StackTraceElement> _closedStack = null;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, 
AMQDestination destination,
-        String messageSelector, boolean noLocal, MessageFactoryRegistry 
messageFactory, AMQSession session,
-        AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, 
int prefetchHigh, int prefetchLow,
-        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean 
autoClose)
+                                   String messageSelector, boolean noLocal, 
MessageFactoryRegistry messageFactory, AMQSession session,
+                                   AMQProtocolHandler protocolHandler, 
FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                   boolean exclusive, int acknowledgeMode, 
boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
         _connection = connection;
@@ -221,7 +221,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Session stopped : Message listener(" + 
messageListener + ") set for destination "
-                    + _destination);
+                              + _destination);
             }
         }
         else
@@ -243,9 +243,9 @@
                 //todo: handle case where connection has already been started, 
and the dispatcher has alreaded started
                 // putting values on the _synchronousQueue
 
-                    _messageListener.set(messageListener);
-                    _session.setHasMessageListeners();
-                    _session.startDistpatcherIfNecessary();
+                _messageListener.set(messageListener);
+                _session.setHasMessageListeners();
+                _session.startDistpatcherIfNecessary();
             }
         }
     }
@@ -360,14 +360,14 @@
                 long endtime = System.currentTimeMillis() + l;
                 while (System.currentTimeMillis() < endtime && o == null)
                 {
-                    try 
+                    try
                     {
                         o = _synchronousQueue.poll(endtime - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                     }
                     catch (InterruptedException e)
                     {
                         _logger.warn("Interrupted: " + e);
-                        if (isClosed()) 
+                        if (isClosed())
                         {
                             return null;
                         }
@@ -381,11 +381,11 @@
                     try
                     {
                         o = _synchronousQueue.take();
-                    } 
+                    }
                     catch (InterruptedException e)
                     {
                         _logger.warn("Interrupted: " + e);
-                        if (isClosed()) 
+                        if (isClosed())
                         {
                             return null;
                         }
@@ -516,9 +516,9 @@
                 {
                     // TODO: Be aware of possible changes to parameter order 
as versions change.
                     final AMQFrame cancelFrame =
-                        BasicCancelBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
-                            _protocolHandler.getProtocolMinorVersion(), 
_consumerTag, // consumerTag
-                            false); // nowait
+                            BasicCancelBody.createAMQFrame(_channelId, 
_protocolHandler.getProtocolMajorVersion(),
+                                                           
_protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+                                                           false); // nowait
 
                     try
                     {
@@ -577,7 +577,7 @@
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " markClosed():"
-                        + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
+                                  + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously:" + 
_closedStack.toString());
                 }
                 else
@@ -609,9 +609,9 @@
         try
         {
             AbstractJMSMessage jmsMessage =
-                
_messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
-                    messageFrame.getDeliverBody().redelivered, 
messageFrame.getDeliverBody().exchange,
-                    messageFrame.getDeliverBody().routingKey, 
messageFrame.getContentHeader(), messageFrame.getBodies());
+                    
_messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+                                                  
messageFrame.getDeliverBody().redelivered, 
messageFrame.getDeliverBody().exchange,
+                                                  
messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), 
messageFrame.getBodies());
 
             if (debug)
             {
@@ -696,15 +696,15 @@
         switch (_acknowledgeMode)
         {
 
-        case Session.PRE_ACKNOWLEDGE:
-            _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            break;
-
-        case Session.CLIENT_ACKNOWLEDGE:
-            // we set the session so that when the user calls acknowledge() it 
can call the method on session
-            // to send out the appropriate frame
-            msg.setAMQSession(_session);
-            break;
+            case Session.PRE_ACKNOWLEDGE:
+                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                break;
+
+            case Session.CLIENT_ACKNOWLEDGE:
+                // we set the session so that when the user calls 
acknowledge() it can call the method on session
+                // to send out the appropriate frame
+                msg.setAMQSession(_session);
+                break;
         }
     }
 
@@ -714,43 +714,43 @@
         switch (_acknowledgeMode)
         {
 
-        case Session.CLIENT_ACKNOWLEDGE:
-            if (isNoConsume())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
 
-        case Session.DUPS_OK_ACKNOWLEDGE:
-            if (++_outstanding >= _prefetchHigh)
-            {
-                _dups_ok_acknowledge_send = true;
-            }
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                if (++_outstanding >= _prefetchHigh)
+                {
+                    _dups_ok_acknowledge_send = true;
+                }
 
-            if (_outstanding <= _prefetchLow)
-            {
-                _dups_ok_acknowledge_send = false;
-            }
+                if (_outstanding <= _prefetchLow)
+                {
+                    _dups_ok_acknowledge_send = false;
+                }
 
-            if (_dups_ok_acknowledge_send)
-            {
-                if (!_session.isInRecovery())
+                if (_dups_ok_acknowledge_send)
                 {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    if (!_session.isInRecovery())
+                    {
+                        _session.acknowledgeMessage(msg.getDeliveryTag(), 
true);
+                    }
                 }
-            }
 
-            break;
+                break;
 
-        case Session.AUTO_ACKNOWLEDGE:
-            // we do not auto ack a message if the application code called 
recover()
-            if (!_session.isInRecovery())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.AUTO_ACKNOWLEDGE:
+                // we do not auto ack a message if the application code called 
recover()
+                if (!_session.isInRecovery())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
         }
     }
 
@@ -883,6 +883,7 @@
 
         if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && 
_receiving.get() && (_messageListener != null))
         {
+            _closed.set(true);
             _receivingThread.interrupt();
         }
 
@@ -938,6 +939,9 @@
 
             Iterator iterator = _synchronousQueue.iterator();
 
+            int initialSize = _synchronousQueue.size();
+
+            boolean removed = false;
             while (iterator.hasNext())
             {
 
@@ -952,16 +956,24 @@
                     }
 
                     iterator.remove();
+                    removed = true;
 
                 }
                 else
                 {
                     _logger.error("Queue contained a :" + o.getClass()
-                        + " unable to reject as it is not an 
AbstractJMSMessage. Will be cleared");
+                                  + " unable to reject as it is not an 
AbstractJMSMessage. Will be cleared");
                     iterator.remove();
+                    removed = true;
                 }
             }
 
+            if (removed && (initialSize == _synchronousQueue.size()))
+            {
+                _logger.error("Queue had content removed but didn't change in 
size." + initialSize);
+            }
+
+
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages 
Remaining:" + _synchronousQueue.size());
@@ -974,7 +986,7 @@
 
     public String debugIdentity()
     {
-        return String.valueOf(_consumerTag);
+        return String.valueOf(_consumerTag) + "[" + 
System.identityHashCode(this) + "]";
     }
 
     public void clearReceiveQueue()


Reply via email to