Author: rgreig
Date: Wed Jan 24 16:30:16 2007
New Revision: 499628

URL: http://svn.apache.org/viewvc?view=rev&rev=499628
Log:
QPID-318 : Patch supplied by Rob Godfrey - Remove hard-coding of protocol 
version number.  

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 Wed Jan 24 16:30:16 2007
@@ -72,7 +72,7 @@
 
 
                     // TODO - set clusterId
-                    
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 
0, null));
+                    
session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, body.getMajor(), 
body.getMinor(), null));
                 }
             }
         }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Wed Jan 24 16:30:16 2007
@@ -539,17 +539,17 @@
      * NOTE: Both major and minor will be set to 0 prior to protocol 
initiation.
      */
 
-    public byte getAmqpMajor()
+    public byte getProtocolMajorVersion()
     {
         return _major;
     }
 
-    public byte getAmqpMinor()
+    public byte getProtocolMinorVersion()
     {
         return _minor;
     }
 
-    public boolean amqpVersionEquals(byte major, byte minor)
+    public boolean isProtocolVersion(byte major, byte minor)
     {
         return _major == major && _minor == minor;
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
 Wed Jan 24 16:30:16 2007
@@ -35,6 +35,7 @@
 {
 
 
+
     public static interface Task
     {
         public void doTask(AMQProtocolSession session) throws AMQException;
@@ -142,5 +143,9 @@
     void addSessionCloseTask(Task task);
 
     void removeSessionCloseTask(Task task);
+
+    byte getProtocolMajorVersion();
+
+    byte getProtocolMinorVersion();
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Jan 24 16:30:16 2007
@@ -541,7 +541,7 @@
     public void writeDeliver(AMQProtocolSession protocolSession, int 
channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, 
consumerTag);
+        ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, 
channelId, deliveryTag, consumerTag);
         AMQDataBlock contentHeader = 
ContentHeaderBody.createAMQFrame(channelId,
                                                                       
getContentHeaderBody());
 
@@ -585,7 +585,7 @@
 
     public void writeGetOk(AMQProtocolSession protocolSession, int channelId, 
long deliveryTag, int queueSize) throws AMQException
     {
-        ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, 
queueSize);
+        ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, 
channelId, deliveryTag, queueSize);
         AMQDataBlock contentHeader = 
ContentHeaderBody.createAMQFrame(channelId,
                                                                       
getContentHeaderBody());
 
@@ -627,11 +627,11 @@
     }
 
 
-    private ByteBuffer createEncodedDeliverFrame(int channelId, long 
deliveryTag, AMQShortString consumerTag)
+    private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession 
protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
         BasicPublishBody pb = getPublishBody();
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, 
(byte) 8, (byte) 0, consumerTag,
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, 
protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(), consumerTag, // both major and minor come from the session, as in the GetOk/Return frames
                                                                 deliveryTag, 
pb.exchange, _messageHandle.isRedelivered(),
                                                                 pb.routingKey);
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // 
XXX: Could cast be a problem?
@@ -640,11 +640,13 @@
         return buf;
     }
 
-    private ByteBuffer createEncodedGetOkFrame(int channelId, long 
deliveryTag, int queueSize)
+    private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession 
protocolSession, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
         BasicPublishBody pb = getPublishBody();
-        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 
8, (byte) 0,
+        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+                                                            
protocolSession.getProtocolMajorVersion(),
+                                                            
protocolSession.getProtocolMinorVersion(),
                                                                 deliveryTag, 
pb.exchange,
                                                                 queueSize,
                                                                 
_messageHandle.isRedelivered(),
@@ -655,9 +657,12 @@
         return buf;
     }
 
-    private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, 
AMQShortString replyText) throws AMQException
+    private ByteBuffer createEncodedReturnFrame(AMQProtocolSession 
protocolSession, int channelId, int replyCode, AMQShortString replyText) throws 
AMQException
     {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, 
(byte) 8, (byte) 0, getPublishBody().exchange,
+        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+                                                              
protocolSession.getProtocolMajorVersion(),
+                                                              
protocolSession.getProtocolMinorVersion(), 
+                                                              
getPublishBody().exchange,
                                                               replyCode, 
replyText,
                                                               
getPublishBody().routingKey);
         ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // 
XXX: Could cast be a problem?
@@ -669,7 +674,7 @@
     public void writeReturn(AMQProtocolSession protocolSession, int channelId, 
int replyCode, AMQShortString replyText)
             throws AMQException
     {
-        ByteBuffer returnFrame = createEncodedReturnFrame(channelId, 
replyCode, replyText);
+        ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, 
channelId, replyCode, replyText);
 
         AMQDataBlock contentHeader = 
