Author: kpvdr
Date: Fri Jan 12 14:02:11 2007
New Revision: 495754

URL: http://svn.apache.org/viewvc?view=rev&rev=495754
Log:
Created common AMQMethodListener class, allowing the Request and Response 
managers to use a common interface to dispatch events to both the client and 
the server. Refactoring of both the client and broker AMQStateManagers and 
AMQProtocolSession classes was performed. The refactoring has run aground in 
the clustering, however, and this still needs to be resolved. As the cluster 
tests are currently disabled (by whom, I'm not sure), this does not disrupt the 
overall test result. JIRAs will be opened for this issue.

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
      - copied, changed from r495583, 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
Removed:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 
@@ -99,10 +100,19 @@
                                   AMQCodecFactory codecFactory)
             throws AMQException
     {
-        this(session, queueRegistry, exchangeRegistry, codecFactory, new 
AMQStateManager());
+        _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, 
this);
+        _minaProtocolSession = session;
+        session.setAttachment(this);
+        
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _codecFactory = codecFactory;
+        _managedObject = createMBean();
+        _managedObject.register();
+//        this(session, queueRegistry, exchangeRegistry, codecFactory, new 
AMQStateManager());
     }
 
-    public AMQMinaProtocolSession(IoSession session, QueueRegistry 
queueRegistry, ExchangeRegistry exchangeRegistry,
+     public AMQMinaProtocolSession(IoSession session, QueueRegistry 
queueRegistry, ExchangeRegistry exchangeRegistry,
                                   AMQCodecFactory codecFactory, 
AMQStateManager stateManager)
             throws AMQException
     {
@@ -208,13 +218,13 @@
                                                                                
     (AMQMethodBody) frame.bodyFrame);
         try
         {
-            boolean wasAnyoneInterested = _stateManager.methodReceived(evt, 
this, _queueRegistry, _exchangeRegistry);
+            boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
             if(!_frameListeners.isEmpty())
             {
                 for (AMQMethodListener listener : _frameListeners)
                 {
-                    wasAnyoneInterested = listener.methodReceived(evt, this, 
_queueRegistry, _exchangeRegistry) ||
+                    wasAnyoneInterested = listener.methodReceived(evt) ||
                                           wasAnyoneInterested;
                 }
             }
@@ -233,7 +243,7 @@
             _logger.error("Closing connection due to: " + e.getMessage());
             writeFrame(e.getCloseFrame(frame.channel));
         }        
-        catch (AMQException e)
+        catch (Exception e)
         {
             _stateManager.error(e);
             for (AMQMethodListener listener : _frameListeners)

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
 Fri Jan 12 14:02:11 2007
@@ -25,7 +25,7 @@
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.handler.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.log4j.Logger;
@@ -43,7 +43,9 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = 
Logger.getLogger(AMQStateManager.class);
-
+    private final QueueRegistry _queueRegistry;
+    private final ExchangeRegistry _exchangeRegistry;
+    private final AMQProtocolSession _protocolSession;
     /**
      * The current state
      */
@@ -58,13 +60,16 @@
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new 
CopyOnWriteArraySet<StateListener>();
 
-    public AMQStateManager()
+    public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry 
exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, 
exchangeRegistry, protocolSession);
     }
 
-    protected AMQStateManager(AMQState initial, boolean register)
+    protected AMQStateManager(AMQState initial, boolean register, 
QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, 
AMQProtocolSession protocolSession)
     {
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _protocolSession = protocolSession;
         _currentState = initial;
         if (register)
         {
@@ -149,7 +154,7 @@
         }
     }
 
-    public void error(AMQException e)
+    public void error(Exception e)
     {
         _logger.error("State manager received error notification: " + e, e);
         for (StateListener l : _stateListeners)
@@ -158,15 +163,12 @@
         }
     }
 
-    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> 
evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws 
AMQException
+    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> 
evt) throws AMQException
     {
         StateAwareMethodListener<B> handler = 
findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, queueRegistry, exchangeRegistry, 
protocolSession, evt);
+            handler.methodReceived(this, _queueRegistry, _exchangeRegistry, 
_protocolSession, evt);
             return true;
         }
         return false;

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
 Fri Jan 12 14:02:11 2007
@@ -89,7 +89,7 @@
             // have a state waiter waiting until the connection is closed for 
some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was 
worthwhile doing this.
             AMQStateManager existingStateManager = 
