Author: kpvdr
Date: Thu Jan 18 06:28:25 2007
New Revision: 497445

URL: http://svn.apache.org/viewvc?view=rev&rev=497445
Log:
Cleared all the cluster compile errors. This now opens the way to testing...

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Thu Jan 18 06:28:25 2007
@@ -509,6 +509,11 @@
     {
         _clientProperties = clientProperties;
     }
+    
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
 
     /**
      * Convenience methods for managing AMQP version.

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
 Thu Jan 18 06:28:25 2007
@@ -49,9 +49,9 @@
         _stateMgr = stateMgr;
     }
 
-    public void handle(int channel, AMQMethodBody method) throws AMQException
+    public void handle(int channel, AMQMethodBody method, long requestId) 
throws AMQException
     {
-        AMQMethodEvent evt = new AMQMethodEvent(channel, method);
+        AMQMethodEvent evt = new AMQMethodEvent(channel, method, requestId);
         _stateMgr.methodReceived(evt);
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
 Thu Jan 18 06:28:25 2007
@@ -128,9 +128,9 @@
 
     class ConnectionTuneHandler extends ConnectionTuneMethodHandler
     {
-        protected AMQFrame createConnectionOpenFrame(int channel, String path, 
String capabilities, boolean insist)
+        protected AMQMethodBody createConnectionOpenMethodBody(String path, 
String capabilities, boolean insist)
         {
-            return super.createConnectionOpenFrame(channel, path, 
ClusterCapability.add(capabilities, _identity), insist);
+            return super.createConnectionOpenMethodBody(path, 
ClusterCapability.add(capabilities, _identity), insist);
         }
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
 Thu Jan 18 06:28:25 2007
@@ -32,6 +32,8 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQProtocolWriter;
 
 public class ClusteredProtocolSession extends AMQMinaProtocolSession
 {
@@ -64,7 +66,7 @@
         AMQChannel channel = super.getChannel(channelId);
         if (isPeerSession() && channel == null)
         {
-            channel = new OneUseChannel(channelId);
+            channel = new OneUseChannel(channelId, this, getStateManager());
             addChannel(channel);
         }
         return channel;
@@ -100,25 +102,29 @@
      */
     private class OneUseChannel extends AMQChannel
     {
-        public OneUseChannel(int channelId)
+        public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter,
+            AMQMethodListener methodListener)
             throws AMQException
         {
-            this(channelId, ApplicationRegistry.getInstance());
+            this(channelId, ApplicationRegistry.getInstance(), protocolWriter, 
methodListener);
         }
 
-        public OneUseChannel(int channelId, IApplicationRegistry registry)
+        public OneUseChannel(int channelId, IApplicationRegistry registry,
+            AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
             throws AMQException
         {
             super(channelId,
                   registry.getMessageStore(),
-                  registry.getExchangeRegistry());
+                  registry.getExchangeRegistry(),
+                  protocolWriter,
+                  methodListener);
         }
 
-        protected void routeCurrentMessage() throws AMQException
-        {
-            super.routeCurrentMessage();
-            removeChannel(getChannelId());
-        }
+//         protected void routeCurrentMessage() throws AMQException
+//         {
+//             super.routeCurrentMessage();
+//             removeChannel(getChannelId());
+//         }
     }
 
     public static boolean isPayloadFromPeer(AMQMessage payload)

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
 Thu Jan 18 06:28:25 2007
@@ -25,5 +25,5 @@
 
 interface MethodHandler
 {
-    public void handle(int channel, AMQMethodBody method) throws AMQException;
+    public void handle(int channel, AMQMethodBody method, long requestId) 
throws AMQException;
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 Thu Jan 18 06:28:25 2007
@@ -37,6 +37,8 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
 import org.apache.qpid.framing.ConnectionRedirectBody;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
@@ -166,7 +168,7 @@
         }
     }
 
-    public void handle(int channel, AMQMethodBody method) throws AMQException
+    public void handle(int channel, AMQMethodBody method, long requestId) 
throws AMQException
     {
         _logger.info(new LogMessage("Handling method: {0} for channel {1}", 
method, channel));
         if (!handleResponse(channel, method))
@@ -175,7 +177,7 @@
         }
     }
 
