Author: rgreig
Date: Mon Jan 29 03:10:09 2007
New Revision: 501009

URL: http://svn.apache.org/viewvc?view=rev&rev=501009
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering 
protocol version

Modified:
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
    
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
 Mon Jan 29 03:10:09 2007
@@ -172,12 +172,12 @@
 
     private boolean isMembershipAnnouncement(Object msg)
     {
-        return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame 
instanceof ClusterMembershipBody);
+        return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() 
instanceof ClusterMembershipBody);
     }
 
     private boolean isBufferable(Object msg)
     {
-        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) 
msg).bodyFrame);
+        return msg instanceof AMQFrame && isBuffereable(((AMQFrame) 
msg).getBodyFrame());
     }
 
     private boolean isBuffereable(AMQBody body)

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
 Mon Jan 29 03:10:09 2007
@@ -110,6 +110,8 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         ClusterPingBody ping = new ClusterPingBody((byte)8,
                                                    (byte)0,
+                                                   
ClusterPingBody.getClazz((byte)8, (byte)0),
+                                                   
ClusterPingBody.getMethod((byte)8, (byte)0),
                                                    
_group.getLocal().getDetails(),
                                                    _loadTable.getLocalLoad(),
                                                    true);
@@ -159,6 +161,8 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         ClusterJoinBody join = new ClusterJoinBody((byte)8,
                                                    (byte)0,
+                                                   
ClusterJoinBody.getClazz((byte)8, (byte)0),
+                                                   
ClusterJoinBody.getMethod((byte)8, (byte)0),
                                                    
_group.getLocal().getDetails());
 
         send(leader, new SimpleBodySendable(join));
@@ -182,6 +186,8 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
                                                       (byte)0,
+                                                      
ClusterLeaveBody.getClazz((byte)8, (byte)0),
+                                                      
ClusterLeaveBody.getMethod((byte)8, (byte)0),
                                                       
_group.getLocal().getDetails());
 
         send(getLeader(), new SimpleBodySendable(leave));
@@ -207,6 +213,8 @@
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
                                                                 (byte)0,
+                                                                
ClusterSuspectBody.getClazz((byte)8, (byte)0),
+                                                                
ClusterSuspectBody.getMethod((byte)8, (byte)0),
                                                                 
broker.getDetails());
 
             send(getLeader(), new SimpleBodySendable(suspect));
@@ -231,7 +239,10 @@
             //pass request on to leader:
             // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, 
member.getDetails());
+            ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
+                                                          
ClusterJoinBody.getClazz((byte)8, (byte)0),
+                                                          
ClusterJoinBody.getMethod((byte)8, (byte)0),
+                                                          member.getDetails());
             
             Broker leader = getLeader();
             send(leader, new SimpleBodySendable(request));
@@ -278,7 +289,10 @@
     {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, 
(byte)0, membership.getBytes());
+        ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, 
(byte)0,
+                                                                   
ClusterMembershipBody.getClazz((byte)8, (byte)0),
+                                                                   
ClusterMembershipBody.getMethod((byte)8, (byte)0),
+                                                                   
membership.getBytes());
 
         
         return announce;

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
 Mon Jan 29 03:10:09 2007
@@ -200,10 +200,10 @@
 
     private void handleFrame(AMQFrame frame) throws AMQException
     {
-        AMQBody body = frame.bodyFrame;
+        AMQBody body = frame.getBodyFrame();
         if (body instanceof AMQMethodBody)
         {
-            handleMethod(frame.channel, (AMQMethodBody) body);
+            handleMethod(frame.getChannel(), (AMQMethodBody) body);
         }
         else
         {

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
 Mon Jan 29 03:10:09 2007
@@ -56,6 +56,8 @@
             // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             BasicConsumeBody m = new BasicConsumeBody((byte)8,
                                                       (byte)0,
+                                                      
BasicConsumeBody.getClazz((byte)8, (byte)0),
+                                                      
BasicConsumeBody.getMethod((byte)8, (byte)0),
                                                       null,
                                                       queue,
                                                       false,

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
 Mon Jan 29 03:10:09 2007
@@ -50,13 +50,13 @@
     private final byte minor = (byte)0;
     private final Iterable<FrameDescriptor> _frames = Arrays.asList(new 
FrameDescriptor[]
             {
-                    new FrameDescriptor(QueueDeclareBody.class, new 
QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
-                    new FrameDescriptor(QueueDeleteBody.class, new 
QueueDeleteBody(major, minor,false,false,false,null,0)),
-                    new FrameDescriptor(QueueBindBody.class, new 
QueueBindBody(major, minor,null,null,false,null,null,0)),
-                    new FrameDescriptor(ExchangeDeclareBody.class, new 
ExchangeDeclareBody(major, 
minor,null,false,false,null,false,false,false,0,null)),
-                    new FrameDescriptor(ExchangeDeleteBody.class, new 
ExchangeDeleteBody(major, minor,null,false,false,0)),
-                    new FrameDescriptor(BasicConsumeBody.class, new 
BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
-                    new FrameDescriptor(BasicCancelBody.class, new 
BasicCancelBody(major, minor,null,false))
+                    new FrameDescriptor(QueueDeclareBody.class, new 
QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), 
QueueDeclareBody.getMethod(major, 
minor),null,false,false,false,false,false,null,0)),
+                    new FrameDescriptor(QueueDeleteBody.class, new 
QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), 
QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
+                    new FrameDescriptor(QueueBindBody.class, new 
QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), 
QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
+                    new FrameDescriptor(ExchangeDeclareBody.class, new 
ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), 
ExchangeDeclareBody.getMethod(major, 
minor),null,false,false,null,false,false,false,0,null)),
+                    new FrameDescriptor(ExchangeDeleteBody.class, new 
ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), 
ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
+                    new FrameDescriptor(BasicConsumeBody.class, new 
BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), 
BasicConsumeBody.getMethod(major, 
minor),null,null,false,false,false,false,null,0)),
+                    new FrameDescriptor(BasicCancelBody.class, new 
BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), 
BasicCancelBody.getMethod(major, minor),null,false))
             });
 
 

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
 Mon Jan 29 03:10:09 2007