ContentHeaderBody.createAMQFrame(channelId,
                                                                       
getContentHeaderBody());

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Wed Jan 24 16:30:16 2007
@@ -480,22 +480,22 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int 
prefetchLow, boolean transacted)
             throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
+
         _protocolHandler.syncWrite(
                 ChannelOpenBody.createAMQFrame(channelId,
-                                               (byte) 8, (byte) 0,    // AMQP 
version (major, minor)
+                                               
_protocolHandler.getProtocolMajorVersion(),
+                                               
_protocolHandler.getProtocolMinorVersion(),
                                                null),    // outOfBand
                                                          
ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        //todo Be aware of possible changes to parameter order as versions 
change.
         _protocolHandler.syncWrite(
                 BasicQosBody.createAMQFrame(channelId,
-                                            (byte) 8, (byte) 0,    // AMQP 
version (major, minor)
+                                            
_protocolHandler.getProtocolMajorVersion(),
+                                            
_protocolHandler.getProtocolMinorVersion(),
                                             false,    // global
                                             prefetchHigh,    // prefetchCount
                                             0),    // prefetchSize
@@ -507,10 +507,12 @@
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, 
(byte) 8, (byte) 0), TxSelectOkBody.class);
+
+            // TODO: Be aware of possible changes to parameter order as 
versions change.
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId,
+                                                                   
_protocolHandler.getProtocolMajorVersion(),
+                                                                   
_protocolHandler.getProtocolMinorVersion()),
+                                       TxSelectOkBody.class);
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Jan 24 16:30:16 2007
@@ -556,10 +556,13 @@
             }
 
             // Commits outstanding messages sent and outstanding 
acknowledgements.
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            
_connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId,
 (byte) 8, (byte) 0), TxCommitOkBody.class);
+            // TODO: Be aware of possible changes to parameter order as 
versions change.
+            final AMQProtocolHandler handler = getProtocolHandler();
+
+            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
+                                                          
getProtocolMajorVersion(),
+                                                          
getProtocolMinorVersion()),
+                              TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -569,16 +572,15 @@
         }
     }
 