-    private void handleMethod(int channel, AMQMethodBody method) throws 
AMQException
+    private void handleMethod(int channel, AMQMethodBody method, long 
requestId) throws AMQException
     {
         if (method instanceof ConnectionRedirectBody)
         {
@@ -186,7 +188,7 @@
         }
         else
         {
-            _handler.handle(channel, method);
+            _handler.handle(channel, method, requestId);
             if 
(AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler 
!= this)
             {
                 _handler = this;
@@ -202,9 +204,15 @@
     private void handleFrame(AMQFrame frame) throws AMQException
     {
         AMQBody body = frame.bodyFrame;
-        if (body instanceof AMQMethodBody)
+        if (body instanceof AMQRequestBody)
         {
-            handleMethod(frame.channel, (AMQMethodBody) body);
+            handleMethod(frame.channel, 
((AMQRequestBody)body).getMethodPayload(),
+                ((AMQRequestBody)body).getRequestId());
+        }
+        else if (body instanceof AMQResponseBody)
+        {
+            handleMethod(frame.channel, 
((AMQResponseBody)body).getMethodPayload(),
+                ((AMQRequestBody)body).getRequestId());
         }
         else
         {

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
 Thu Jan 18 06:28:25 2007
@@ -44,20 +44,20 @@
     private final Map<AMQState, MethodHandlerRegistry> _handlers = new 
HashMap<AMQState, MethodHandlerRegistry>();
 
     ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry 
exchangeRegistry,
-+        AMQProtocolSession protocolSession)
+        AMQProtocolSession protocolSession)
     {
         super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, 
exchangeRegistry, protocolSession);
     }
 
     ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
-+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
         this(queueRegistry, exchangeRegistry, protocolSession);
         _handlers.putAll(s._handlers);
     }
 
     ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry 
queueRegistry,
-+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
         this(queueRegistry, exchangeRegistry, protocolSession);
         init(factory);

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
 Thu Jan 18 06:28:25 2007
@@ -32,7 +32,7 @@
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueBindBody;
 import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.log4j.Logger;
 
 import java.util.Map;
@@ -63,9 +63,9 @@
         return new QueueBindHandler();
     }
 
-    ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler()
+    ClusterMethodHandler<MessageConsumeBody> createBasicConsumeHandler()
     {
-        return new BasicConsumeHandler();
+        return new MessageConsumeHandler();
     }
 
     private void set(int channel, String queue)
@@ -121,13 +121,13 @@
         }
     }
 
