Author: ritchiem
Date: Fri Oct  5 07:59:21 2007
New Revision: 582296

URL: http://svn.apache.org/viewvc?rev=582296&view=rev
Log:
Merged revisions 
573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-582204,582206-582269
 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r581968 | rupertlssmith | 2007-10-04 17:57:40 +0100 (Thu, 04 Oct 2007) | 1 
line
  
  Updaded performance tests to better test pub/sub mode with 1:10 fanout.
........
  r582198 | ritchiem | 2007-10-05 11:33:14 +0100 (Fri, 05 Oct 2007) | 1 line
  
  QPID-617 : Forgot to commit Test case to validate fix.
........
  r582201 | ritchiem | 2007-10-05 11:39:54 +0100 (Fri, 05 Oct 2007) | 1 line
  
  QPID-624: Update to ensure all errors are correctly processed in 
BlockingMethodFrameListener.java
........
  r582202 | ritchiem | 2007-10-05 11:44:06 +0100 (Fri, 05 Oct 2007) | 1 line
  
  QPID-624 : Forgot to commit updates to test along with 
BlockingMethodFrameListener
........
  r582263 | ritchiem | 2007-10-05 14:38:13 +0100 (Fri, 05 Oct 2007) | 1 line
  
  Qpid-623 : When only selectors are used on a queue the main _messages queue 
causes a leak. Here is a new test provided by Aidan Skinner and a simple fix 
that will prevent OOME when only selectors are connected to the Queue.
........
  r582265 | ritchiem | 2007-10-05 14:39:03 +0100 (Fri, 05 Oct 2007) | 1 line
  
  Qpid-558 : Patch provided by Aidan Skinner addressing AMQShortString not 
autoexpand-ing so when adding content to it would throw an exception
........
  r582266 | ritchiem | 2007-10-05 14:39:25 +0100 (Fri, 05 Oct 2007) | 1 line
  
  QPID-551 : Patch provided by Aidan Skinner to address problems in info 
logging when stacktraces are short.
........

Added:
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
      - copied unchanged from r582266, 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
      - copied unchanged from r582202, 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
Modified:
    incubator/qpid/branches/M2/   (props changed)
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
    
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/branches/M2/java/perftests/pom.xml
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Oct  5 07:59:21 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-581628,581647,582205
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-582269

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Fri Oct  5 07:59:21 2007
@@ -422,7 +422,7 @@
             //If this causes ref count to hit zero then data will be purged so 
message.getSize() will NPE.
             message.decrementReference(storeContext);
 
-        }        
+        }
 
         _lock.unlock();
     }
@@ -462,15 +462,15 @@
      */
     private AMQMessage getNextMessage() throws AMQException
     {
-        return getNextMessage(_messages, null);
+        return getNextMessage(_messages, null, false);
     }
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription 
sub) throws AMQException
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription 
sub, boolean purgeOnly) throws AMQException
     {
         AMQMessage message = messages.peek();
 
         //while (we have a message) && ((The subscriber is not a browser or 
message is taken ) or we are clearing) && (Check message is taken.)
-        while (purgeMessage(message, sub))
+        while (purgeMessage(message, sub, purgeOnly))
         {
             // if we are purging then ensure we mark this message taken for 
the current subscriber
             // the current subscriber may be null in the case of a get or a 
purge but this is ok.
@@ -527,6 +527,24 @@
      */
     private boolean purgeMessage(AMQMessage message, Subscription sub) throws 
AMQException
     {
+        return purgeMessage(message, sub, false);
+    }
+
+    /**
+     * This method will return true if the message is to be purged from the 
queue.
+     * \
+     * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the 
current Queue(_queue) when purgeOnly is false
+     *
+     * @param message
+     * @param sub
+     * @param purgeOnly When set to false the message will be taken by the 
given Subscription.
+     *
+     * @return if the msg should be purged
+     *
+     * @throws AMQException
+     */
+    private boolean purgeMessage(AMQMessage message, Subscription sub, boolean 
purgeOnly) throws AMQException
+    {
         //Original.. complicated while loop control
 //                (message != null
 //                            && (
@@ -561,9 +579,18 @@
             }
         }
 
