Author: ritchiem
Date: Wed Feb  7 04:12:19 2007
New Revision: 504521

URL: http://svn.apache.org/viewvc?view=rev&rev=504521
Log:
Qpid-346 & QPID-347 messages remaining taken after closure.. 
+added release() to Channel.requeue()

QPID-346 message loss after roll back.
+client now flushes local pre-receive queues. To ensure message order is 
preserved. 

Test cases to follow

Modified:
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=504521&r1=504520&r2=504521
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Wed Feb  7 04:12:19 2007
@@ -71,28 +71,20 @@
      */
     private AtomicLong _deliveryTag = new AtomicLong(0);
 
-    /**
-     * A channel has a default queue (the last declared) that is used when no 
queue name is
-     * explictily set
-     */
+    /** A channel has a default queue (the last declared) that is used when no 
queue name is explictily set */
     private AMQQueue _defaultQueue;
 
-    /**
-     * This tag is unique per subscription to a queue. The server returns this 
in response to a
-     * basic.consume request.
-     */
+    /** This tag is unique per subscription to a queue. The server returns 
this in response to a basic.consume request. */
     private int _consumerTag;
 
     /**
-     * The current message - which may be partial in the sense that not all 
frames have been received yet -
-     * which has been received by this channel. As the frames are received the 
message gets updated and once all
-     * frames have been received the message can then be routed.
+     * The current message - which may be partial in the sense that not all 
frames have been received yet - which has
+     * been received by this channel. As the frames are received the message 
gets updated and once all frames have been
+     * received the message can then be routed.
      */
     private AMQMessage _currentMessage;
 
-    /**
-     * Maps from consumer tag to queue instance. Allows us to unsubscribe from 
a queue.
-     */
+    /** Maps from consumer tag to queue instance. Allows us to unsubscribe 
from a queue. */
     private final Map<String, AMQQueue> _consumerTag2QueueMap = new 
TreeMap<String, AMQQueue>();
 
     private final MessageStore _messageStore;
@@ -282,16 +274,16 @@
     }
 
     /**
-     * Subscribe to a queue. We register all subscriptions in the channel so 
that
-     * if the channel is closed we can clean up all subscriptions, even if the
-     * client does not explicitly unsubscribe from all queues.
+     * Subscribe to a queue. We register all subscriptions in the channel so 
that if the channel is closed we can clean
+     * up all subscriptions, even if the client does not explicitly 
unsubscribe from all queues.
      *
      * @param tag     the tag chosen by the client (if null, server will 
generate one)
      * @param queue   the queue to subscribe to
      * @param session the protocol session of the subscriber
      * @param noLocal
-     * @return the consumer tag. This is returned to the subscriber and used in
-     *         subsequent unsubscribe requests
+     *
+     * @return the consumer tag. This is returned to the subscriber and used 
in subsequent unsubscribe requests
+     *
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
@@ -331,13 +323,13 @@
     {
         if (_transactional)
         {
-            synchronized(_txnBuffer)
+            synchronized (_txnBuffer)
             {
                 _txnBuffer.rollback();//releases messages
             }
         }
         unsubscribeAllConsumers(session);
-        requeue();        
+        requeue();
         _txnBuffer.commit();
     }
 
@@ -360,7 +352,7 @@
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, 
String consumerTag, AMQQueue queue)
     {
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
             _unacknowledgedMessageMap.put(deliveryTag, new 
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
             _lastDeliveryTag = deliveryTag;
@@ -369,14 +361,14 @@
     }
 
     /**
-     * Called to attempt re-enqueue all outstanding unacknowledged messages on 
the channel.
-     * May result in delivery to this same channel or to other subscribers.
+     * Called to attempt re-enqueue all outstanding unacknowledged messages on 
the channel. May result in delivery to
+     * this same channel or to other subscribers.
      */
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new 
delivery tag when they are redelivered
         Map<Long, UnacknowledgedMessage> currentList;
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
             currentList = _unacknowledgedMessageMap;
             _unacknowledgedMessageMap = new LinkedHashMap<Long, 
UnacknowledgedMessage>(DEFAULT_PREFETCH);
@@ -388,41 +380,64 @@
             {
                 unacked.message.setTxnBuffer(null);
 
+                unacked.message.release();
+
                 unacked.queue.deliver(unacked.message);
             }
         }
     }
 