-    private class BasicConsumeHandler extends 
ClusterMethodHandler<BasicConsumeBody>
+    private class MessageConsumeHandler extends 
ClusterMethodHandler<MessageConsumeBody>
     {
-        protected void peer(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+        protected void peer(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
         {
         }
 
-        protected void client(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+        protected void client(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
         {
             if(evt.getMethod().queue == null)
             {

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
 Thu Jan 18 06:28:25 2007
@@ -23,9 +23,9 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelFlowBody;
 import org.apache.qpid.framing.ChannelOpenBody;
@@ -45,7 +45,7 @@
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.MessageQosBody;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxCommitBody;
 import org.apache.qpid.framing.TxRollbackBody;
@@ -69,11 +69,11 @@
 import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
 import org.apache.qpid.server.handler.ExchangeDeclareHandler;
 import org.apache.qpid.server.handler.ExchangeDeleteHandler;
-import org.apache.qpid.server.handler.BasicCancelMethodHandler;
-import org.apache.qpid.server.handler.BasicPublishMethodHandler;
+import org.apache.qpid.server.handler.MessageCancelHandler;
+import org.apache.qpid.server.handler.MessageTransferHandler;
 import org.apache.qpid.server.handler.QueueBindHandler;
 import org.apache.qpid.server.handler.QueueDeleteHandler;
-import org.apache.qpid.server.handler.BasicQosHandler;
+import org.apache.qpid.server.handler.MessageQosHandler;
 import org.apache.qpid.server.handler.TxSelectHandler;
 import org.apache.qpid.server.handler.TxCommitHandler;
 import org.apache.qpid.server.handler.TxRollbackHandler;
@@ -141,14 +141,14 @@
 
         registry.addHandler(QueueBindBody.class, 
chain(channelQueueMgr.createQueueBindHandler(), 
replicated(QueueBindHandler.getInstance())));
         registry.addHandler(QueueDeleteBody.class, 
chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new 
QueueDeleteHandler(false), new QueueDeleteHandler(true)))));
-        registry.addHandler(BasicConsumeBody.class, 
chain(channelQueueMgr.createBasicConsumeHandler(), new 
ReplicatingConsumeHandler(_groupMgr)));
+        registry.addHandler(MessageConsumeBody.class, 
chain(channelQueueMgr.createBasicConsumeHandler(), new 
ReplicatingConsumeHandler(_groupMgr)));
 
         //other modified handlers:
-        registry.addHandler(BasicCancelBody.class, alternate(new 
RemoteCancelHandler(), BasicCancelMethodHandler.getInstance()));
+        registry.addHandler(MessageCancelBody.class, alternate(new 
RemoteCancelHandler(), MessageCancelHandler.getInstance()));
 
         //other unaffected handlers:
-        registry.addHandler(BasicPublishBody.class, 
BasicPublishMethodHandler.getInstance());
-        registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance());
+        registry.addHandler(MessageTransferBody.class, 
MessageTransferHandler.getInstance());
+        registry.addHandler(MessageQosBody.class, 
MessageQosHandler.getInstance());
         registry.addHandler(ChannelOpenBody.class, 
ChannelOpenHandler.getInstance());
         registry.addHandler(ChannelCloseBody.class, 
ChannelCloseHandler.getInstance());
         registry.addHandler(ChannelFlowBody.class, 
ChannelFlowHandler.getInstance());

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
 Thu Jan 18 06:28:25 2007
@@ -22,7 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.cluster.ClusteredProtocolSession;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -33,14 +33,14 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
-public class RemoteCancelHandler implements 
StateAwareMethodListener<BasicCancelBody>
+public class RemoteCancelHandler implements 
StateAwareMethodListener<MessageCancelBody>
 {
     private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
 
-    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<MessageCancelBody> evt) throws AMQException
     {
         //By convention, consumers setup between brokers use the queue name as 
the consumer tag:
-        AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
+        AMQQueue queue = queues.getQueue(evt.getMethod().getDestination());
         if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) 
queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
 Thu Jan 18 06:28:25 2007
@@ -22,8 +22,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.cluster.ClusteredProtocolSession;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -38,11 +38,11 @@
  * Handles consume requests from other cluster members.
  *
  */
-public class RemoteConsumeHandler implements 
StateAwareMethodListener<BasicConsumeBody>
+public class RemoteConsumeHandler implements 
StateAwareMethodListener<MessageConsumeBody>
 {
     private final Logger _logger = 
Logger.getLogger(RemoteConsumeHandler.class);
 
-    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
     {
         AMQQueue queue = queues.getQueue(evt.getMethod().queue);
         if (queue instanceof ClusteredQueue)
@@ -51,10 +51,8 @@
             // AMQP version change: Hardwire the version to 0-9 (major=0, 
minor=9)
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions 
change.
-            
session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(),
-               (byte)0, (byte)9,       // AMQP version (major, minor)
-               evt.getMethod().queue   // consumerTag
-                ));
+            session.writeResponse(evt.getChannelId(), evt.getRequestId(),
+                MessageOkBody.createMethodBody((byte)0, (byte)9));
         }
         else
         {

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
 Thu Jan 18 06:28:25 2007
@@ -21,20 +21,20 @@
 package org.apache.qpid.server.cluster.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.cluster.BroadcastPolicy;
 import org.apache.qpid.server.cluster.GroupManager;
 import org.apache.qpid.server.cluster.util.LogMessage;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
+import org.apache.qpid.server.handler.MessageConsumeHandler;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
-public class ReplicatingConsumeHandler extends 
ReplicatingHandler<BasicConsumeBody>
+public class ReplicatingConsumeHandler extends 
ReplicatingHandler<MessageConsumeBody>
 {
     ReplicatingConsumeHandler(GroupManager groupMgr)
     {
@@ -46,7 +46,7 @@
         super(groupMgr, base(), policy);
     }
 
-    protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+    protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, 
ExchangeRegistry exchanges, AMQProtocolSession session, 
AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
     {
         //only replicate if the queue in question is a shared queue
         if (isShared(queues.getQueue(evt.getMethod().queue)))
@@ -67,18 +67,18 @@
         return queue != null && queue.isShared();
     }
 
-    static StateAwareMethodListener<BasicConsumeBody> base()
+    static StateAwareMethodListener<MessageConsumeBody> base()
     {
-        return new PeerHandler<BasicConsumeBody>(peer(), client());
+        return new PeerHandler<MessageConsumeBody>(peer(), client());
     }
 
-    static StateAwareMethodListener<BasicConsumeBody> peer()
+    static StateAwareMethodListener<MessageConsumeBody> peer()
     {
         return new RemoteConsumeHandler();
     }
 
-    static StateAwareMethodListener<BasicConsumeBody> client()
+    static StateAwareMethodListener<MessageConsumeBody> client()
     {
-        return BasicConsumeMethodHandler.getInstance();
+        return MessageConsumeHandler.getInstance();
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
 Thu Jan 18 06:28:25 2007
@@ -21,7 +21,7 @@
 package org.apache.qpid.server.cluster.replay;
 
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageConsumeBody;
 
 import java.util.Map;
 import java.util.HashMap;
@@ -36,7 +36,7 @@
         _counts.put(queue, get(queue) + 1);
     }
 
-   synchronized void decrement(String queue)
+    synchronized void decrement(String queue)
     {
         _counts.put(queue,  get(queue) - 1);
     }
@@ -53,14 +53,14 @@
         {
             // AMQP version change: Hardwire the version to 0-9 (major=0, 
minor=9)
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            BasicConsumeBody m = new BasicConsumeBody((byte)0, (byte)9);
+            MessageConsumeBody m = new MessageConsumeBody((byte)0, (byte)9);
             m.queue = queue;
-            m.consumerTag = queue;
+            m.destination = queue;
             replay(m, messages);
         }
     }
 
-    private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
+    private void replay(MessageConsumeBody msg, List<AMQMethodBody> messages)
     {
         int count = _counts.get(msg.queue);
         for(int i = 0; i < count; i++)

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
 Thu Jan 18 06:28:25 2007
@@ -22,8 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.ExchangeDeleteBody;
 import org.apache.qpid.framing.QueueBindBody;
@@ -55,8 +55,8 @@
                     new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor)),
                     new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor)),
                     new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor)),