-        // if we are purging then ensure we mark this message taken for the 
current subscriber
-        // the current subscriber may be null in the case of a get or a purge 
but this is ok.
-        return purge && message.taken(_queue, sub);
+        if (purgeOnly)
+        {
+            // If we are simply purging the queue don't take the message
+            // just purge up to the next non-taken msg.
+            return purge && message.isTaken(_queue);
+        }
+        else
+        {
+            // if we are purging then ensure we mark this message taken for 
the current subscriber
+            // the current subscriber may be null in the case of a get or a 
purge but this is ok.
+            return purge && message.taken(_queue, sub);
+        }
     }
 
     public void sendNextMessage(Subscription sub, AMQQueue 
queue)//Queue<AMQMessage> messageQueue)
@@ -594,7 +621,7 @@
         {
             synchronized (_queueHeadLock)
             {
-                message = getNextMessage(messageQueue, sub);
+                message = getNextMessage(messageQueue, sub, false);
 
                 // message will be null if we have no messages in the 
messageQueue.
                 if (message == null)
@@ -661,7 +688,7 @@
                     //fixme - we should do the clean up as the message remains 
on the _message queue
                     // this is resulting in the next consumer receiving the 
message and then attempting to purge it
                     //
-                    _log.info(debugIdentity() + "We should do clean up of the 
main _message queue here");
+                    cleanMainQueue(sub);
                 }
             }
 
@@ -677,6 +704,18 @@
                 _log.error(debugIdentity() + "Unable to release message as it 
is null. " + e, e);
             }
             _log.error(debugIdentity() + "Unable to deliver message as dequeue 
failed: " + e, e);
+        }
+    }
+
+    private void cleanMainQueue(Subscription sub)
+    {
+        try
+        {
+            getNextMessage(_messages, sub, true);
+        }
+        catch (AMQException e)
+        {
+            _log.warn("Problem during main queue purge:" + e.getMessage());
         }
     }
 

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Fri Oct  5 07:59:21 2007
@@ -514,8 +514,9 @@
     {
         if (_logger.isInfoEnabled())
         {
+            StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
             _logger.info("Closing session: " + this + ":"
-                         + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                         + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
         }
 
         synchronized (_messageDeliveryLock)
@@ -669,7 +670,7 @@
                     startDistpatcherIfNecessary(true);
                 }
 
-                _dispatcher.rejectPending(consumer);                
+                _dispatcher.rejectPending(consumer);
             }
             else
             {

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Fri Oct  5 07:59:21 2007
@@ -480,15 +480,14 @@
             {
                 if (_logger.isTraceEnabled())
                 {
+                    StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
                     if (_closedStack != null)
                     {
-                        _logger.trace(_consumerTag + " close():"
-                            + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
                         _logger.trace(_consumerTag + " previously:" + 
_closedStack.toString());
                     }
                     else
                     {
-                        _closedStack = 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+                        _closedStack = Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1);
                     }
                 }
 
@@ -553,15 +552,16 @@
 
             if (_logger.isTraceEnabled())
             {
+                StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " markClosed():"
-                        + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                        + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously:" + 
_closedStack.toString());
                 }
                 else
                 {
-                    _closedStack = 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                    _closedStack = Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1);
                 }
             }
         }
@@ -758,15 +758,16 @@
             _closed.set(true);
             if (_logger.isTraceEnabled())
             {
+                StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " notifyError():"
-                        + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                        + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously" + 
_closedStack.toString());
                 }
                 else
                 {
-                    _closedStack = 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                    _closedStack = Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1);
                 }
             }
         }

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 Fri Oct  5 07:59:21 2007
@@ -78,14 +78,22 @@
     /** This flag is used to indicate that the blocked for method has been 
received. */
     private volatile boolean _ready = false;
 
+    /** This flag is used to indicate that the received error has been 
processed. */
+    private volatile boolean _errorAck = false;
+
     /** Used to protect the shared event and ready flag between the producer 
and consumer. */
     private final ReentrantLock _lock = new ReentrantLock();
-        
+
     /**
      * Used to signal that a method has been received
      */
     private final Condition _receivedCondition = _lock.newCondition();
 
+    /**
+      * Used to signal that a error has been processed
+      */
+    private final Condition _errorConditionAck = _lock.newCondition();
+
     /** Used to hold the most recent exception that is passed to the [EMAIL 
PROTECTED] #error(Exception)} method. */
     private volatile Exception _error;
 
@@ -142,7 +150,7 @@
                 _ready = ready;
                 _receivedCondition.signal();
             }
-            finally 
+            finally
             {
                 _lock.unlock();
             }
@@ -174,13 +182,15 @@
     public AMQMethodEvent blockForFrame(long timeout) throws AMQException, 
FailoverException
     {
         long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
-        
+
         _lock.lock();
+
         try
         {
             while (!_ready)
             {
-                try {
+                try
+                {
                     if (timeout == -1)
                     {
                         _receivedCondition.await();
@@ -195,7 +205,7 @@
                             _ready = true;
                         }
                     }
-                } 
+                }
                 catch (InterruptedException e)
                 {
                     // IGNORE    -- //fixme this isn't ideal as being 
interrupted isn't equivellant to sucess
@@ -206,29 +216,34 @@
                     // }
                 }
             }
+
+
+            if (_error != null)
+            {
+                if (_error instanceof AMQException)
+                {
+                    throw (AMQException) _error;
+                }
+                else if (_error instanceof FailoverException)
+                {
+                    // This should ensure that FailoverException is not 
wrapped and can be caught.
+                    throw (FailoverException) _error; // needed to expose 
FailoverException.
+                }
+                else
+                {
+                    throw new AMQException("Woken up due to " + 
_error.getClass(), _error);
+                }
+            }
+
         }
         finally
         {
+            _errorAck = true;
+            _errorConditionAck.signal();
+            _error = null;
             _lock.unlock();
         }
 