@@ -122,7 +122,7 @@
         _consumers.replay(methods);
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        methods.add(new ClusterSynchBody((byte)8, (byte)0));
+        methods.add(new ClusterSynchBody((byte)8, (byte)0, 
ClusterSynchBody.getClazz((byte)8, (byte)0), 
ClusterSynchBody.getMethod((byte)8, (byte)0)));
         return methods;
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
 Mon Jan 29 03:10:09 2007
@@ -74,6 +74,8 @@
                // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
             QueueDeleteBody request = new QueueDeleteBody((byte)8,
                                                           (byte)0,
+                                                          
QueueDeleteBody.getClazz((byte)8,(byte)0),
+                                                          
QueueDeleteBody.getMethod((byte)8,(byte)0),
                                                           false,
                                                           false,
                                                           false,
@@ -94,6 +96,8 @@
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
         BasicCancelBody request = new BasicCancelBody((byte)8,
                                                       (byte)0,
+                                                      
BasicCancelBody.getClazz((byte)8, (byte)0),
+                                                      
BasicCancelBody.getMethod((byte)8, (byte)0),
                                                       getName(),
                                                       false);
         

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
 Mon Jan 29 03:10:09 2007
@@ -54,7 +54,10 @@
         //send delete request to peers:
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, 
false,false,false,null,0);
+        QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
+                                                      
QueueDeleteBody.getClazz((byte)8, (byte)0),
+                                                      
QueueDeleteBody.getMethod((byte)8, (byte)0),
+                                                      
false,false,false,null,0);
         request.queue = getName();
         _groupMgr.broadcast(new SimpleBodySendable(request));
     }

Modified: 
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
 Mon Jan 29 03:10:09 2007
@@ -54,7 +54,7 @@
 
         for (RecordingBroker b : brokers)
         {
-            b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new 
TestMethod("response"));
+            b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), 
new TestMethod("response"));
         }
 
         assertTrue("Handler did not receive response", handler.isCompleted());
@@ -80,7 +80,7 @@
 
         for (RecordingBroker broker : succeeded)
         {
-            broker.handleResponse(((AMQFrame) 
broker.getMessages().get(0)).channel, new TestMethod("response"));
+            broker.handleResponse(((AMQFrame) 
broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
         }
         b.remove();
 
@@ -106,7 +106,7 @@
         for (int i = 0; i < msgs.length; i++)
         {
             assertTrue(sent.get(i) instanceof AMQFrame);
-            assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+            assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
         }
     }
 
@@ -119,9 +119,9 @@
         List<AMQDataBlock> sent = broker.getMessages();
         assertEquals(1, sent.size());
         assertTrue(sent.get(0) instanceof AMQFrame);
-        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+        assertEquals(new TestMethod("A"), ((AMQFrame) 
sent.get(0)).getBodyFrame());
 
-        broker.handleResponse(((AMQFrame) sent.get(0)).channel, new 
TestMethod("B"));
+        broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new 
TestMethod("B"));
 
         assertEquals(new TestMethod("B"), handler.getResponse());
     }
@@ -135,7 +135,7 @@
         List<AMQDataBlock> sent = broker.getMessages();
         assertEquals(1, sent.size());
         assertTrue(sent.get(0) instanceof AMQFrame);
-        assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+        assertEquals(new TestMethod("A"), ((AMQFrame) 
sent.get(0)).getBodyFrame());
         broker.remove();
         assertEquals(null, handler.getResponse());
         assertTrue(handler.isCompleted());


Reply via email to