-    /**
-     * Called to resend all outstanding unacknowledged messages to this same 
channel.
-     */
-    public void resend(AMQProtocolSession session)
+    /** Called to resend all outstanding unacknowledged messages to this same 
channel. */
+    public void resend(AMQProtocolSession session) throws AMQException
     {
         //messages go to this channel
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
-            for (Map.Entry<Long, UnacknowledgedMessage> entry : 
_unacknowledgedMessageMap.entrySet())
+            Iterator<Map.Entry<Long, UnacknowledgedMessage>> 
messageSetIterator =
+                    _unacknowledgedMessageMap.entrySet().iterator();
+
+            while (messageSetIterator.hasNext())
             {
+                Map.Entry<Long, UnacknowledgedMessage> entry = 
messageSetIterator.next();
+
                 long deliveryTag = entry.getKey();
                 String consumerTag = entry.getValue().consumerTag;
-                AMQMessage msg = entry.getValue().message;
-                msg.setRedelivered(true);
-                session.writeFrame(msg.getDataBlock(_channelId, consumerTag, 
deliveryTag));
+
+                if (_consumerTag2QueueMap.containsKey(consumerTag))
+                {
+                    AMQMessage msg = entry.getValue().message;
+                    msg.setRedelivered(true);
+                    session.writeFrame(msg.getDataBlock(_channelId, 
consumerTag, deliveryTag));
+                }
+                else
+                {
+                    UnacknowledgedMessage unacked = entry.getValue();
+
+                    if (unacked.queue != null)
+                    {
+                        unacked.message.setTxnBuffer(null);
+
+                        unacked.message.release();
+
+                        unacked.queue.deliver(unacked.message);
+                    }
+                    // delete the requeued message.
+                    messageSetIterator.remove();
+                }
             }
         }
     }
 
     /**
-     * Callback indicating that a queue has been deleted. We must update the 
structure of unacknowledged
-     * messages to remove the queue reference and also decrement any message 
reference counts, without
-     * actually removing the item sine we may get an ack for a delivery tag 
that was generated from the
-     * deleted queue.
+     * Callback indicating that a queue has been deleted. We must update the 
structure of unacknowledged messages to
+     * remove the queue reference and also decrement any message reference 
counts, without actually removing the item
+     * sine we may get an ack for a delivery tag that was generated from the 
deleted queue.
      *
      * @param queue
      */
     public void queueDeleted(AMQQueue queue)
     {
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
             for (Map.Entry<Long, UnacknowledgedMessage> unacked : 
_unacknowledgedMessageMap.entrySet())
             {
@@ -451,6 +466,7 @@
      * @param deliveryTag the last delivery tag
      * @param multiple    if true will acknowledge all messages up to an 
including the delivery tag. if false only
      *                    acknowledges the single message specified by the 
delivery tag
+     *
      * @throws AMQException if the delivery tag is unknown (e.g. not 
outstanding) on this channel
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws 
AMQException
@@ -473,7 +489,7 @@
             //update the op to include this ack request
             if (multiple && deliveryTag == 0)
             {
-                synchronized(_unacknowledgedMessageMapLock)
+                synchronized (_unacknowledgedMessageMapLock)
                 {
                     //if have signalled to ack all, that refers only
                     //to all at this time
@@ -493,7 +509,7 @@
 
     private void checkAck(long deliveryTag) throws AMQException
     {
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
             if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
             {
@@ -512,7 +528,7 @@
         if (multiple)
         {
             LinkedList<UnacknowledgedMessage> acked = new 
LinkedList<UnacknowledgedMessage>();
-            synchronized(_unacknowledgedMessageMapLock)
+            synchronized (_unacknowledgedMessageMapLock)
             {
                 if (deliveryTag == 0)
                 {
@@ -573,7 +589,7 @@
         else
         {
             UnacknowledgedMessage msg;
-            synchronized(_unacknowledgedMessageMapLock)
+            synchronized (_unacknowledgedMessageMapLock)
             {
                 msg = _unacknowledgedMessageMap.remove(deliveryTag);
             }
@@ -616,7 +632,7 @@
     {
         boolean suspend;
         //noinspection SynchronizeOnNonFinalField
-        synchronized(_unacknowledgedMessageMapLock)
+        synchronized (_unacknowledgedMessageMapLock)
         {
             suspend = _unacknowledgedMessageMap.size() >= 
_prefetch_HighWaterMark;
         }
@@ -629,7 +645,7 @@
 
         if (isSuspended && !suspended)
         {
-            synchronized(_unacknowledgedMessageMapLock)
+            synchronized (_unacknowledgedMessageMapLock)
             {
                 // Continue being suspended if we are above the 
_prefetch_LowWaterMark
                 suspended = _unacknowledgedMessageMap.size() > 
_prefetch_LowWaterMark;
@@ -679,7 +695,7 @@
     public void rollback() throws AMQException
     {
         //need to protect rollback and close from each other...
-        synchronized(_txnBuffer)
+        synchronized (_txnBuffer)
         {
             _txnBuffer.rollback();
         }

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=504521&r1=504520&r2=504521
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Wed Feb  7 04:12:19 2007
@@ -429,7 +429,14 @@
 
                     if (_started)
                     {
-                        session.start();
+                        try
+                        {
+                            session.start();
+                        }
+                        catch (AMQException e)
+                        {
+                            throw new JMSAMQException(e);
+                        }
                     }
                     return session;
                 }
@@ -581,7 +588,14 @@
             while (it.hasNext())
             {
                 final AMQSession s = (AMQSession) ((Map.Entry) 
it.next()).getValue();
-                s.start();
+                try
+                {
+                    s.start();
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException(e);
+                }
             }
             _started = true;
         }
@@ -594,7 +608,14 @@
         {
             for (Iterator i = _sessions.values().iterator(); i.hasNext();)
             {
-                ((AMQSession) i.next()).stop();
+                try
+                {
+                    ((AMQSession) i.next()).stop();
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException(e);
+                }
             }
             _started = false;
         }

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=504521&r1=504520&r2=504521
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Feb  7 04:12:19 2007
@@ -73,44 +73,32 @@
     private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
 
     /**
-     * Used to reference durable subscribers so they requests for unsubscribe 
can be handled
-     * correctly.  Note this only keeps a record of subscriptions which have 
been created
-     * in the current instance.  It does not remember subscriptions between 
executions of the
-     * client
+     * Used to reference durable subscribers so they requests for unsubscribe 
can be handled correctly.  Note this only
+     * keeps a record of subscriptions which have been created in the current 
instance.  It does not remember
+     * subscriptions between executions of the client
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> 
_subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
     private final ConcurrentHashMap<BasicMessageConsumer, String> 
_reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
 
-    /**
-     * Used in the consume method. We generate the consume tag on the client 
so that we can use the nowait
-     * feature.
-     */
+    /** Used in the consume method. We generate the consume tag on the client 
so that we can use the nowait feature. */
     private int _nextTag = 1;
 
-    /**
-     * This queue is bounded and is used to store messages before being 
dispatched to the consumer
-     */
+    /** This queue is bounded and is used to store messages before being 
dispatched to the consumer */
     private final FlowControllingBlockingQueue _queue;
 
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
 
-    /**
-     * Set of all producers created by this session
-     */
+    /** Set of all producers created by this session */
     private Map _producers = new ConcurrentHashMap();
 
-    /**
-     * Maps from consumer tag (String) to JMSMessageConsumer instance
-     */
+    /** Maps from consumer tag (String) to JMSMessageConsumer instance */
     private Map<String, BasicMessageConsumer> _consumers = new 
ConcurrentHashMap<String, BasicMessageConsumer>();
 
-    /**
-     * Maps from destination to count of JMSMessageConsumers
-     */
+    /** Maps from destination to count of JMSMessageConsumers */
     private ConcurrentHashMap<Destination, AtomicInteger> 
_destinationConsumerCount =
             new ConcurrentHashMap<Destination, AtomicInteger>();
 
@@ -127,33 +115,29 @@
     protected static final boolean DEFAULT_MANDATORY = true;
 
     /**
-     * The counter of the next producer id. This id is generated by the 
session and used only to allow the
-     * producer to identify itself to the session when deregistering itself.
-     * <p/>
-     * Access to this id does not require to be synchronized since according 
to the JMS specification only one
-     * thread of control is allowed to create producers for any given session 
instance.
+     * The counter of the next producer id. This id is generated by the 
session and used only to allow the producer to
+     * identify itself to the session when deregistering itself. <p/> Access 
to this id does not require to be
+     * synchronized since according to the JMS specification only one thread 
of control is allowed to create producers
+     * for any given session instance.
      */
     private long _nextProducerId;
 
     /**
-     * Set when recover is called. This is to handle the case where recover() 
is called by application code
-     * during onMessage() processing. We need to make sure we do not send an 
auto ack if recover was called.
+     * Set when recover is called. This is to handle the case where recover() 
is called by application code during
+     * onMessage() processing. We need to make sure we do not send an auto ack 
if recover was called.
      */
     private boolean _inRecovery;
 
     private boolean _connectionStopped;
 
     private boolean _hasMessageListeners;
+    private boolean _suspended;
+    private final Object _suspensionLock = new Object();
 
-
-    /**
-     * Responsible for decoding a message fragment and passing it to the 
appropriate message consumer.
-     */
+    /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */
     private class Dispatcher extends Thread
     {
-        /**
-         * Track the 'stopped' state of the dispatcher, a session starts in 
the stopped state.
-         */
+        /** Track the 'stopped' state of the dispatcher, a session starts in 
the stopped state. */
         private final AtomicBoolean _closed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
@@ -276,6 +260,30 @@
             //fixme awaitTermination
 
         }
+
+        public void rollback()
+        {
+
+            synchronized (_lock)
+            {
+                boolean isStopped = connectionStopped();
+
+                if (!isStopped)
+                {
+                    setConnectionStopped(true);
+                }
+
+                _queue.clear();
+
+                for (BasicMessageConsumer consumer : _consumers.values())
+                {
+                    consumer.rollback();
+                }
+
+                setConnectionStopped(isStopped);
+            }
+
+        }
     }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
@@ -318,7 +326,8 @@
                                                               if 
(_acknowledgeMode == NO_ACKNOWLEDGE)
                                                               {
                                                                   
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending 
channel. Current value is " + currentValue);
-                                                                  
suspendChannel();
+
+                                                                  new 
Thread(new SuspenderRunner(true)).start();
                                                               }
                                                           }
 
@@ -327,7 +336,8 @@
                                                               if 
(_acknowledgeMode == NO_ACKNOWLEDGE)
                                                               {
                                                                   
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending 
channel. Current value is " + currentValue);
-                                                                  
unsuspendChannel();
+
+                                                                  new 
Thread(new SuspenderRunner(false)).start();
                                                               }
                                                           }
                                                       });
@@ -533,18 +543,39 @@
 
     public void rollback() throws JMSException
     {
-        checkTransacted();
-        try
-        {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            _connection.getProtocolHandler().syncWrite(
-                    TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 
0), TxRollbackOkBody.class);
-        }
-        catch (AMQException e)
+        synchronized (_suspensionLock)
         {
-            throw(JMSException) (new JMSException("Failed to rollback: " + 
e).initCause(e));
+            checkTransacted();
+            try
+            {
+                // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
+                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions 
change.
+
+                boolean isSuspended = isSuspended();
+
+                if (!isSuspended)
+                {
+                    suspendChannel(true);
+                }
+
+                _connection.getProtocolHandler().syncWrite(
+                        TxRollbackBody.createAMQFrame(_channelId, (byte) 8, 
(byte) 0), TxRollbackOkBody.class);
+
+                if (_dispatcher != null)
+                {
+                    _dispatcher.rollback();
+                }
+
+                if (!isSuspended)
+                {
+                    suspendChannel(false);
+                }
+            }
+            catch (AMQException e)
+            {
+                throw(JMSException) (new JMSException("Failed to rollback: " + 
e).initCause(e));
+            }
         }
     }
 
@@ -616,9 +647,14 @@
         }
     }
 
+    public boolean isSuspended()
+    {
+        return _suspended;
+    }
+
+
     /**
-     * Called when the server initiates the closure of the session
-     * unilaterally.
+     * Called when the server initiates the closure of the session 
unilaterally.
      *
      * @param e the exception that caused this session to be closed. Null 
causes the
      */
@@ -644,10 +680,8 @@
     }
 
     /**
-     * Called to mark the session as being closed. Useful when the session 
needs to be made invalid, e.g. after
-     * failover when the client has veoted resubscription.
-     * <p/>
-     * The caller of this method must already hold the failover mutex.
+     * Called to mark the session as being closed. Useful when the session 
needs to be made invalid, e.g. after failover
+     * when the client has veoted resubscription. <p/> The caller of this 
method must already hold the failover mutex.
      */
     void markClosed()
     {
@@ -867,7 +901,9 @@
      * Creates a QueueReceiver
      *
      * @param destination
+     *
      * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
      * @throws JMSException
      */
     public QueueReceiver createQueueReceiver(Destination destination) throws 
JMSException
@@ -883,7 +919,9 @@
      *
      * @param destination
      * @param messageSelector
+     *
      * @return QueueReceiver - a wrapper around our MessageConsumer
+     *
      * @throws JMSException
      */
     public QueueReceiver createQueueReceiver(Destination destination, String 
messageSelector) throws JMSException
@@ -1146,7 +1184,9 @@
      *
      * @param amqd
      * @param protocolHandler
+     *
      * @return the queue name. This is useful where the broker is generating a 
queue name on behalf of the client.
+     *
      * @throws AMQException
      */
     private String declareQueue(AMQDestination amqd, AMQProtocolHandler 
protocolHandler) throws AMQException
@@ -1198,6 +1238,7 @@
      * Register to consume from the queue.
      *
      * @param queueName
+     *
      * @return the consumer tag generated by the broker
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, String 
queueName, AMQProtocolHandler protocolHandler,
@@ -1284,7 +1325,9 @@
      * Creates a QueueReceiver wrapping a MessageConsumer
      *
      * @param queue
+     *
      * @return QueueReceiver
+     *
      * @throws JMSException
      */
     public QueueReceiver createReceiver(Queue queue) throws JMSException
@@ -1300,7 +1343,9 @@
      *
      * @param queue
      * @param messageSelector
+     *
      * @return QueueReceiver
+     *
      * @throws JMSException
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) 
throws JMSException
@@ -1347,7 +1392,9 @@
      * Creates a non-durable subscriber
      *
      * @param topic
+     *
      * @return TopicSubscriber - a wrapper round our MessageConsumer
+     *
      * @throws JMSException
      */
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
@@ -1364,7 +1411,9 @@
      * @param topic
      * @param messageSelector
      * @param noLocal
+     *
      * @return TopicSubscriber - a wrapper round our MessageConsumer
+     *
      * @throws JMSException
      */
     public TopicSubscriber createSubscriber(Topic topic, String 
messageSelector, boolean noLocal) throws JMSException
@@ -1434,9 +1483,7 @@
         }
     }
 
-    /**
-     * Note, currently this does not handle reuse of the same name with 
different topics correctly.
-     */
+    /** Note, currently this does not handle reuse of the same name with 
different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal)
             throws JMSException
     {
@@ -1549,8 +1596,8 @@
     }
 
     /**
-     * Invoked by the MINA IO thread (indirectly) when a message is received 
from the transport.
-     * Puts the message onto the queue read by the dispatcher.
+     * Invoked by the MINA IO thread (indirectly) when a message is received 
from the transport. Puts the message onto
+     * the queue read by the dispatcher.
      *
      * @param message the message that has been received
      */
@@ -1565,13 +1612,12 @@
     }
 
     /**
-     * Acknowledge a message or several messages. This method can be called 
via AbstractJMSMessage or from
-     * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter 
where the mode is
-     * AUTO_ACK or similar.
+     * Acknowledge a message or several messages. This method can be called 
via AbstractJMSMessage or from a
+     * BasicConsumer. The former where the mode is CLIENT_ACK and the latter 
where the mode is AUTO_ACK or similar.
      *
      * @param deliveryTag the tag of the last message to be acknowledged
-     * @param multiple    if true will acknowledge all messages up to and 
including the one specified by the
-     *                    delivery tag
+     * @param multiple    if true will acknowledge all messages up to and 
including the one specified by the delivery
+     *                    tag
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
@@ -1609,7 +1655,7 @@
         return _channelId;
     }
 
-    void start()
+    void start() throws AMQException
     {
         //fixme This should be controlled by _stopped as it pairs with the 
stop method
         //fixme or check the FlowControlledBlockingQueue _queue to see if we 
have flow controlled.
@@ -1618,7 +1664,7 @@
         if (_startedAtLeastOnce.getAndSet(true))
         {
             //then we stopped this and are restarting, so signal server to 
resume delivery
-            unsuspendChannel();
+            suspendChannel(false);
         }
 
         if (hasMessageListeners())
@@ -1651,10 +1697,10 @@
         }
     }
 
-    void stop()
+    void stop() throws AMQException
     {
         //stop the server delivering messages to this session
-        suspendChannel();
+        suspendChannel(true);
 
         if (_dispatcher != null)
         {
@@ -1666,6 +1712,7 @@
      * Callers must hold the failover mutex before calling this method.
      *
      * @param consumer
+     *
      * @throws AMQException
      */
     void registerConsumer(BasicMessageConsumer consumer, boolean nowait) 
