Author: kpvdr
Date: Fri Jan 12 14:02:11 2007
New Revision: 495754
URL: http://svn.apache.org/viewvc?view=rev&rev=495754
Log:
Created a common AMQMethodListener interface, allowing the Request and Response
managers to dispatch events to both the client and the server through a single
interface. Both the client and broker AMQStateManager and AMQProtocolSession
classes were refactored. The refactoring has run aground in the cluster module,
however, and this still needs to be resolved. As the cluster tests are
currently disabled (by whom, I'm not sure), this does not disrupt the overall
test result. JIRAs will be opened for this issue.
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
- copied, changed from r495583,
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
Removed:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
@@ -99,10 +100,19 @@
AMQCodecFactory codecFactory)
throws AMQException
{
- this(session, queueRegistry, exchangeRegistry, codecFactory, new
AMQStateManager());
+ _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry,
this);
+ _minaProtocolSession = session;
+ session.setAttachment(this);
+
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _codecFactory = codecFactory;
+ _managedObject = createMBean();
+ _managedObject.register();
+// this(session, queueRegistry, exchangeRegistry, codecFactory, new
AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry
queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, QueueRegistry
queueRegistry, ExchangeRegistry exchangeRegistry,
AMQCodecFactory codecFactory,
AMQStateManager stateManager)
throws AMQException
{
@@ -208,13 +218,13 @@
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt,
this, _queueRegistry, _exchangeRegistry);
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if(!_frameListeners.isEmpty())
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt, this,
_queueRegistry, _exchangeRegistry) ||
+ wasAnyoneInterested = listener.methodReceived(evt) ||
wasAnyoneInterested;
}
}
@@ -233,7 +243,7 @@
_logger.error("Closing connection due to: " + e.getMessage());
writeFrame(e.getCloseFrame(frame.channel));
}
- catch (AMQException e)
+ catch (Exception e)
{
_stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
Fri Jan 12 14:02:11 2007
@@ -25,7 +25,7 @@
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.log4j.Logger;
@@ -43,7 +43,9 @@
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger =
Logger.getLogger(AMQStateManager.class);
-
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
*/
@@ -58,13 +60,16 @@
private CopyOnWriteArraySet<StateListener> _stateListeners = new
CopyOnWriteArraySet<StateListener>();
- public AMQStateManager()
+ public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry
exchangeRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry,
exchangeRegistry, protocolSession);
}
- protected AMQStateManager(AMQState initial, boolean register)
+ protected AMQStateManager(AMQState initial, boolean register,
QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
AMQProtocolSession protocolSession)
{
+ _queueRegistry = queueRegistry;
+ _exchangeRegistry = exchangeRegistry;
+ _protocolSession = protocolSession;
_currentState = initial;
if (register)
{
@@ -149,7 +154,7 @@
}
}
- public void error(AMQException e)
+ public void error(Exception e)
{
_logger.error("State manager received error notification: " + e, e);
for (StateListener l : _stateListeners)
@@ -158,15 +163,12 @@
}
}
- public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B>
evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws
AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B>
evt) throws AMQException
{
StateAwareMethodListener<B> handler =
findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, queueRegistry, exchangeRegistry,
protocolSession, evt);
+ handler.methodReceived(this, _queueRegistry, _exchangeRegistry,
_protocolSession, evt);
return true;
}
return false;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
Fri Jan 12 14:02:11 2007
@@ -89,7 +89,7 @@
// have a state waiter waiting until the connection is closed for
some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was
worthwhile doing this.
AMQStateManager existingStateManager =
_amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager());
+ _amqProtocolHandler.setStateManager(new
AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host !=
null))
{
_amqProtocolHandler.setStateManager(existingStateManager);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Fri Jan 12 14:02:11 2007
@@ -40,6 +40,7 @@
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import java.util.Iterator;
@@ -68,7 +69,7 @@
*/
private volatile AMQProtocolSession _protocolSession;
- private AMQStateManager _stateManager = new AMQStateManager();
+// private AMQStateManager _stateManager = new AMQStateManager();
private final CopyOnWriteArraySet _frameListeners = new
CopyOnWriteArraySet();
@@ -277,7 +278,7 @@
*/
public void propagateExceptionToWaiters(Exception e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -316,14 +317,14 @@
try
{
- boolean wasAnyoneInterested =
_stateManager.methodReceived(evt, _protocolSession);
+ boolean wasAnyoneInterested =
_protocolSession.getStateManager().methodReceived(evt);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener)
it.next();
- wasAnyoneInterested = listener.methodReceived(evt,
_protocolSession) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) ||
wasAnyoneInterested;
}
}
if (!wasAnyoneInterested)
@@ -333,7 +334,7 @@
}
catch (AMQException e)
{
- _stateManager.error(e);
+ _protocolSession.getStateManager().error(e);
if(!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
@@ -394,7 +395,7 @@
*/
public void attainState(AMQState s) throws AMQException
{
- _stateManager.attainState(s);
+ _protocolSession.getStateManager().attainState(s);
}
/**
@@ -486,7 +487,7 @@
public void closeConnection() throws AMQException
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
_protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
@@ -556,12 +557,17 @@
public AMQStateManager getStateManager()
{
- return _stateManager;
+ return _protocolSession.getStateManager();
}
public void setStateManager(AMQStateManager stateManager)
{
- _stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
FailoverState getFailoverState()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.commons.lang.StringUtils;
import javax.jms.JMSException;
@@ -63,6 +64,8 @@
protected final IoSession _minaProtocolSession;
+ private AMQStateManager _stateManager;
+
protected WriteFuture _lastWriteFuture;
/**
@@ -98,6 +101,7 @@
{
_protocolHandler = null;
_minaProtocolSession = null;
+ _stateManager = new AMQStateManager(this);
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection)
@@ -106,6 +110,7 @@
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event
handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ _stateManager = new AMQStateManager(this);
}
public void init()
@@ -135,6 +140,16 @@
public void setClientID(String clientID) throws JMSException
{
getAMQConnection().setClientID(clientID);
+ }
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
}
public String getVirtualHost()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Fri Jan 12 14:02:11 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession
protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
AMQMethodBody method = evt.getMethod();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Fri Jan 12 14:02:11 2007
@@ -23,8 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.handler.*;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger =
Logger.getLogger(AMQStateManager.class);
+ private final AMQProtocolSession _protocolSession;
/**
* The current state
@@ -55,13 +56,14 @@
private final CopyOnWriteArraySet _stateListeners = new
CopyOnWriteArraySet();
- public AMQStateManager()
+ public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register)
+ protected AMQStateManager(AMQState state, boolean register,
AMQProtocolSession protocolSession)
{
+ _protocolSession = protocolSession;
_currentState = state;
if(register)
{
@@ -147,12 +149,12 @@
}
}
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession
protocolSession) throws AMQException
+ public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B>
evt) throws AMQException
{
StateAwareMethodListener handler =
findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, protocolSession, evt);
+ handler.methodReceived(this, _protocolSession, evt);
return true;
}
return false;
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
Fri Jan 12 14:02:11 2007
@@ -52,7 +52,7 @@
public void handle(int channel, AMQMethodBody method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt, _session);
+ _stateMgr.methodReceived(evt);
}
private class SessionAdapter extends AMQProtocolSession
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import java.util.HashMap;
@@ -43,9 +44,9 @@
private final Map<AMQState, ClientRegistry> _handlers = new
HashMap<AMQState, ClientRegistry>();
private final MemberHandle _identity;
- protected ClientHandlerRegistry(MemberHandle local)
+ protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession
protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
_identity = local;
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
Fri Jan 12 14:02:11 2007
@@ -46,7 +46,7 @@
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory());
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null,
null);
}
private MethodHandlerFactory getHandlerFactory()
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
import org.apache.qpid.framing.ClusterMembershipBody;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues,
ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, QueueRegistry queues,
ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory
codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new
ServerHandlerRegistry(_handlers));
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new
ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
}
void connect(String join) throws Exception
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
Fri Jan 12 14:02:11 2007
@@ -37,10 +37,12 @@
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry
queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory,
AMQStateManager stateManager)
- throws AMQException
+ public ClusteredProtocolSession(IoSession session, QueueRegistry
queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory,
AMQStateManager stateManager) throws AMQException
+// public ClusteredProtocolSession(IoSession session, QueueRegistry
queueRegistry,
+// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory)
throws AMQException
{
super(session, queueRegistry, exchangeRegistry, codecFactory,
stateManager);
+// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
public boolean isPeerSession()
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
Fri Jan 12 14:02:11 2007
@@ -63,7 +63,7 @@
{
super(host, port);
_local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
+ _legacyHandler = new ClientHandlerRegistry(local, null);
}
private void init(IoSession session)
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
(original)
+++
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
Fri Jan 12 14:02:11 2007
@@ -27,6 +27,9 @@
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.HashMap;
import java.util.Map;
@@ -40,20 +43,23 @@
private final Logger _logger =
Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new
HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry()
+ ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry
exchangeRegistry,
+ AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry,
exchangeRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s)
+ ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory)
+ ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry
queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
}
Copied:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
(from r495583,
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=495754&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java&r1=495583&p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java&r2=495754
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
Fri Jan 12 14:02:11 2007
@@ -18,12 +18,8 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.framing.AMQMethodBody;
/**
@@ -43,14 +39,11 @@
* to all registered listeners using the error() method (see below)
allowing them to
* perform cleanup if necessary.
*/
- <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws
AMQException;
+ <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt)
throws Exception;
/**
* Callback when an error has occurred. Allows listeners to clean up.
* @param e
*/
- void error(AMQException e);
+ void error(Exception e);
}