+    
     public void rollback() throws JMSException
     {
         checkTransacted();
         try
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
-            _connection.getProtocolHandler().syncWrite(
-                    TxRollbackBody.createAMQFrame(_channelId, (byte) 8, (byte) 
0), TxRollbackOkBody.class);
+            // TODO: Be aware of possible changes to parameter order as 
versions change.
+            getProtocolHandler().syncWrite(
+                    TxRollbackBody.createAMQFrame(_channelId, 
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
         }
         catch (AMQException e)
         {
@@ -605,17 +607,16 @@
 
                 try
                 {
-                    _connection.getProtocolHandler().closeSession(this);
-                    // AMQP version change: Hardwire the version to 0-8 
(major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as 
versions change.
+
+                    getProtocolHandler().closeSession(this);
+                    // TODO: Be aware of possible changes to parameter order 
as versions change.
                     final AMQFrame frame = 
ChannelCloseBody.createAMQFrame(getChannelId(),
-                                                                           
(byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                           
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                            0,  
  // classId
                                                                            0,  
  // methodId
                                                                            
AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
                                                                            new 
AMQShortString("JMS client closing channel"));    // replyText
-                    _connection.getProtocolHandler().syncWrite(frame, 
ChannelCloseOkBody.class);
+                    getProtocolHandler().syncWrite(frame, 
ChannelCloseOkBody.class);
                     // When control resumes at this point, a reply will have 
been received that
                     // indicates the broker has closed the channel successfully
 
@@ -634,6 +635,23 @@
         }
     }
 
+    private AMQProtocolHandler getProtocolHandler()
+    {
+        return _connection.getProtocolHandler();
+    }
+
+
+    private byte getProtocolMinorVersion()
+    {
+        return getProtocolHandler().getProtocolMinorVersion();
+    }
+
+    private byte getProtocolMajorVersion()
+    {
+        return getProtocolHandler().getProtocolMajorVersion();
+    }
+
+
     /**
      * Close all producers or consumers. This is called either in the error 
case or when closing the session normally.
      *
@@ -818,11 +836,9 @@
         {
             consumer.clearUnackedMessages();
         }
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                               
     (byte) 8, (byte) 0,    // AMQP version (major, minor)
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
+        
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                               
     getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                                
     false));    // requeue
     }
 
@@ -934,7 +950,7 @@
                 checkNotClosed();
                 long producerId = getNextProducerId();
                 BasicMessageProducer producer = new 
BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, 
_channelId,
-                                                                         
AMQSession.this, _connection.getProtocolHandler(),
+                                                                         
AMQSession.this, getProtocolHandler(),
                                                                          
producerId, immediate, mandatory, waitUntilSent);
                 registerProducer(producerId, producer);
                 return producer;
@@ -1102,7 +1118,7 @@
 
                 AMQDestination amqd = (AMQDestination) destination;
 
-                final AMQProtocolHandler protocolHandler = 
_connection.getProtocolHandler();
+                final AMQProtocolHandler protocolHandler = 
getProtocolHandler();
                 // TODO: construct the rawSelector from the selector string if 
rawSelector == null
                 final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
@@ -1183,16 +1199,14 @@
 
     public void declareExchange(AMQShortString name, AMQShortString type)
     {
-        declareExchange(name, type, _connection.getProtocolHandler());
+        declareExchange(name, type, getProtocolHandler());
     }
 
     public void declareExchangeSynch(AMQShortString name, AMQShortString type) 
throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
-                                                            (byte) 8, (byte) 
0,    // AMQP version (major, minor)
+                                                            
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                             null,    // 
arguments
                                                             false,    // 
autoDelete
                                                             false,    // 
durable
@@ -1202,7 +1216,7 @@
                                                             false,    // 
passive
                                                             0,    // ticket
                                                             type);    // type
-        _connection.getProtocolHandler().syncWrite(frame, 
ExchangeDeclareOkBody.class);
+        getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
     }
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler 
protocolHandler)
@@ -1212,11 +1226,9 @@
 
     private void declareExchange(AMQShortString name, AMQShortString type, 
AMQProtocolHandler protocolHandler)
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame exchangeDeclare = 
ExchangeDeclareBody.createAMQFrame(_channelId,
-                                                                      (byte) 
8, (byte) 0,    // AMQP version (major, minor)
+                                                                      
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                       null,    
// arguments
                                                                       false,   
 // autoDelete
                                                                       false,   
 // durable
@@ -1247,11 +1259,9 @@
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
-                                                                (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
+                                                                
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                 null,    // 
arguments
                                                                 
amqd.isAutoDelete(),    // autoDelete
                                                                 
amqd.isDurable(),    // durable
@@ -1267,11 +1277,9 @@
 
     private void bindQueue(AMQDestination amqd, AMQShortString queueName, 
AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
-                                                          (byte) 8, (byte) 0,  
  // AMQP version (major, minor)
+                                                          
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                           ft,    // arguments
                                                           
amqd.getExchangeName(),    // exchange
                                                           true,    // nowait
@@ -1315,11 +1323,9 @@
 
         try
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
+            // TODO: Be aware of possible changes to parameter order as 
versions change.
             AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
-                                                                  (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
+                                                                  
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                   arguments,   
 // arguments
                                                                   tag,    // 
consumerTag
                                                                   
consumer.isExclusive(),    // exclusive
@@ -1513,17 +1519,15 @@
     {
         try
         {
-            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
-            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions 
change.
+            // TODO: Be aware of possible changes to parameter order as 
versions change.
             AMQFrame queueDeleteFrame = 
QueueDeleteBody.createAMQFrame(_channelId,
-                                                                       (byte) 
8, (byte) 0,    // AMQP version (major, minor)
+                                                                       
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                        false,  
  // ifEmpty
                                                                        false,  
  // ifUnused
                                                                        true,   
 // nowait
                                                                        
queueName,    // queue
                                                                        0);    
// ticket
-            _connection.getProtocolHandler().syncWrite(queueDeleteFrame, 
QueueDeleteOkBody.class);
+            getProtocolHandler().syncWrite(queueDeleteFrame, 
QueueDeleteOkBody.class);
         }
         catch (AMQException e)
         {
@@ -1608,18 +1612,16 @@
 
     boolean isQueueBound(AMQShortString queueName, AMQShortString routingKey) 
throws JMSException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
-                                                               (byte) 8, 
(byte) 0,    // AMQP version (major, minor)
+                                                               
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                                
ExchangeDefaults.TOPIC_EXCHANGE_NAME,    // exchange
                                                                queueName,    
// queue
                                                                routingKey);    
// routingKey
         AMQMethodEvent response = null;
         try
         {
-            response = _connection.getProtocolHandler().syncWrite(boundFrame, 
ExchangeBoundOkBody.class);
+            response = getProtocolHandler().syncWrite(boundFrame, 
ExchangeBoundOkBody.class);
         }
         catch (AMQException e)
         {
@@ -1672,18 +1674,16 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions 
change.
         final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId,
-                                                              (byte) 8, (byte) 
0,    // AMQP version (major, minor)
+                                                              
getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version 
(major, minor)
                                                               deliveryTag,    
// deliveryTag
                                                               multiple);    // 
multiple
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on 
channel " + _channelId);
         }
-        _connection.getProtocolHandler().writeFrame(ackFrame);
+        getProtocolHandler().writeFrame(ackFrame);
     }
 
     public int getDefaultPrefetch()
@@ -1742,7 +1742,7 @@
     {
         AMQDestination amqd = consumer.getDestination();
 
-        AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
+        AMQProtocolHandler protocolHandler = getProtocolHandler();
 
         declareExchange(amqd, protocolHandler);
 
@@ -1839,25 +1839,21 @@
     private void suspendChannel()
     {
         _logger.warn("Suspending channel");
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-                                                                   (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                   getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
                                                                    false);    // active
-        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+        getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
     private void unsuspendChannel()
     {
         _logger.warn("Unsuspending channel");
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
+        // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-                                                                   (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                   getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
                                                                    true);    // active
-        _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+        getProtocolHandler().writeFrame(channelFlowFrame);
     }
 
     public void confirmConsumerCancelled(AMQShortString consumerTag)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 24 16:30:16 2007
@@ -467,11 +467,10 @@
             {
                 if (sendClose)
                 {
-                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                    // Be aware of possible changes to parameter order as versions change.
+                    // TODO: Be aware of possible changes to parameter order as versions change.
                     final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
-                                                                                (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                                _protocolHandler.getProtocolMajorVersion(),
+                                                                                _protocolHandler.getProtocolMinorVersion(),
                                                                                 _consumerTag,    // consumerTag
                                                                                 false);    // nowait
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Wed Jan 24 16:30:16 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Wed Jan 24 16:30:16 2007
@@ -60,82 +60,117 @@
     {
         ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
 
-        try
+        byte major = (byte) body.versionMajor;
+        byte minor = (byte) body.versionMinor;
+
+        if(checkVersionOK(major, minor))
         {
-            // the mechanism we are going to use
-            String mechanism;
-            if (body.mechanisms == null)
-            {
-                throw new AMQException("mechanism not specified in ConnectionStart method frame");
-            }
-            else
-            {
-                mechanism = chooseMechanism(body.mechanisms);
-            }
 
-            if (mechanism == null)
-            {
-                throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
-            }
+            protocolSession.setProtocolVersion(major, minor);
+
 
-            byte[] saslResponse;
             try
             {
-                SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
-                                                      null, "AMQP", "localhost",
-                                                      null,createCallbackHandler(mechanism, protocolSession));
-                if (sc == null)
-                {
-                    throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
-                                           mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
-                                           " details of how to register non-standard SASL client providers.");
+                // the mechanism we are going to use
+                String mechanism;
+                if (body.mechanisms == null)
+                {
+                    throw new AMQException("mechanism not specified in ConnectionStart method frame");
+                }
+                else
+                {
+                    mechanism = chooseMechanism(body.mechanisms);
                 }
-                protocolSession.setSaslClient(sc);
-                saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
-            }
-            catch (SaslException e)
-            {
-                protocolSession.setSaslClient(null);
-                throw new AMQException("Unable to create SASL client: " + e, e);
-            }
 
-            if (body.locales == null)
-            {
-                throw new AMQException("Locales is not defined in Connection Start method");
-            }
-            final String locales = new String(body.locales, "utf8");
-            final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
-            String selectedLocale = null;
-            if (tokenizer.hasMoreTokens())
-            {
-                selectedLocale = tokenizer.nextToken();
+                if (mechanism == null)
+                {
+                    throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+                }
+
+                byte[] saslResponse;
+                try
+                {
+                    SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
+                                                          null, "AMQP", "localhost",
+                                                          null,createCallbackHandler(mechanism, protocolSession));
+                    if (sc == null)
+                    {
+                        throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
+                                               mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
+                                               " details of how to register non-standard SASL client providers.");
+                    }
+                    protocolSession.setSaslClient(sc);
+                    saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
+                }
+                catch (SaslException e)
+                {
+                    protocolSession.setSaslClient(null);
+                    throw new AMQException("Unable to create SASL client: " + e, e);
+                }
+
+                if (body.locales == null)
+                {
+                    throw new AMQException("Locales is not defined in Connection Start method");
+                }
+                final String locales = new String(body.locales, "utf8");
+                final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+                String selectedLocale = null;
+                if (tokenizer.hasMoreTokens())
+                {
+                    selectedLocale = tokenizer.nextToken();
+                }
+                else
+                {
+                    throw new AMQException("No locales sent from server, passed: " + locales);
+                }
+
+                stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                FieldTable clientProperties = FieldTableFactory.newFieldTable();
+
+                clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
+                clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
+                clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
+                clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+                    protocolSession.getProtocolMajorVersion(),
+                    protocolSession.getProtocolMinorVersion(), 
+                    clientProperties,  // clientProperties
+                    new AMQShortString(selectedLocale),        // locale
+                    new AMQShortString(mechanism),     // mechanism
+                    saslResponse));    // response
             }
-            else
+            catch (UnsupportedEncodingException e)
             {
-                throw new AMQException("No locales sent from server, passed: " + locales);
+                throw new AMQException(_log, "Unable to decode data: " + e, e);
             }
+        }
+        else
+        {
+            _log.error("Broker requested Protocol ["
+                        + body.versionMajor
+                        + "-"
+                        + body.versionMinor
+                        + "] which is not supported by this version of the client library");
 
-            stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-            FieldTable clientProperties = FieldTableFactory.newFieldTable();
-            
-            clientProperties.setString(new 
AMQShortString(ClientProperties.instance.toString()), 
protocolSession.getClientID());
-            clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
-            clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
-            clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
-                (byte)8, (byte)0,      // AMQP version (major, minor)
-                clientProperties,      // clientProperties
-                new AMQShortString(selectedLocale),    // locale
-                new AMQShortString(mechanism), // mechanism
-                saslResponse));        // response
+            protocolSession.closeProtocolSession();
         }
-        catch (UnsupportedEncodingException e)
+    }
+
+    private boolean checkVersionOK(byte versionMajor, byte versionMinor)
+    {
+        byte[][] supportedVersions = ProtocolVersionList.pv;
+        boolean supported = false;
+        int i = supportedVersions.length;
+        while(i-- != 0 && !supported)
         {
-            throw new AMQException(_log, "Unable to decode data: " + e, e);
+            supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor)
+                        && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor);
         }
+
+        return supported;
     }
 
     private String getFullSystemInfo()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Jan 24 16:30:16 2007
@@ -95,21 +95,6 @@
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
-
-        // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
-        // We substitute the state manager when performing failover
-/*        _frameListeners.add(new AMQMethodListener()
-        {
-            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
-            {
-                return _stateManager.methodReceived(evt);
-            }
-
-            public void error(Exception e)
-            {
-                _stateManager.error(e);
-            }
-        });*/
     }
 
     public boolean isUseSSL()
@@ -152,7 +137,7 @@
 
     public void sessionOpened(IoSession session) throws Exception
     {
-        System.setProperty("foo", "bar");
+        //System.setProperty("foo", "bar");
     }
 
     /**
@@ -526,7 +511,8 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
-                                                                  (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                  _protocolSession.getProtocolMajorVersion(),
+                                                                  _protocolSession.getProtocolMinorVersion(),    // AMQP version (major, minor)
                                                                   0,    // classId
                                                                   0,    // methodId
                                                                   AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
@@ -621,5 +607,16 @@
     public void setFailoverState(FailoverState failoverState)
     {
         _failoverState = failoverState;
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return _protocolSession.getProtocolMajorVersion();
+    }
+
+
+    public byte getProtocolMinorVersion()
+    {
+        return _protocolSession.getProtocolMinorVersion();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Jan 24 16:30:16 2007
@@ -93,6 +93,12 @@
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
 
+    private byte _protocolMinorVersion;
+    private byte _protocolMajorVersion;
+
+
+
+
     /**
      * No-arg constructor for use by test subclass - has to initialise final vars
      * NOT intended for use other then for test
@@ -458,4 +464,22 @@
 
         session.confirmConsumerCancelled(consumerTag);
     }
+
+    public void setProtocolVersion(byte versionMajor, byte versionMinor)
+    {
+        _protocolMajorVersion = versionMajor;
+        _protocolMinorVersion = versionMinor;
+
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return _protocolMinorVersion;
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return _protocolMajorVersion;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=499628&r1=499627&r2=499628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Wed Jan 24 16:30:16 2007
@@ -158,4 +158,14 @@
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
+
+    public byte getProtocolMajorVersion()
+    {
+        return 8;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
 }


Reply via email to