throws AMQException
@@ -1691,8 +1738,8 @@
     }
 
     /**
-     * Called by the MessageConsumer when closing, to deregister the consumer 
from the
-     * map from consumerTag to consumer instance.
+     * Called by the MessageConsumer when closing, to deregister the consumer 
from the map from consumerTag to consumer
+     * instance.
      *
      * @param consumer the consum
      */
@@ -1764,28 +1811,23 @@
         }
     }
 
-    private void suspendChannel()
+    private void suspendChannel(boolean suspend) throws AMQException
     {
-        _logger.warn("Suspending channel");
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-                                                                   (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
-                                                                   false);    
// active
-        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
-    }
+        synchronized (_suspensionLock)
+        {
+            _logger.warn("Setting channel flow : " + (suspend ? "suspended" : 
"unsuspended"));
 
-    private void unsuspendChannel()
-    {
-        _logger.warn("Unsuspending channel");
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-                                                                   (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
-                                                                   true);    
// active
-        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+            _suspended = suspend;
+            
+            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
+            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions 
change.
+            AMQFrame channelFlowFrame = 
ChannelFlowBody.createAMQFrame(_channelId,
+                                                                       (byte) 
8, (byte) 0,    // AMQP version (major, minor)
+                                                                       
!suspend);    // active
+
+            _connection.getProtocolHandler().syncWrite(channelFlowFrame, 
ChannelFlowOkBody.class);
+        }
     }
 
     public void confirmConsumerCancelled(String consumerTag)
