Author: ritchiem
Date: Tue Nov 27 16:45:32 2007
New Revision: 598834

URL: http://svn.apache.org/viewvc?rev=598834&view=rev
Log:
QPID-679 : Patch provided by Aidan Skinner and additional from odd problems 
during test runs.
AMQChannel - Catch and log AMQException occuring when requeue()-ing. Previously 
exceptions wouldn't be caught at all. The requeue() is called during closure so 
there is nothing we can do protocol wise on error other than log the issue and 
continue with any other shutdown that is needed.
AMQMinaProtocolSession & AMQPFastProtocolHandler . Additions to catch and log 
AMQExceptions. Changes to AMQMinaProtocolSession were done to ignore all input 
on a closing session other than the close-ok. Previously only Protocol frames 
were ignored this resulted in Content*Body-s still being processed. Additional 
checks were made for the MessageStoreClosedException to log and continue. As 
said else were we need to seperate protocol exceptoions(AMQException) from 
internal code exception handling. Further All AMQExceptions occuring in the 
frameReceived method are now caught and logged. Allowing them to propogate 
higher will only result in thread death.
AMQPFastProtocolHandler Caught AMQExceptions occuring whilst closing the 
session. Again allowing these to continue will result in thread death. There is 
not a lot that can be done other than log the problem as the session is already 
closed by this point. Prevented the stacktrace associated with a session 
exception being printed in the exceptionCaught method when the problem was an 
IO Exception. This doesn't add anything useful and only adds to the log file 
sizes.
ApplicationRegistry - Added removeAll option which ensures that all ARs are 
correctly purged so that we can attempt to clean up between Unit Tests.
MemoryMessageStore - This was causing us real problems during the failover 
testing. Similar checks should probably be made to any other Message Store 
Impl. The issue was that when shutting down the broker the MS.close() method is 
called this sets all the storage to null. However, there may still be message 
processing going on as the close() does not attempt to stop connection 
processing. Hence we now check to see if the Store is close throwing a 
MSClosedException if required. This prevents NPEs that have been seen during 
Unit failover testing. In fact the close() is called as a request to shutdown 
the ApplicationRegistry, but this only occurs from tests and broker shutdown, 
no attempt to unbind or prevent further connections during this period is yet 
done.

CLIENT CHANGES
AMQConnection - Added method to check if failover is in progress.
AMQClient - Upgraded acknowledge() exception to JMSException for errors due to 
failover. Also , added call to update consumers as a result of failover.
BasicMessageConsumer - Changes to acquireReceiving to take in to consideration 
blocking for failover to occur. wrt receiveNoWait.. which previously blocked 
for failover to complete... not exactly noWait. acknowledge will now
TransportConnection - Update to ensure all inVM brokers are correctly killed.

FailoverTest - QPID-679 - Finder of all the above problems.

Added:
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
   (with props)
    
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/
    
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
   (with props)
Modified:
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Modified: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Nov 27 16:45:32 2007
@@ -229,7 +229,7 @@
                 BasicContentHeaderProperties properties = 
(BasicContentHeaderProperties) contentHeaderBody.properties;
                 //fixme: fudge for QPID-677
                 properties.getHeaders().keySet();
-                
+
                 
properties.setUserId(protocolSession.getAuthorizedID().getName());
             }
 
@@ -381,7 +381,14 @@
     {
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
-        requeue();
+        try
+        {
+            requeue();
+        }
+        catch (AMQException e)
+        {
+            _log.error("Caught AMQException whilst attempting to reque:" + e); 
       
+        }
 
         setClosing(true);
     }

Modified: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Tue Nov 27 16:45:32 2007
@@ -33,12 +33,27 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.VersionSpecificRegistry;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.MessageStoreClosedException;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -118,7 +133,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry 
virtualHostRegistry, AMQCodecFactory codecFactory)
-        throws AMQException
+            throws AMQException
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
@@ -144,7 +159,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry 
virtualHostRegistry, AMQCodecFactory codecFactory,
-        AMQStateManager stateManager) throws AMQException
+                                  AMQStateManager stateManager) throws 
AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
@@ -197,7 +212,7 @@
         }
     }
 
