Author: rgreig
Date: Wed Jan 24 16:30:16 2007
New Revision: 499628
URL: http://svn.apache.org/viewvc?view=rev&rev=499628
Log:
QPID-318 : Patch supplied by Rob Godfrey - Remove hard-coding of protocol
version number.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
Wed Jan 24 16:30:16 2007
@@ -72,7 +72,7 @@
// TODO - set clusterId
-
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte)
0, null));
+
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(),
body.getMinor(), null));
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Wed Jan 24 16:30:16 2007
@@ -539,17 +539,17 @@
* NOTE: Both major and minor will be set to 0 prior to protocol
initiation.
*/
- public byte getAmqpMajor()
+ public byte getProtocolMajorVersion()
{
return _major;
}
- public byte getAmqpMinor()
+ public byte getProtocolMinorVersion()
{
return _minor;
}
- public boolean amqpVersionEquals(byte major, byte minor)
+ public boolean isProtocolVersion(byte major, byte minor)
{
return _major == major && _minor == minor;
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Wed Jan 24 16:30:16 2007
@@ -35,6 +35,7 @@
{
+
public static interface Task
{
public void doTask(AMQProtocolSession session) throws AMQException;
@@ -142,5 +143,9 @@
void addSessionCloseTask(Task task);
void removeSessionCloseTask(Task task);
+
+ byte getProtocolMajorVersion();
+
+ byte getProtocolMinorVersion();
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Jan 24 16:30:16 2007
@@ -541,7 +541,7 @@
public void writeDeliver(AMQProtocolSession protocolSession, int
channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag,
consumerTag);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession,
channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -585,7 +585,7 @@
public void writeGetOk(AMQProtocolSession protocolSession, int channelId,
long deliveryTag, int queueSize) throws AMQException
{
- ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag,
queueSize);
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession,
channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
@@ -627,11 +627,11 @@
}
- private ByteBuffer createEncodedDeliverFrame(int channelId, long
deliveryTag, AMQShortString consumerTag)
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId,
(byte) 8, (byte) 0, consumerTag,
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
deliveryTag,
pb.exchange, _messageHandle.isRedelivered(),
pb.routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); //
XXX: Could cast be a problem?
@@ -640,11 +640,13 @@
return buf;
}
- private ByteBuffer createEncodedGetOkFrame(int channelId, long
deliveryTag, int queueSize)
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
BasicPublishBody pb = getPublishBody();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte)
8, (byte) 0,
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+
protocolSession.getProtocolMajorVersion(),
+
protocolSession.getProtocolMinorVersion(),
deliveryTag,
pb.exchange,
queueSize,
_messageHandle.isRedelivered(),
@@ -655,9 +657,12 @@
return buf;
}
- private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode,
AMQShortString replyText) throws AMQException
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession
protocolSession, int channelId, int replyCode, AMQShortString replyText) throws
AMQException
{
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
(byte) 8, (byte) 0, getPublishBody().exchange,
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+
protocolSession.getProtocolMajorVersion(),
+
protocolSession.getProtocolMinorVersion(),
+
getPublishBody().exchange,
replyCode,
replyText,
getPublishBody().routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); //
XXX: Could cast be a problem?
@@ -669,7 +674,7 @@
public void writeReturn(AMQProtocolSession protocolSession, int channelId,
int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(channelId,
replyCode, replyText);
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession,
channelId, replyCode, replyText);
AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Wed Jan 24 16:30:16 2007
@@ -480,22 +480,22 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int
prefetchLow, boolean transacted)
throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+
+ // TODO: Be aware of possible changes to parameter order as versions
change.
+
_protocolHandler.syncWrite(
ChannelOpenBody.createAMQFrame(channelId,
- (byte) 8, (byte) 0, // AMQP
version (major, minor)
+
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion(),
null), // outOfBand
ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ //todo Be aware of possible changes to parameter order as versions
change.
_protocolHandler.syncWrite(
BasicQosBody.createAMQFrame(channelId,
- (byte) 8, (byte) 0, // AMQP
version (major, minor)
+
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion(),
false, // global
prefetchHigh, // prefetchCount
0), // prefetchSize
@@ -507,10 +507,12 @@
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
(byte) 8, (byte) 0), TxSelectOkBody.class);
+
+ // TODO: Be aware of possible changes to parameter order as
versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
+
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion()),
+ TxSelectOkBody.class);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Jan 24 16:30:16 2007
@@ -556,10 +556,13 @@
}
// Commits outstanding messages sent and outstanding
acknowledgements.
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
-
_connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId,
(byte) 8, (byte) 0), TxCommitOkBody.class);
+ // TODO: Be aware of possible changes to parameter order as
versions change.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion()),
+ TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -569,16 +572,15 @@
}
}
+
public void rollback() throws JMSException
{
checkTransacted();
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte)
0), TxRollbackOkBody.class);
+ // TODO: Be aware of possible changes to parameter order as
versions change.
+ getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -605,17 +607,16 @@
try
{
- _connection.getProtocolHandler().closeSession(this);
- // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as
versions change.
+
+ getProtocolHandler().closeSession(this);
+ // TODO: Be aware of possible changes to parameter order
as versions change.
final AMQFrame frame =
ChannelCloseBody.createAMQFrame(getChannelId(),
-
(byte) 8, (byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
0,
// classId
0,
// methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
new
AMQShortString("JMS client closing channel")); // replyText
- _connection.getProtocolHandler().syncWrite(frame,
ChannelCloseOkBody.class);
+ getProtocolHandler().syncWrite(frame,
ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have
been received that
// indicates the broker has closed the channel successfully
@@ -634,6 +635,23 @@
}
}
+ private AMQProtocolHandler getProtocolHandler()
+ {
+ return _connection.getProtocolHandler();
+ }
+
+
+ private byte getProtocolMinorVersion()
+ {
+ return getProtocolHandler().getProtocolMinorVersion();
+ }
+
+ private byte getProtocolMajorVersion()
+ {
+ return getProtocolHandler().getProtocolMajorVersion();
+ }
+
+
/**
* Close all producers or consumers. This is called either in the error
case or when closing the session normally.
*
@@ -818,11 +836,9 @@
{
consumer.clearUnackedMessages();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
-
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-
(byte) 8, (byte) 0, // AMQP version (major, minor)
+ // TODO: Be aware of possible changes to parameter order as versions
change.
+
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
false)); // requeue
}
@@ -934,7 +950,7 @@
checkNotClosed();
long producerId = getNextProducerId();
BasicMessageProducer producer = new
BasicMessageProducer(_connection, (AMQDestination) destination, _transacted,
_channelId,
-
AMQSession.this, _connection.getProtocolHandler(),
+
AMQSession.this, getProtocolHandler(),
producerId, immediate, mandatory, waitUntilSent);
registerProducer(producerId, producer);
return producer;
@@ -1102,7 +1118,7 @@
AMQDestination amqd = (AMQDestination) destination;
- final AMQProtocolHandler protocolHandler =
_connection.getProtocolHandler();
+ final AMQProtocolHandler protocolHandler =
getProtocolHandler();
// TODO: construct the rawSelector from the selector string if
rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
@@ -1183,16 +1199,14 @@
public void declareExchange(AMQShortString name, AMQShortString type)
{
- declareExchange(name, type, _connection.getProtocolHandler());
+ declareExchange(name, type, getProtocolHandler());
}
public void declareExchangeSynch(AMQShortString name, AMQShortString type)
throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte) 8, (byte)
0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
null, //
arguments
false, //
autoDelete
false, //
durable
@@ -1202,7 +1216,7 @@
false, //
passive
0, // ticket
type); // type
- _connection.getProtocolHandler().syncWrite(frame,
ExchangeDeclareOkBody.class);
+ getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
private void declareExchange(AMQDestination amqd, AMQProtocolHandler
protocolHandler)
@@ -1212,11 +1226,9 @@
private void declareExchange(AMQShortString name, AMQShortString type,
AMQProtocolHandler protocolHandler)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame exchangeDeclare =
ExchangeDeclareBody.createAMQFrame(_channelId,
- (byte)
8, (byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
null,
// arguments
false,
// autoDelete
false,
// durable
@@ -1247,11 +1259,9 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
- (byte) 8,
(byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
null, //
arguments
amqd.isAutoDelete(), // autoDelete
amqd.isDurable(), // durable
@@ -1267,11 +1277,9 @@
private void bindQueue(AMQDestination amqd, AMQShortString queueName,
AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
- (byte) 8, (byte) 0,
// AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
ft, // arguments
amqd.getExchangeName(), // exchange
true, // nowait
@@ -1315,11 +1323,9 @@
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
+ // TODO: Be aware of possible changes to parameter order as
versions change.
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
- (byte) 8,
(byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
arguments,
// arguments
tag, //
consumerTag
consumer.isExclusive(), // exclusive
@@ -1513,17 +1519,15 @@
{
try
{
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
+ // TODO: Be aware of possible changes to parameter order as
versions change.
AMQFrame queueDeleteFrame =
QueueDeleteBody.createAMQFrame(_channelId,
- (byte)
8, (byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
false,
// ifEmpty
false,
// ifUnused
true,
// nowait
queueName, // queue
0);
// ticket
- _connection.getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
+ getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
}
catch (AMQException e)
{
@@ -1608,18 +1612,16 @@
boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey)
throws JMSException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
- (byte) 8,
(byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
queueName,
// queue
routingKey);
// routingKey
AMQMethodEvent response = null;
try
{
- response = _connection.getProtocolHandler().syncWrite(boundFrame,
ExchangeBoundOkBody.class);
+ response = getProtocolHandler().syncWrite(boundFrame,
ExchangeBoundOkBody.class);
}
catch (AMQException e)
{
@@ -1672,18 +1674,16 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
- (byte) 8, (byte)
0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
deliveryTag,
// deliveryTag
multiple); //
multiple
if (_logger.isDebugEnabled())
{
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on
channel " + _channelId);
}
- _connection.getProtocolHandler().writeFrame(ackFrame);
+ getProtocolHandler().writeFrame(ackFrame);
}
public int getDefaultPrefetch()
@@ -1742,7 +1742,7 @@
{
AMQDestination amqd = consumer.getDestination();
- AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler);
@@ -1839,25 +1839,21 @@
private void suspendChannel()
{
_logger.warn("Suspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8,
(byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
false);
// active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ getProtocolHandler().writeFrame(channelFlowFrame);
}
private void unsuspendChannel()
{
_logger.warn("Unsuspending channel");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
+ // TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
- (byte) 8,
(byte) 0, // AMQP version (major, minor)
+
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
true);
// active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ getProtocolHandler().writeFrame(channelFlowFrame);
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Jan 24 16:30:16 2007
@@ -467,11 +467,10 @@
{
if (sendClose)
{
- // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as
versions change.
+ // TODO: Be aware of possible changes to parameter order
as versions change.
final AMQFrame cancelFrame =
BasicCancelBody.createAMQFrame(_channelId,
-
(byte) 8, (byte) 0, // AMQP version (major, minor)
+
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion(),
_consumerTag, // consumerTag
false); // nowait
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
Wed Jan 24 16:30:16 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
Wed Jan 24 16:30:16 2007
@@ -60,82 +60,117 @@
{
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
- try
+ byte major = (byte) body.versionMajor;
+ byte minor = (byte) body.versionMinor;
+
+ if(checkVersionOK(major, minor))
{
- // the mechanism we are going to use
- String mechanism;
- if (body.mechanisms == null)
- {
- throw new AMQException("mechanism not specified in
ConnectionStart method frame");
- }
- else
- {
- mechanism = chooseMechanism(body.mechanisms);
- }
- if (mechanism == null)
- {
- throw new AMQException("No supported security mechanism found,
passed: " + new String(body.mechanisms));
- }
+ protocolSession.setProtocolVersion(major, minor);
+
- byte[] saslResponse;
try
{
- SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
- null, "AMQP",
"localhost",
-
null,createCallbackHandler(mechanism, protocolSession));
- if (sc == null)
- {
- throw new AMQException("Client SASL configuration error:
no SaslClient could be created for mechanism " +
- mechanism + ". Please ensure all
factories are registered. See DynamicSaslRegistrar for " +
- " details of how to register
non-standard SASL client providers.");
+ // the mechanism we are going to use
+ String mechanism;
+ if (body.mechanisms == null)
+ {
+ throw new AMQException("mechanism not specified in
ConnectionStart method frame");
+ }
+ else
+ {
+ mechanism = chooseMechanism(body.mechanisms);
}
- protocolSession.setSaslClient(sc);
- saslResponse = (sc.hasInitialResponse() ?
sc.evaluateChallenge(new byte[0]) : null);
- }
- catch (SaslException e)
- {
- protocolSession.setSaslClient(null);
- throw new AMQException("Unable to create SASL client: " + e,
e);
- }
- if (body.locales == null)
- {
- throw new AMQException("Locales is not defined in Connection
Start method");
- }
- final String locales = new String(body.locales, "utf8");
- final StringTokenizer tokenizer = new StringTokenizer(locales, "
");
- String selectedLocale = null;
- if (tokenizer.hasMoreTokens())
- {
- selectedLocale = tokenizer.nextToken();
+ if (mechanism == null)
+ {
+ throw new AMQException("No supported security mechanism
found, passed: " + new String(body.mechanisms));
+ }
+
+ byte[] saslResponse;
+ try
+ {
+ SaslClient sc = Sasl.createSaslClient(new
String[]{mechanism},
+ null, "AMQP",
"localhost",
+
null,createCallbackHandler(mechanism, protocolSession));
+ if (sc == null)
+ {
+ throw new AMQException("Client SASL configuration
error: no SaslClient could be created for mechanism " +
+ mechanism + ". Please ensure
all factories are registered. See DynamicSaslRegistrar for " +
+ " details of how to register
non-standard SASL client providers.");
+ }
+ protocolSession.setSaslClient(sc);
+ saslResponse = (sc.hasInitialResponse() ?
sc.evaluateChallenge(new byte[0]) : null);
+ }
+ catch (SaslException e)
+ {
+ protocolSession.setSaslClient(null);
+ throw new AMQException("Unable to create SASL client: " +
e, e);
+ }
+
+ if (body.locales == null)
+ {
+ throw new AMQException("Locales is not defined in
Connection Start method");
+ }
+ final String locales = new String(body.locales, "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales,
" ");
+ String selectedLocale = null;
+ if (tokenizer.hasMoreTokens())
+ {
+ selectedLocale = tokenizer.nextToken();
+ }
+ else
+ {
+ throw new AMQException("No locales sent from server,
passed: " + locales);
+ }
+
+ stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ FieldTable clientProperties =
FieldTableFactory.newFieldTable();
+
+ clientProperties.setString(new
AMQShortString(ClientProperties.instance.toString()),
protocolSession.getClientID());
+ clientProperties.setString(new
AMQShortString(ClientProperties.product.toString()),
QpidProperties.getProductName());
+ clientProperties.setString(new
AMQShortString(ClientProperties.version.toString()),
QpidProperties.getReleaseVersion());
+ clientProperties.setString(new
AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ clientProperties, // clientProperties
+ new AMQShortString(selectedLocale), // locale
+ new AMQShortString(mechanism), // mechanism
+ saslResponse)); // response
}
- else
+ catch (UnsupportedEncodingException e)
{
- throw new AMQException("No locales sent from server, passed: "
+ locales);
+ throw new AMQException(_log, "Unable to decode data: " + e, e);
}
+ }
+ else
+ {
+ _log.error("Broker requested Protocol ["
+ + body.versionMajor
+ + "-"
+ + body.versionMinor
+ + "] which is not supported by this version of the
client library");
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- FieldTable clientProperties = FieldTableFactory.newFieldTable();
-
- clientProperties.setString(new
AMQShortString(ClientProperties.instance.toString()),
protocolSession.getClientID());
- clientProperties.setString(new
AMQShortString(ClientProperties.product.toString()),
QpidProperties.getProductName());
- clientProperties.setString(new
AMQShortString(ClientProperties.version.toString()),
QpidProperties.getReleaseVersion());
- clientProperties.setString(new
AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
-
protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- clientProperties, // clientProperties
- new AMQShortString(selectedLocale), // locale
- new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
+ protocolSession.closeProtocolSession();
}
- catch (UnsupportedEncodingException e)
+ }
+
+ private boolean checkVersionOK(byte versionMajor, byte versionMinor)
+ {
+ byte[][] supportedVersions = ProtocolVersionList.pv;
+ boolean supported = false;
+ int i = supportedVersions.length;
+ while(i-- != 0 && !supported)
{
- throw new AMQException(_log, "Unable to decode data: " + e, e);
+ supported =
(supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor)
+ &&
(supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor);
}
+
+ return supported;
}
private String getFullSystemInfo()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Wed Jan 24 16:30:16 2007
@@ -95,21 +95,6 @@
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
-
- // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
- // We substitute the state manager when performing failover
-/* _frameListeners.add(new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- return _stateManager.methodReceived(evt);
- }
-
- public void error(Exception e)
- {
- _stateManager.error(e);
- }
- });*/
}
public boolean isUseSSL()
@@ -152,7 +137,7 @@
public void sessionOpened(IoSession session) throws Exception
{
- System.setProperty("foo", "bar");
+ //System.setProperty("foo", "bar");
}
/**
@@ -526,7 +511,8 @@
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
+ _protocolSession.getProtocolMajorVersion(),
+ _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
@@ -621,5 +607,16 @@
public void setFailoverState(FailoverState failoverState)
{
_failoverState = failoverState;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolSession.getProtocolMajorVersion();
+ }
+
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolSession.getProtocolMinorVersion();
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Wed Jan 24 16:30:16 2007
@@ -93,6 +93,12 @@
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
+ private byte _protocolMinorVersion;
+ private byte _protocolMajorVersion;
+
+
+
+
/**
* No-arg constructor for use by test subclass - has to initialise final vars
* NOT intended for use other then for test
@@ -458,4 +464,22 @@
session.confirmConsumerCancelled(consumerTag);
}
+
+ public void setProtocolVersion(byte versionMajor, byte versionMinor)
+ {
+ _protocolMajorVersion = versionMajor;
+ _protocolMinorVersion = versionMinor;
+
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
Wed Jan 24 16:30:16 2007
@@ -158,4 +158,14 @@
{
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ public byte getProtocolMajorVersion()
+ {
+ return 8; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
}