@@ -1829,4 +1871,25 @@
         }
     }
 
+    private class SuspenderRunner implements Runnable
+    {
+        private boolean _suspend;
+
+        public SuspenderRunner(boolean suspend)
+        {
+            _suspend = suspend;
+        }
+
+        public void run()
+        {
+            try
+            {
+                suspendChannel(_suspend);
+            }
+            catch (AMQException e)
+            {
+                _logger.warn("Unable to suspend channel");
+            }
+        }
+    }
 }

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=504521&r1=504520&r2=504521
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Wed Feb  7 04:12:19 2007
@@ -762,4 +762,9 @@
         }
 
     }
+
+    public void rollback()
+    {
+        _synchronousQueue.clear();
+    }
 }

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=504521&r1=504520&r2=504521
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 Wed Feb  7 04:12:19 2007
@@ -20,23 +20,20 @@
  */
 package org.apache.qpid.client.util;
 
+import org.apache.qpid.AMQException;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * A blocking queue that emits events above a user specified threshold allowing
- * the caller to take action (e.g. flow control) to try to prevent the queue
- * growing (much) further. The underlying queue itself is not bounded therefore
- * the caller is not obliged to react to the events.
- * <p/>
- * This implementation is <b>only</b> safe where we have a single thread adding
- * items and a single (different) thread removing items.
+ * A blocking queue that emits events above a user specified threshold 
allowing the caller to take action (e.g. flow
+ * control) to try to prevent the queue growing (much) further. The underlying 
queue itself is not bounded therefore the
+ * caller is not obliged to react to the events. <p/> This implementation is 
<b>only</b> safe where we have a single
+ * thread adding items and a single (different) thread removing items.
  */
 public class FlowControllingBlockingQueue
 {
-    /**
-     * This queue is bounded and is used to store messages before being 
dispatched to the consumer
-     */
+    /** This queue is bounded and is used to store messages before being 
dispatched to the consumer */
     private final BlockingQueue _queue = new LinkedBlockingQueue();
 
     private final int _flowControlHighThreshold;
@@ -44,12 +41,14 @@
 
     private final ThresholdListener _listener;
 
-    /**
-     * We require a separate count so we can track whether we have reached the
-     * threshold
-     */
+    /** We require a separate count so we can track whether we have reached 
the threshold */
     private int _count;
 
+    public void clear()
+    {
+        _queue.clear();
+    }
+
     public interface ThresholdListener
     {
         void aboveThreshold(int currentValue);
@@ -74,7 +73,7 @@
         Object o = _queue.take();
         if (_listener != null)
         {
-            synchronized(_listener)
+            synchronized (_listener)
             {
                 if (_count-- == _flowControlLowThreshold)
                 {
@@ -90,7 +89,7 @@
         _queue.add(o);
         if (_listener != null)
         {
-            synchronized(_listener)
+            synchronized (_listener)
             {
                 if (++_count == _flowControlHighThreshold)
                 {


Reply via email to