-    private void frameReceived(AMQFrame frame) throws AMQException
+    private void frameReceived(AMQFrame frame)
     {
         int channelId = frame.getChannel();
         AMQBody body = frame.getBodyFrame();
@@ -207,26 +222,57 @@
             _logger.debug("Frame Received: " + frame);
         }
 
-        if (body instanceof AMQMethodBody)
-        {
-            methodFrameReceived(channelId, (AMQMethodBody) body);
-        }
-        else if (body instanceof ContentHeaderBody)
-        {
-            contentHeaderReceived(channelId, (ContentHeaderBody) body);
-        }
-        else if (body instanceof ContentBody)
+        // Check that this channel is not closing
+        if (channelAwaitingClosure(channelId))
         {
-            contentBodyReceived(channelId, (ContentBody) body);
+            if (body instanceof ChannelCloseOkBody)
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure 
- processing close-ok");
+                }
+            }
+            else
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure 
ignoring");
+                }
+
+                return;
+            }
         }
-        else if (body instanceof HeartbeatBody)
+        try
         {
-            // NO OP
+            if (body instanceof AMQMethodBody)
+            {
+                methodFrameReceived(channelId, (AMQMethodBody) body);
+            }
+            else if (body instanceof ContentHeaderBody)
+            {
+                contentHeaderReceived(channelId, (ContentHeaderBody) body);
+            }
+            else if (body instanceof ContentBody)
+            {
+                contentBodyReceived(channelId, (ContentBody) body);
+            }
+            else if (body instanceof HeartbeatBody)
+            {
+                // NO OP
+            }
+            else
+            {
+                _logger.warn("Unrecognised frame " + 
frame.getClass().getName());
+            }
         }
-        else
+        catch (AMQException e)
         {
-            _logger.warn("Unrecognised frame " + frame.getClass().getName());
+            //This will occur if we receive Content*Body chunks during an 
'inverse' shutdown.
+            // That is one where were the store shuts down before we can 
gracefully close connections.
+            // note: todo: Here we should send forced ConnectionClose frames.
+            _logger.error("AMQException occured whilst receiving Frame:" + e);
         }
+
     }
 
     private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -246,12 +292,12 @@
 
             // Interfacing with generated code - be aware of possible changes 
to parameter order as versions change.
             AMQFrame response =
-                ConnectionStartBody.createAMQFrame((short) 0, 
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, 
minor)
-                    locales.getBytes(), // locales
-                    mechanisms.getBytes(), // mechanisms
-                    null, // serverProperties
-                    (short) getProtocolMajorVersion(), // versionMajor
-                    (short) getProtocolMinorVersion()); // versionMinor
+                    ConnectionStartBody.createAMQFrame((short) 0, 
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, 
minor)
+                                                       locales.getBytes(), // 
locales
+                                                       mechanisms.getBytes(), 
// mechanisms
+                                                       null, // 
serverProperties
+                                                       (short) 
getProtocolMajorVersion(), // versionMajor
+                                                       (short) 
getProtocolMinorVersion()); // versionMinor
             _minaProtocolSession.write(response);
         }
         catch (AMQException e)
@@ -271,35 +317,12 @@
 
     private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
     {
-
         final AMQMethodEvent<AMQMethodBody> evt = new 
AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
 
-        // Check that this channel is not closing
-        if (channelAwaitingClosure(channelId))
-        {
-            if ((evt.getMethod() instanceof ChannelCloseOkBody))
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure 
- processing close-ok");
-                }
-            }
-            else
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure 
ignoring");
-                }
-
-                return;
-            }
-        }
-
         try
         {
             try
             {
-
                 boolean wasAnyoneInterested = 
_stateManager.methodReceived(evt);
 
                 if (!_frameListeners.isEmpty())
@@ -342,8 +365,8 @@
                     closeSession();
 
                     AMQConnectionException ce =
-                        
evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                            AMQConstant.CHANNEL_ERROR.getName().toString());
+                            
evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                                                                   
AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                     writeFrame(ce.getCloseFrame(channelId));
@@ -363,23 +386,41 @@
         }
         catch (Exception e)
         {
+            //NOTE: Currently we throw AMQExceptions sub-classes that are not 
Protcol problems.
+            // These items should not cause the connection to close unless 
there is no other option.
+            //note; todo: This should cause the connection to close
+            if (e instanceof MessageStoreClosedException)
+            {
+               _logger.error("Message Store is closed so unable to perform 
action:" + e);
+                // This should really close the exception as mentioned below.
+                return;
+            }
+
+            //NOTE: TODO: While this is the responsible for closing the 
connection as a last resort the above section
+            // May have a problem closing channel ... This may be related to a 
connection fault but we should still
+            // attempt to send a connection close so that the connecion may be 
shutdown gracefully.
+
+            //Detect when needed and shutdown connection gracefully .. such as 
Logged MSCException above
+
+            // If an AMQException gets to here then there it should ONLY be 
donw to a protocol error
+            // from the above attempts to close the Connection.
+            // Notify any exceptions listeners before we just close the 
conneciton
             _stateManager.error(e);
             for (AMQMethodListener listener : _frameListeners)
             {
                 listener.error(e);
             }
 
+            // This is the last resort
             _minaProtocolSession.close();
         }
     }
 
     private void contentHeaderReceived(int channelId, ContentHeaderBody body) 
