Author: rgreig
Date: Mon Jan 29 03:10:09 2007
New Revision: 501009
URL: http://svn.apache.org/viewvc?view=rev&rev=501009
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering
protocol version
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Mon Jan 29 03:10:09 2007
@@ -172,12 +172,12 @@
private boolean isMembershipAnnouncement(Object msg)
{
- return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
+ return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody);
}
private boolean isBufferable(Object msg)
{
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
+ return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame());
}
private boolean isBuffereable(AMQBody body)
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Mon Jan 29 03:10:09 2007
@@ -110,6 +110,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterPingBody ping = new ClusterPingBody((byte)8,
(byte)0,
+ ClusterPingBody.getClazz((byte)8, (byte)0),
+ ClusterPingBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails(),
_loadTable.getLocalLoad(),
true);
@@ -159,6 +161,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody join = new ClusterJoinBody((byte)8,
(byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(leader, new SimpleBodySendable(join));
@@ -182,6 +186,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterLeaveBody leave = new ClusterLeaveBody((byte)8,
(byte)0,
+ ClusterLeaveBody.getClazz((byte)8, (byte)0),
+ ClusterLeaveBody.getMethod((byte)8, (byte)0),
_group.getLocal().getDetails());
send(getLeader(), new SimpleBodySendable(leave));
@@ -207,6 +213,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8,
(byte)0,
+ ClusterSuspectBody.getClazz((byte)8, (byte)0),
+ ClusterSuspectBody.getMethod((byte)8, (byte)0),
broker.getDetails());
send(getLeader(), new SimpleBodySendable(suspect));
@@ -231,7 +239,10 @@
//pass request on to leader:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, member.getDetails());
+ ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0,
+ ClusterJoinBody.getClazz((byte)8, (byte)0),
+ ClusterJoinBody.getMethod((byte)8, (byte)0),
+ member.getDetails());
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
@@ -278,7 +289,10 @@
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, membership.getBytes());
+ ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0,
+ ClusterMembershipBody.getClazz((byte)8, (byte)0),
+ ClusterMembershipBody.getMethod((byte)8, (byte)0),
+ membership.getBytes());
return announce;
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Mon Jan 29 03:10:09 2007
@@ -200,10 +200,10 @@
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.bodyFrame;
+ AMQBody body = frame.getBodyFrame();
if (body instanceof AMQMethodBody)
{
- handleMethod(frame.channel, (AMQMethodBody) body);
+ handleMethod(frame.getChannel(), (AMQMethodBody) body);
}
else
{
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java Mon Jan 29 03:10:09 2007
@@ -56,6 +56,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicConsumeBody m = new BasicConsumeBody((byte)8,
(byte)0,
+ BasicConsumeBody.getClazz((byte)8, (byte)0),
+ BasicConsumeBody.getMethod((byte)8, (byte)0),
null,
queue,
false,
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java Mon Jan 29 03:10:09 2007
@@ -50,13 +50,13 @@
private final byte minor = (byte)0;
private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
{
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor,null,false,false,false,false,false,null,0)),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor,false,false,false,null,0)),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor,null,null,false,null,null,0)),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor,null,false,false,null,false,false,false,0,null)),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor,null,false,false,0)),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor,null,null,false,false,false,false,null,0)),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor,null,false))
+ new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)),
+ new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)),
+ new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)),
+ new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)),
+ new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)),
+ new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)),
+ new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false))
});
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java Mon Jan 29 03:10:09 2007
@@ -122,7 +122,7 @@
_consumers.replay(methods);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- methods.add(new ClusterSynchBody((byte)8, (byte)0));
+ methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0)));
return methods;
}
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java Mon Jan 29 03:10:09 2007
@@ -74,6 +74,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
QueueDeleteBody request = new QueueDeleteBody((byte)8,
(byte)0,
+ QueueDeleteBody.getClazz((byte)8,(byte)0),
+ QueueDeleteBody.getMethod((byte)8,(byte)0),
false,
false,
false,
@@ -94,6 +96,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicCancelBody request = new BasicCancelBody((byte)8,
(byte)0,
+ BasicCancelBody.getClazz((byte)8, (byte)0),
+ BasicCancelBody.getMethod((byte)8, (byte)0),
getName(),
false);
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,10 @@
//send delete request to peers:
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, false,false,false,null,0);
+ QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0,
+ QueueDeleteBody.getClazz((byte)8, (byte)0),
+ QueueDeleteBody.getMethod((byte)8, (byte)0),
+ false,false,false,null,0);
request.queue = getName();
_groupMgr.broadcast(new SimpleBodySendable(request));
}
Modified:
incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java?view=diff&rev=501009&r1=501008&r2=501009
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java Mon Jan 29 03:10:09 2007
@@ -54,7 +54,7 @@
for (RecordingBroker b : brokers)
{
- b.handleResponse(((AMQFrame) b.getMessages().get(0)).channel, new TestMethod("response"));
+ b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
assertTrue("Handler did not receive response", handler.isCompleted());
@@ -80,7 +80,7 @@
for (RecordingBroker broker : succeeded)
{
- broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).channel, new TestMethod("response"));
+ broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response"));
}
b.remove();
@@ -106,7 +106,7 @@
for (int i = 0; i < msgs.length; i++)
{
assertTrue(sent.get(i) instanceof AMQFrame);
- assertEquals(msgs[i], ((AMQFrame) sent.get(i)).bodyFrame);
+ assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame());
}
}
@@ -119,9 +119,9 @@
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
- broker.handleResponse(((AMQFrame) sent.get(0)).channel, new TestMethod("B"));
+ broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B"));
assertEquals(new TestMethod("B"), handler.getResponse());
}
@@ -135,7 +135,7 @@
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
- assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).bodyFrame);
+ assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame());
broker.remove();
assertEquals(null, handler.getResponse());
assertTrue(handler.isCompleted());