-                    new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor)),
-                    new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor))
+                    new FrameDescriptor(MessageConsumeBody.class, new MessageConsumeBody(major, minor)),
+                    new FrameDescriptor(MessageCancelBody.class, new MessageCancelBody(major, minor))
             });
 
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
 Thu Jan 18 06:28:25 2007
@@ -29,8 +29,8 @@
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.cluster.ClusteredProtocolSession;
 import org.apache.qpid.server.cluster.util.LogMessage;
@@ -75,8 +75,8 @@
         _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder());
         _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder());
         _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder());
-        _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder());
-        _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder());
+        _localRecorders.put(MessageConsumeBody.class, new BasicConsumeRecorder());
+        _localRecorders.put(MessageCancelBody.class, new BasicCancelRecorder());
         _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
         _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
     }
@@ -130,9 +130,9 @@
         return methods;
     }
 
-    private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody>
+    private class BasicConsumeRecorder implements MethodRecorder<MessageConsumeBody>
     {
-        public void record(BasicConsumeBody method)
+        public void record(MessageConsumeBody method)
         {
             if(_sharedQueues.containsKey(method.queue))
             {
@@ -141,13 +141,13 @@
         }
     }
 
-    private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody>
+    private class BasicCancelRecorder implements MethodRecorder<MessageCancelBody>
     {
-        public void record(BasicCancelBody method)
+        public void record(MessageCancelBody method)
         {
-            if(_sharedQueues.containsKey(method.consumerTag))
+            if(_sharedQueues.containsKey(method.getDestination()))
             {
-                _consumers.decrement(method.consumerTag);
+                _consumers.decrement(method.getDestination());
             }
         }
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
 Thu Jan 18 06:28:25 2007
@@ -22,7 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.framing.QueueDeleteBody;
 import org.apache.qpid.server.cluster.*;
 import org.apache.qpid.server.cluster.util.LogMessage;
@@ -91,8 +91,8 @@
         //signal other members:
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        BasicCancelBody request = new BasicCancelBody((byte)0, (byte)9);
-        request.consumerTag = getName();
+        MessageCancelBody request = new MessageCancelBody((byte)0, (byte)9);
+        request.destination = getName();
         _groupMgr.broadcast(new SimpleSendable(request));
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java?view=diff&rev=497445&r1=497444&r2=497445
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
 Thu Jan 18 06:28:25 2007
@@ -23,9 +23,8 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.Content;
 import org.apache.qpid.server.cluster.ClusteredProtocolSession;
 import org.apache.qpid.server.cluster.GroupManager;
 import org.apache.qpid.server.cluster.util.LogMessage;
@@ -92,9 +91,11 @@
 
     void relay(AMQMessage msg) throws AMQException
     {
-        BasicPublishBody publish = msg.getPublishBody();
+        throw new Error("XXX");
+        /*
+        MessageTransferBody publish = msg.getPublishBody();
         ContentHeaderBody header = msg.getContentHeaderBody();
-        List<ContentBody> bodies = msg.getContentBodies();
+        List<Content> bodies = msg.getContentBodies();
 
         //(i) construct a new publishing block:
         publish.immediate = false;//can't as yet handle the immediate flag in a cluster
@@ -105,5 +106,6 @@
 
         //(ii) send this on to the broker for which it is acting as proxy:
         _groupMgr.send(_target, new SimpleSendable(parts));
+        */
     }
 }


Reply via email to