throws AMQException
     {
-
         AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentHeader(body, this);
-
     }
 
     private void contentBodyReceived(int channelId, ContentBody body) throws 
AMQException
@@ -430,7 +471,7 @@
     public AMQChannel getChannel(int channelId) throws AMQException
     {
         final AMQChannel channel =
-            ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? 
_cachedChannels[channelId] : _channelMap.get(channelId);
+                ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? 
_cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
             return null;
@@ -463,8 +504,8 @@
         if (_channelMap.size() == _maxNoOfChannels)
         {
             String errorMessage =
-                toString() + ": maximum number of channels has been reached (" 
+ _maxNoOfChannels
-                + "); can't create channel";
+                    toString() + ": maximum number of channels has been 
reached (" + _maxNoOfChannels
+                    + "); can't create channel";
             _logger.error(errorMessage);
             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
         }

Modified: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 Tue Nov 27 16:45:32 2007
@@ -171,7 +171,14 @@
         //fixme  -- this can be null
         if (amqProtocolSession != null)
         {
-            amqProtocolSession.closeSession();
+            try
+            {
+                amqProtocolSession.closeSession();
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Caught AMQException whilst closingSession:" + 
e);
+            }
         }
     }
 
@@ -205,7 +212,7 @@
         }
         else if (throwable instanceof IOException)
         {
-            _logger.error("IOException caught in" + session + ", session 
closed implictly: " + throwable, throwable);
+            _logger.error("IOException caught in" + session + ", session 
closed implictly: " + throwable);
         }
         else
         {

Modified: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
 Tue Nov 27 16:45:32 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
  */
 package org.apache.qpid.server.registry;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An abstract application registry that provides access to configuration 
information and handles the
  * construction and caching of configurable objects.
@@ -59,24 +58,7 @@
         public void run()
         {
             _logger.info("Shutting down application registries...");
-            try
-            {
-                synchronized (ApplicationRegistry.class)
-                {
-                    Iterator<IApplicationRegistry> keyIterator = 
_instanceMap.values().iterator();
-
-                    while (keyIterator.hasNext())
-                    {
-                        IApplicationRegistry instance = keyIterator.next();
-
-                        instance.close();
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                _logger.error("Error shutting down message store: " + e, e);
-            }
+            removeAll();
         }
     }
 
@@ -116,6 +98,7 @@
         }
         catch (Exception e)
         {
+            _logger.error("Error shutting down message store: " + e, e);
 
         }
         finally
@@ -124,6 +107,14 @@
         }
     }
 
+    public static void removeAll()
+    {
+        Object[] keys = _instanceMap.keySet().toArray();
+        for (Object k : keys)
+        {
+            remove((Integer) k);
+        }
+    }
 
     protected ApplicationRegistry(Configuration configuration)
     {
@@ -154,7 +145,7 @@
                 catch (Exception e)
                 {
                     _logger.error("Error configuring application: " + e, e);
-                //throw new AMQBrokerCreationException(instanceID, "Unable to 
create Application Registry instance " + instanceID);
+                    //throw new AMQBrokerCreationException(instanceID, "Unable 
to create Application Registry instance " + instanceID);
                     throw new RuntimeException("Unable to create Application 
Registry", e);
                 }
             }
@@ -167,7 +158,7 @@
 
     public void close() throws Exception
     {
-        for(VirtualHost virtualHost : 
getVirtualHostRegistry().getVirtualHosts())
+        for (VirtualHost virtualHost : 
getVirtualHostRegistry().getVirtualHosts())
         {
             virtualHost.close();
         }
@@ -204,7 +195,6 @@
         return instance;
     }
 
-    
 
     public static void setDefaultApplicationRegistry(String clazz)
     {

Modified: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
 Tue Nov 27 16:45:32 2007
@@ -20,27 +20,26 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
 
-/**
- * A simple message store that stores the messages in a threadsafe structure 
in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure 
in memory. */
 public class MemoryMessageStore implements MessageStore
 {
     private static final Logger _log = 
Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@
     protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
+    private AtomicBoolean _closed = new AtomicBoolean(false);
 
     public void configure()
     {
@@ -77,6 +77,7 @@
 
     public void close() throws Exception
     {
+        _closed.getAndSet(true);
         if (_metaDataMap != null)
         {
             _metaDataMap.clear();
@@ -89,8 +90,9 @@
         }
     }
 
-    public void removeMessage(StoreContext context, Long messageId)
+    public void removeMessage(StoreContext context, Long messageId) throws 
AMQException
     {
+        checkNotClosed();
         if (_log.isDebugEnabled())
         {
             _log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@
     public void storeContentBodyChunk(StoreContext context, Long messageId, 
int index, ContentChunk contentBody, boolean lastContentBody)
             throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
 
-        if(bodyList == null && lastContentBody)
+        if (bodyList == null && lastContentBody)
         {
             _contentBodyMap.put(messageId, 
Collections.singletonList(contentBody));
         }
@@ -193,17 +196,28 @@
     public void storeMessageMetaData(StoreContext context, Long messageId, 
MessageMetaData messageMetaData)
             throws AMQException
     {
+        checkNotClosed();
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context,Long 
messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long 
messageId) throws AMQException
     {
+        checkNotClosed();
         return _metaDataMap.get(messageId);
     }
 
     public ContentChunk getContentBodyChunk(StoreContext context, Long 
messageId, int index) throws AMQException
     {
+        checkNotClosed();
         List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);
+    }
+
+     private void checkNotClosed() throws MessageStoreClosedException
+     {
+        if (_closed.get())
+        {
+            throw new MessageStoreClosedException();
+        }
     }
 }

Added: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java?rev=598834&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
 (added)
+++ 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
 Tue Nov 27 16:45:32 2007
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException but
+ * we should be using AMQExceptions internally in the code base for Protocol 
errors hence
+ * the message store interface should throw a different super class which this 
should be
+ * moved to reflect
+ */
+public class MessageStoreClosedException extends AMQException
+{
+    public MessageStoreClosedException()
+    {
+        super("Message store closed");
+    }
+}

Propchange: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.1.1/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue Nov 27 16:45:32 2007
@@ -1286,4 +1286,9 @@
     {
         return _sessions.get(channelId);
     }
+
+    public boolean isFailingOver()
+    {
+        return (_protocolHandler.getFailoverLatch() != null);
+    }
 }

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Nov 27 16:45:32 2007
@@ -420,7 +420,7 @@
      *
      * @throws IllegalStateException If the session is closed.
      */
-    public void acknowledge() throws IllegalStateException
+    public void acknowledge() throws JMSException
     {
         if (isClosed())
         {
@@ -2510,6 +2510,7 @@
         for (Iterator it = consumers.iterator(); it.hasNext();)
         {
             BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+            consumer.failedOver();
             registerConsumer(consumer, true);
         }
     }

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Nov 27 16:45:32 2007
@@ -33,14 +33,12 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -277,8 +275,28 @@
         _session.setInRecovery(false);
     }
 
-    private void acquireReceiving() throws JMSException
+    /**
+     * @param immediate if true then return immediately if the connection is 
failing over
+     *
+     * @return boolean if the acquisition was successful
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    private boolean acquireReceiving(boolean immediate) throws JMSException, 
InterruptedException
     {
+        if (_connection.isFailingOver())
+        {
+            if (immediate)
+            {
+                return false;
+            }
+            else
+            {
+                _connection.blockUntilNotFailingOver();
+            }
+        }
+
         if (!_receiving.compareAndSet(false, true))
         {
             throw new javax.jms.IllegalStateException("Another thread is 
already receiving.");
@@ -290,6 +308,7 @@
         }
 
         _receivingThread = Thread.currentThread();
+        return true;
     }
 
     private void releaseReceiving()
@@ -343,7 +362,18 @@
 
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            acquireReceiving(false);
+        }
+        catch (InterruptedException e)
+        {
+            _logger.warn("Interrupted: " + e);
+            if (isClosed())
+            {
+                return null;
+            }
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -424,7 +454,25 @@
     {
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            if (!acquireReceiving(true))
+            {
+                //If we couldn't acquire the receiving thread then return null.
+                // This will occur if failing over.
+                return null;
+            }
+        }
+        catch (InterruptedException e)
+        {
+            /*
+             *  This seems slightly shoddy but should never actually be 
executed
+             *  since we told acquireReceiving to return immediately and it 
shouldn't
+             *  block on anything.
+             */
+
+            return null;
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -868,11 +916,18 @@
         }
     }
 
-    public void acknowledge() // throws JMSException
+    public void acknowledge() throws JMSException
     {
-        if (!isClosed())
+        if (isClosed())
+        {
+            throw new IllegalStateException("Consumer is closed");
+        }
+        else if (_session.hasFailedOver())
+        {
+            throw new JMSException("has failed over");
+        }
+        else
         {
-
             Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
             while (tags.hasNext())
             {
@@ -880,10 +935,6 @@
                 tags.remove();
             }
         }
-        else
-        {
-            throw new IllegalStateException("Consumer is closed");
-        }
     }
 
     /** Called on recovery to reset the list of delivery tags */
@@ -1021,5 +1072,12 @@
     public void clearReceiveQueue()
     {
         _synchronousQueue.clear();
+    }
+
+    /** to be called when a failover has occured */
+    public void failedOver()
+    {
+        clearReceiveQueue();
+        clearUnackedMessages();
     }
 }

Modified: 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=598834&r1=598833&r2=598834&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Tue Nov 27 16:45:32 2007
@@ -35,7 +35,6 @@
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -99,8 +98,8 @@
                         if (!System.getProperties().containsKey("qpidnio") || 
Boolean.getBoolean("qpidnio"))
                         {
                             _logger.warn("Using Qpid MultiThreaded NIO - " + 
(System.getProperties().containsKey("qpidnio")
-                                                                 ? "Qpid NIO 
is new default"
-                                                                 : 
"Sysproperty 'qpidnio' is set"));
+                                                                              
? "Qpid NIO is new default"
+                                                                              
: "Sysproperty 'qpidnio' is set"));
                             result = new MultiThreadSocketConnector();
                         }
                         else