-        if (_error != null)
-        {
-            if (_error instanceof AMQException)
-            {
-                throw (AMQException) _error;
-            }
-            else if (_error instanceof FailoverException)
-            {
-                // This should ensure that FailoverException is not wrapped 
and can be caught.
-                throw (FailoverException) _error; // needed to expose 
FailoverException.
-            }
-            else
-            {
-                throw new AMQException("Woken up due to " + _error.getClass(), 
_error);
-            }
-        }
-
         return _doneEvt;
     }
 
@@ -242,13 +257,36 @@
     {
         // set the error so that the thread that is blocking (against 
blockForFrame())
         // can pick up the exception and rethrow to the caller
-        _error = e;
+
 
         _lock.lock();
+
+        if (_error == null)
+        {
+            _error = e;
+        }
+        else
+        {
+            System.err.println("WARNING: new error arrived while old one not 
yet processed");
+        }
+
         try
         {
             _ready = true;
             _receivedCondition.signal();
+
+            while (!_errorAck)
+            {
+                try
+                {
+                    _errorConditionAck.await();
+                }
+                catch (InterruptedException e1)
+                {
+                    //
+                }
+            }
+            _errorAck = false;
         }
         finally
         {

Modified: 
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
 Fri Oct  5 07:59:21 2007
@@ -26,7 +26,10 @@
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.framing.AMQFrame;
@@ -60,7 +63,7 @@
     Connection _connection;
     private String _brokerlist = "vm://:1";
     private Session _session;
-    private static final long SYNC_TIMEOUT = 500;
+    private static final long SYNC_TIMEOUT = 5000;
     private int TEST = 0;
 
     protected void setUp() throws Exception
@@ -287,7 +290,7 @@
         TEST++;
         _logger.info("Test creating producer which will use channel id 1");
 
-        Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+        Queue queue = _session.createTemporaryQueue();
 
         MessageConsumer consumer = _session.createConsumer(queue);
 
@@ -311,7 +314,7 @@
 
             connection.setConnectionListener(this);
 
-            _session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            _session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
             connection.start();
 
@@ -332,31 +335,42 @@
         return connection;
     }
 
-    private void declareExchange(int channelId, String _type, String _name, 
boolean nowait)
-        throws AMQException, FailoverException
+    private void declareExchange(final int channelId, final String _type, 
final String _name, final boolean nowait)
+            throws AMQException, FailoverException
     {
-        AMQFrame exchangeDeclare =
-            ExchangeDeclareBody.createAMQFrame(channelId,
-                ((AMQConnection) 
_connection).getProtocolHandler().getProtocolMajorVersion(),
-                ((AMQConnection) 
_connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                new AMQShortString(_name), // exchange
-                false, // internal
-                nowait, // nowait
-                true, // passive
-                0, // ticket
-                new AMQShortString(_type)); // type
+//        new FailoverRetrySupport<Object, AMQException>(new 
FailoverProtectedOperation<Object, AMQException>()
+//        {
+//            public Object execute() throws AMQException, FailoverException
+//            {
+
+                AMQProtocolHandler protocolHandler = ((AMQConnection) 
_connection).getProtocolHandler();
+
+                AMQFrame exchangeDeclare =
+                        ExchangeDeclareBody.createAMQFrame(channelId,
+                                                           
protocolHandler.getProtocolMajorVersion(),
+                                                           
protocolHandler.getProtocolMinorVersion(), null, // arguments
+                                                           false, // autoDelete
+                                                           false, // durable
+                                                           new 
AMQShortString(_name), // exchange
+                                                           false, // internal
+                                                           nowait, // nowait
+                                                           true, // passive
+                                                           0, // ticket
+                                                           new 
AMQShortString(_type)); // type
+
+                if (nowait)
+                {
+                    protocolHandler.writeFrame(exchangeDeclare);
+                }
+                else
+                {
+                    protocolHandler.syncWrite(exchangeDeclare, 
ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+                }
+
+//                return null;
+//            }
+//        }, (AMQConnection)_connection).execute();
 
-        if (nowait)
-        {
-            ((AMQConnection) 
_connection).getProtocolHandler().writeFrame(exchangeDeclare);
-        }
-        else
-        {
-            ((AMQConnection) 
_connection).getProtocolHandler().syncWrite(exchangeDeclare, 
ExchangeDeclareOkBody.class,
-                SYNC_TIMEOUT);
-        }
     }
 
     private void createChannel(int channelId) throws AMQException, 
FailoverException
@@ -375,10 +389,12 @@
     }
 
     public void bytesSent(long count)
-    { }
+    {
+    }
 
     public void bytesReceived(long count)
-    { }
+    {
+    }
 
     public boolean preFailover(boolean redirect)
     {
@@ -391,5 +407,6 @@
     }
 
     public void failoverComplete()
-    { }
+    {
+    }
 }

Modified: 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
 (original)
+++ 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
 Fri Oct  5 07:59:21 2007
@@ -212,6 +212,7 @@
         if (size != 0)
         {
 
+            buffer.setAutoExpand(true);
             buffer.put((byte) size);
             if (_data.buf().hasArray())
             {


Reply via email to