_amqProtocolHandler.getStateManager();
-            _amqProtocolHandler.setStateManager(new AMQStateManager());
+            _amqProtocolHandler.setStateManager(new 
AMQStateManager(_amqProtocolHandler.getProtocolSession()));
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != 
null))
             {
                 _amqProtocolHandler.setStateManager(existingStateManager);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Fri Jan 12 14:02:11 2007
@@ -40,6 +40,7 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.BogusSSLContextFactory;
 
 import java.util.Iterator;
@@ -68,7 +69,7 @@
      */
     private volatile AMQProtocolSession _protocolSession;
 
-    private AMQStateManager _stateManager = new AMQStateManager();
+//    private AMQStateManager _stateManager = new AMQStateManager();
 
     private final CopyOnWriteArraySet _frameListeners = new 
CopyOnWriteArraySet();
 
@@ -277,7 +278,7 @@
      */
     public void propagateExceptionToWaiters(Exception e)
     {
-        _stateManager.error(e);
+        _protocolSession.getStateManager().error(e);
         if(!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -316,14 +317,14 @@
             try
             {
 
-                boolean wasAnyoneInterested = 
_stateManager.methodReceived(evt, _protocolSession);
+                boolean wasAnyoneInterested = 
_protocolSession.getStateManager().methodReceived(evt);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
                     while (it.hasNext())
                     {
                         final AMQMethodListener listener = (AMQMethodListener) 
it.next();
-                        wasAnyoneInterested = listener.methodReceived(evt, 
_protocolSession) || wasAnyoneInterested;
+                        wasAnyoneInterested = listener.methodReceived(evt) || 
wasAnyoneInterested;
                     }
                 }
                 if (!wasAnyoneInterested)
@@ -333,7 +334,7 @@
             }
             catch (AMQException e)
             {
-                _stateManager.error(e);
+                _protocolSession.getStateManager().error(e);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
@@ -394,7 +395,7 @@
   */
     public void attainState(AMQState s) throws AMQException
     {
-        _stateManager.attainState(s);
+        _protocolSession.getStateManager().attainState(s);
     }
 
     /**
@@ -486,7 +487,7 @@
 
     public void closeConnection() throws AMQException
     {
-        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        
_protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
@@ -556,12 +557,17 @@
 
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
+        return _protocolSession.getStateManager();
     }
 
     public void setStateManager(AMQStateManager stateManager)
     {
-        _stateManager = stateManager;
+        _protocolSession.setStateManager(stateManager);
+    }
+    
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
     }
 
     FailoverState getFailoverState()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.commons.lang.StringUtils;
 
 import javax.jms.JMSException;
@@ -63,6 +64,8 @@
 
     protected final IoSession _minaProtocolSession;
 
+    private AMQStateManager _stateManager;
+
     protected WriteFuture _lastWriteFuture;
 
     /**
@@ -98,6 +101,7 @@
     {
         _protocolHandler = null;
         _minaProtocolSession = null;
+        _stateManager = new AMQStateManager(this);
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession 
protocolSession, AMQConnection connection)
@@ -106,6 +110,7 @@
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event 
handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+        _stateManager = new AMQStateManager(this);
     }
 
     public void init()
@@ -135,6 +140,16 @@
     public void setClientID(String clientID) throws JMSException
     {
         getAMQConnection().setClientID(clientID);
+    }
+    
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+    
+    public void setStateManager(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
     }
 
     public String getVirtualHost()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
 Fri Jan 12 14:02:11 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@
      * @return true if the listener has dealt with this frame
      * @throws AMQException
      */
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession 
protocolSession) throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
     {
         AMQMethodBody method = evt.getMethod();
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 Fri Jan 12 14:02:11 2007
@@ -23,8 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.handler.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.protocol.AMQMethodListener;
 import org.apache.qpid.framing.*;
 import org.apache.log4j.Logger;
 
@@ -41,6 +41,7 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = 
Logger.getLogger(AMQStateManager.class);
+    private final AMQProtocolSession _protocolSession;
 
     /**
      * The current state
@@ -55,13 +56,14 @@
 
     private final CopyOnWriteArraySet _stateListeners = new 
CopyOnWriteArraySet();
 
-    public AMQStateManager()
+    public AMQStateManager(AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
     }
 
-    protected AMQStateManager(AMQState state, boolean register)
+    protected AMQStateManager(AMQState state, boolean register, 
AMQProtocolSession protocolSession)
     {
+        _protocolSession = protocolSession;
         _currentState = state;
         if(register)
         {
@@ -147,12 +149,12 @@
         }
     }
 
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession 
protocolSession) throws AMQException
+    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> 
evt) throws AMQException
     {
         StateAwareMethodListener handler = 
findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, protocolSession, evt);
+            handler.methodReceived(this, _protocolSession, evt);
             return true;
         }
         return false;

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
 Fri Jan 12 14:02:11 2007
@@ -52,7 +52,7 @@
     public void handle(int channel, AMQMethodBody method) throws AMQException
     {
         AMQMethodEvent evt = new AMQMethodEvent(channel, method);
-        _stateMgr.methodReceived(evt, _session);
+        _stateMgr.methodReceived(evt);
     }
 
     private class SessionAdapter extends AMQProtocolSession

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
 Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.IllegalStateTransitionException;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.*;
 
 import java.util.HashMap;
@@ -43,9 +44,9 @@
     private final Map<AMQState, ClientRegistry> _handlers = new 
HashMap<AMQState, ClientRegistry>();
     private final MemberHandle _identity;
 
-    protected ClientHandlerRegistry(MemberHandle local)
+    protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession 
protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
 
         _identity = local;
 

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
 Fri Jan 12 14:02:11 2007
@@ -46,7 +46,7 @@
 
     ServerHandlerRegistry getHandlerRegistry()
     {
-        return new ServerHandlerRegistry(getHandlerFactory());
+        return new ServerHandlerRegistry(getHandlerFactory(), null, null, 
null);
     }
 
     private MethodHandlerFactory getHandlerFactory()

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
 Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.framing.ClusterMembershipBody;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@
         _handlers = handler._handlers;
     }
 
-    protected void createSession(IoSession session, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+    protected void createSession(IoSession session, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory 
codec) throws AMQException
     {
-        new ClusteredProtocolSession(session, queues, exchanges, codec, new 
ServerHandlerRegistry(_handlers));
+        new ClusteredProtocolSession(session, queues, exchanges, codec, new 
ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
     }
 
     void connect(String join) throws Exception

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
 Fri Jan 12 14:02:11 2007
@@ -37,10 +37,12 @@
 {
     private MemberHandle _peer;
 
-    public ClusteredProtocolSession(IoSession session, QueueRegistry 
queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, 
AMQStateManager stateManager)
-            throws AMQException
+    public ClusteredProtocolSession(IoSession session, QueueRegistry 
queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, 
AMQStateManager stateManager) throws AMQException
+//    public ClusteredProtocolSession(IoSession session, QueueRegistry 
queueRegistry,
+//        ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) 
throws AMQException
     {
         super(session, queueRegistry, exchangeRegistry, codecFactory, 
stateManager);
+//        super(session, queueRegistry, exchangeRegistry, codecFactory);
     }
 
     public boolean isPeerSession()

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 Fri Jan 12 14:02:11 2007
@@ -63,7 +63,7 @@
     {
         super(host, port);
         _local = local;
-        _legacyHandler = new ClientHandlerRegistry(local);
+        _legacyHandler = new ClientHandlerRegistry(local, null);
     }
 
     private void init(IoSession session)

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
 Fri Jan 12 14:02:11 2007
@@ -27,6 +27,9 @@
 import org.apache.qpid.server.state.IllegalStateTransitionException;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -40,20 +43,23 @@
     private final Logger _logger = 
Logger.getLogger(ServerHandlerRegistry.class);
     private final Map<AMQState, MethodHandlerRegistry> _handlers = new 
HashMap<AMQState, MethodHandlerRegistry>();
 
-    ServerHandlerRegistry()
+    ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry 
exchangeRegistry,
+        AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, 
exchangeRegistry, protocolSession);
     }
 
-    ServerHandlerRegistry(ServerHandlerRegistry s)
+    ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         _handlers.putAll(s._handlers);
     }
 
-    ServerHandlerRegistry(MethodHandlerFactory factory)
+    ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry 
queueRegistry,
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         init(factory);
     }
 

Copied: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
 (from r495583, 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=495754&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java&r1=495583&p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java&r2=495754
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
 Fri Jan 12 14:02:11 2007
@@ -18,12 +18,8 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
 
 /**
@@ -43,14 +39,11 @@
      * to all registered listeners using the error() method (see below) 
allowing them to
      * perform cleanup if necessary.
      */
-    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws 
AMQException;
+    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) 
throws Exception;
 
     /**
      * Callback when an error has occurred. Allows listeners to clean up.
      * @param e
      */
-    void error(AMQException e);
+    void error(Exception e);
 }


Reply via email to