@@ -277,8 +276,7 @@
             }
 
             AMQVMBrokerCreationException amqbce =
-                    new AMQVMBrokerCreationException(null, port, because + " 
Stopped InVM Qpid.AMQP creation", null);
-            amqbce.initCause(e);
+                    new AMQVMBrokerCreationException(null, port, because + " 
Stopped InVM Qpid.AMQP creation", e);
             throw amqbce;
         }
 
@@ -291,14 +289,11 @@
         _acceptor.unbindAll();
         synchronized (_inVmPipeAddress)
         {
-            Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-            while (keys.hasNext())
-            {
-                int id = (Integer) keys.next();
-                _inVmPipeAddress.remove(id);
-            }
-        }
+            _inVmPipeAddress.clear();
+        }        
+        _acceptor = null;
+        _currentInstance = -1;
+        _currentVMPort = -1;
     }
 
     public static void killVMBroker(int port)

Added: 
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=598834&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
 (added)
+++ 
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
 Tue Nov 27 16:45:32 2007
@@ -0,0 +1,222 @@
+package org.apache.qpid.test.client.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+public class FailoverTest extends TestCase implements ConnectionListener
+{
+    private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+    private static final int NUM_BROKERS = 2;
+    private static final String BROKER = 
"amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
+    private static final String QUEUE = "queue";
+    private static final int NUM_MESSAGES = 10;
+    private Connection con;
+    private AMQConnectionFactory conFactory;
+    private Session prodSess;
+    private AMQQueue q;
+    private MessageProducer prod;
+    private Session conSess;
+    private MessageConsumer consumer;
+
+    private static int usedBrokers = 0;
+    private CountDownLatch failoverComplete;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        // Create two VM brokers
+
+        for (int i = 0; i < NUM_BROKERS; i++)
+        {
+            usedBrokers++;
+
+            TransportConnection.createVMBroker(usedBrokers);
+        }
+        //undo last addition
+
+        conFactory = new AMQConnectionFactory(String.format(BROKER, 
usedBrokers - 1, usedBrokers));
+        _logger.info("Connecting on:" + conFactory.getConnectionURL());
+        con = conFactory.createConnection();
+        ((AMQConnection) con).setConnectionListener(this);
+        con.start();
+        failoverComplete = new CountDownLatch(1);
+    }
+
+    private void init(boolean transacted, int mode) throws JMSException
+    {
+        prodSess = con.createSession(transacted, mode);
+        q = new AMQQueue("amq.direct", QUEUE);
+        prod = prodSess.createProducer(q);
+        conSess = con.createSession(transacted, mode);
+        consumer = conSess.createConsumer(q);
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            con.close();
+        }
+        catch (Exception e)
+        {
+
+        }
+
+        try
+        {
+            TransportConnection.killAllVMBrokers();
+            ApplicationRegistry.removeAll();
+        }
+        catch (Exception e)
+        {
+            fail("Unable to clean up");
+        }
+        super.tearDown();
+    }
+
+    private void consumeMessages(int toConsume) throws JMSException
+    {
+        Message msg;
+        for (int i = 0; i < toConsume; i++)
+        {
+            msg = consumer.receive(1000);
+            assertNotNull("Message " + i + " was null!", msg);
+            assertEquals("message " + i, ((TextMessage) msg).getText());
+        }
+    }
+
+    private void sendMessages(int totalMessages) throws JMSException
+    {
+        for (int i = 0; i < totalMessages; i++)
+        {
+            prod.send(prodSess.createTextMessage("message " + i));
+        }
+
+//        try
+//        {
+//            Thread.sleep(100 * totalMessages);
+//        }
+//        catch (InterruptedException e)
+//        {
+//            //evil ignoring of IE
+//        }
+    }
+
+    public void testP2PFailover() throws Exception
+    {
+        testP2PFailover(NUM_MESSAGES, true);
+    }
+
+    public void testP2PFailoverWithMessagesLeft() throws Exception
+    {
+        testP2PFailover(NUM_MESSAGES, false);
+    }
+
+    private void testP2PFailover(int totalMessages, boolean consumeAll) throws 
JMSException
+    {
+        Message msg = null;
+        init(false, Session.AUTO_ACKNOWLEDGE);
+        sendMessages(totalMessages);
+
+        // Consume some messages
+        int toConsume = totalMessages;
+        if (!consumeAll)
+        {
+            toConsume = totalMessages / 2;
+        }
+
+        consumeMessages(toConsume);
+
+        _logger.info("Failing over");
+
+        causeFailure();
+
+        msg = consumer.receive(500);
+        //todo: reinstate
+        assertNull("Should not have received message from new broker!", msg);
+        // Check that messages still sent / received
+        sendMessages(totalMessages);
+        consumeMessages(totalMessages);
+    }
+
+    private void causeFailure()
+    {
+        _logger.info("Failover");
+
+        TransportConnection.killVMBroker(usedBrokers - 1);
+        ApplicationRegistry.remove(usedBrokers - 1);
+
+        _logger.info("Awaiting Failover completion");
+        try
+        {
+            failoverComplete.await();
+        }
+        catch (InterruptedException e)
+        {
+            //evil ignore IE.
+        }
+    }
+
+    public void testClientAckFailover() throws Exception
+    {
+        init(false, Session.CLIENT_ACKNOWLEDGE);
+        sendMessages(1);
+        Message msg = consumer.receive();
+        assertNotNull("Expected msgs not received", msg);
+
+
+        causeFailure();
+
+        Exception failure = null;
+        try
+        {
+            msg.acknowledge();
+        }
+        catch (Exception e)
+        {
+            failure = e;
+        }
+        assertNotNull("Exception should be thrown", failure);
+    }
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        failoverComplete.countDown();
+    }
+}

Propchange: 
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/M2.1.1/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to