Author: rhs
Date: Tue Apr 22 10:15:34 2008
New Revision: 650581

URL: http://svn.apache.org/viewvc?rev=650581&view=rev
Log:
QPID-832: moved 0-8 specific code into 0-8 subclass of session

Modified:
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Apr 22 10:15:34 2008
@@ -80,7 +80,6 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -539,20 +538,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple)
-    {
-
-        BasicAckBody body = 
getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
-
-        final AMQFrame ackFrame = body.generateFrame(_channelId);
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on 
channel " + _channelId);
-        }
-
-        getProtocolHandler().writeFrame(ackFrame);
-    }
+    public abstract void acknowledgeMessage(long deliveryTag, boolean 
multiple);
 
     public MethodRegistry getMethodRegistry()
     {
@@ -1043,12 +1029,7 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                    QueueDeclareBody body = 
getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
-
-                    AMQFrame queueDeclare = body.generateFrame(_channelId);
-
-                    getProtocolHandler().syncWrite(queueDeclare, 
QueueDeclareOkBody.class);
-
+                sendCreateQueue(name, autoDelete, durable, exclusive);
                 return null;
             }
         }, _connection).execute();
@@ -1425,33 +1406,7 @@
                 _dispatcher.rollback();
             }
 
-            if (isStrictAMQP())
-            {
-                // We can't use the BasicRecoverBody-OK method as it isn't 
part of the spec.
-
-                BasicRecoverBody body = 
getMethodRegistry().createBasicRecoverBody(false);
-                
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
-                _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");
-            }
-            else
-            {
-                // in Qpid the 0-8 spec was hacked to have a recover-ok 
method... this is bad
-                // in 0-9 we used the cleaner addition of a new sync recover 
method with its own ok
-                
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
-                {
-                    BasicRecoverBody body = 
getMethodRegistry().createBasicRecoverBody(false);
-                    
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), 
BasicRecoverOkBody.class);
-                }
-                else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
-                {
-                    BasicRecoverSyncBody body = 
((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
-                    
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), 
BasicRecoverSyncOkBody.class);
-                }
-                else
-                {
-                    throw new RuntimeException("Unsupported version of the 
AMQP Protocol: " + getProtocolVersion());
-                }
-            }
+            sendRecover();
 
             if (!isSuspended)
             {
@@ -1468,10 +1423,7 @@
         }
     }
 
-    private ProtocolVersion getProtocolVersion()
-    {
-        return getProtocolHandler().getProtocolVersion();
-    }
+    abstract void sendRecover() throws AMQException, FailoverException;
 
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
@@ -1495,21 +1447,7 @@
 
     }
 
-    public void rejectMessage(long deliveryTag, boolean requeue)
-    {
-        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == 
SESSION_TRANSACTED))
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Rejecting delivery tag:" + deliveryTag + 
":SessionHC:" + this.hashCode());
-            }
-
-            BasicRejectBody body = 
getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
-            AMQFrame frame = body.generateFrame(_channelId);
-
-            _connection.getProtocolHandler().writeFrame(frame);
-        }
-    }
+    public abstract void rejectMessage(long deliveryTag, boolean requeue);
 
     /**
      * Commits all messages done in this transaction and releases any locks 
currently held.
@@ -1541,9 +1479,7 @@
 
                 releaseForRollback();
 
-                TxRollbackBody body = 
getMethodRegistry().createTxRollbackBody();
-                AMQFrame frame = body.generateFrame(getChannelId());
-                getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
+                sendRollback();
 
                 markClean();
 
@@ -2127,48 +2063,13 @@
         // need to generate a consumer tag on the client so we can exploit the 
nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(tagId));
 
-        FieldTable arguments = FieldTableFactory.newFieldTable();
-        if ((messageSelector != null) && !messageSelector.equals(""))
-        {
-            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), 
messageSelector);
-        }
-
-        if (consumer.isAutoClose())
-        {
-            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
-        }
-
-        if (consumer.isNoConsume())
-        {
-            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
-        }
-
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start 
listening
         _consumers.put(tagId, consumer);
 
         try
         {
-            BasicConsumeBody body = 
getMethodRegistry().createBasicConsumeBody(getTicket(),
-                                                                               
queueName,
-                                                                               
tag,
-                                                                               
consumer.isNoLocal(),
-                                                                               
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                                                                               
consumer.isExclusive(),
-                                                                               
nowait,
-                                                                               
arguments);
-
-
-            AMQFrame jmsConsume = body.generateFrame(_channelId);
-
-            if (nowait)
-            {
-                protocolHandler.writeFrame(jmsConsume);
-            }
-            else
-            {
-                protocolHandler.syncWrite(jmsConsume, 
BasicConsumeOkBody.class);
-            }
+            sendConsume(consumer, queueName, protocolHandler, nowait, 
messageSelector, tag);
         }
         catch (AMQException e)
         {
@@ -2229,57 +2130,18 @@
     public long getQueueDepth(final AMQDestination amqd)
             throws AMQException
     {
-
-        class QueueDeclareOkHandler extends SpecificMethodFrameListener
-        {
-
-            private long _messageCount;
-            private long _consumerCount;
-
-            public QueueDeclareOkHandler()
-            {
-                super(getChannelId(), QueueDeclareOkBody.class);
-            }
-
-            public boolean processMethod(int channelId, AMQMethodBody frame) 
//throws AMQException
-            {
-                boolean matches = super.processMethod(channelId, frame);
-                if (matches)
-                {
-                    QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
-                    _messageCount = declareOk.getMessageCount();
-                    _consumerCount = declareOk.getConsumerCount();
-                }
-                return matches;
-            }
-
-        }
-
         return new FailoverNoopSupport<Long, AMQException>(
                 new FailoverProtectedOperation<Long, AMQException>()
                 {
                     public Long execute() throws AMQException, 
FailoverException
                     {
-
-                       AMQFrame queueDeclare =
-                               
getMethodRegistry().createQueueDeclareBody(getTicket(),
-                                                                               
                                   amqd.getAMQQueueName(),
-                                                                               
                                   true,
-                                                                               
                                   amqd.isDurable(),
-                                                                               
                                   amqd.isExclusive(),
-                                                                               
                                   amqd.isAutoDelete(),
-                                                                               
                                   false,
-                                                                               
                                   null).generateFrame(_channelId);
-                        QueueDeclareOkHandler okHandler = new 
QueueDeclareOkHandler();
-                        
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
-
-                        return okHandler._messageCount;
+                        return requestQueueDepth(amqd);
                     }
                 }, _connection).execute();
 
     }
 
-
+    abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, 
FailoverException;
 
     /**
      * Declares the named exchange and type of exchange.
@@ -2302,11 +2164,7 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                ExchangeDeclareBody body = 
getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
-                AMQFrame exchangeDeclare = body.generateFrame(_channelId);
-
-                protocolHandler.syncWrite(exchangeDeclare, 
ExchangeDeclareOkBody.class);
-
+                sendExchangeDeclare(name, type, protocolHandler, nowait);
                 return null;
             }
         }, _connection).execute();
@@ -2353,11 +2211,7 @@
                             
amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        QueueDeclareBody body = 
getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
-
-                        AMQFrame queueDeclare = body.generateFrame(_channelId);
-
-                        protocolHandler.syncWrite(queueDeclare, 
QueueDeclareOkBody.class);
+                        sendQueueDeclare(amqd, protocolHandler);
 
                         return amqd.getAMQQueueName();
                     }
@@ -2385,15 +2239,7 @@
             {
                 public Object execute() throws AMQException, FailoverException
                 {
-                        QueueDeleteBody body = 
getMethodRegistry().createQueueDeleteBody(getTicket(),
-                                                           queueName,
-                                                           false,
-                                                           false,
-                                                           true);
-                        AMQFrame queueDeleteFrame = 
body.generateFrame(_channelId);
-
-                    getProtocolHandler().syncWrite(queueDeleteFrame, 
QueueDeleteOkBody.class);
-
+                    sendQueueDelete(queueName);
                     return null;
                 }
             }, _connection).execute();
@@ -2682,12 +2528,7 @@
                 }
 
                 _suspended = suspend;
-
-                ChannelFlowBody body = 
getMethodRegistry().createChannelFlowBody(!suspend);
-
-                AMQFrame channelFlowFrame = body.generateFrame(_channelId);
-
-                _connection.getProtocolHandler().syncWrite(channelFlowFrame, 
ChannelFlowOkBody.class);
+                sendSuspendChannel(suspend);
             }
             catch (FailoverException e)
             {
@@ -2696,11 +2537,13 @@
         }
     }
 
+    public abstract void sendSuspendChannel(boolean suspend) throws 
AMQException, FailoverException;
+
     Object getMessageDeliveryLock()
     {
         return _messageDeliveryLock;
     }
-    
+
     /**
      * Indicates whether this session consumers pre-fetche messages
      *
@@ -2711,8 +2554,6 @@
         return getAMQConnection().getMaxPrefetch() > 0;
     }
 
-
-    public abstract void sendSuspendChannel(boolean suspend) throws 
AMQException, FailoverException;
 
     /** Signifies that the session has pending sends to commit. */
     public void markDirty()

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Apr 22 10:15:34 2008
@@ -746,4 +746,10 @@
 
         return subscriber;
     }
+
+    Long requestQueueDepth(AMQDestination amqd)
+    {
+        return 
getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
+    }
+
 }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=650581&r1=650580&r2=650581&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Tue Apr 22 10:15:34 2008
@@ -30,6 +30,7 @@
 import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -80,10 +81,16 @@
              defaultPrefetchLow);
     }
 
+    private ProtocolVersion getProtocolVersion()
+    {
+        return getProtocolHandler().getProtocolVersion();
+    }
+
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
-        final AMQFrame ackFrame =
-            
getProtocolHandler().getMethodRegistry().createBasicAckBody(deliveryTag, 
multiple).generateFrame(_channelId);
+        BasicAckBody body = 
getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
+
+        final AMQFrame ackFrame = body.generateFrame(_channelId);
 
         if (_logger.isDebugEnabled())
         {
@@ -121,7 +128,8 @@
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, 
final boolean durable, final boolean exclusive) throws AMQException,
             FailoverException
     {
-        AMQFrame queueDeclare = 
getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null).generateFrame(_channelId);
 
+        QueueDeclareBody body = 
getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+        AMQFrame queueDeclare = body.generateFrame(_channelId);
         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
@@ -133,7 +141,7 @@
         {
             // We can't use the BasicRecoverBody-OK method as it isn't part of 
the spec.
 
-            BasicRecoverBody body = 
getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
+            BasicRecoverBody body = 
getMethodRegistry().createBasicRecoverBody(false);
             
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
             _logger.warn("Session Recover cannot be guaranteed with 
STRICT_AMQP. Messages may arrive out of order.");
         }
@@ -143,17 +151,17 @@
             // in 0-9 we used the cleaner addition of a new sync recover 
method with its own ok
             
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
             {
-                BasicRecoverBody body = 
getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
+                BasicRecoverBody body = 
getMethodRegistry().createBasicRecoverBody(false);
                 
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), 
BasicRecoverOkBody.class);
             }
-            else 
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
+            else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
             {
-                BasicRecoverSyncBody body = 
((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false);
+                BasicRecoverSyncBody body = 
((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
                 
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), 
BasicRecoverSyncOkBody.class);
             }
             else
             {
-                throw new RuntimeException("Unsupported version of the AMQP 
Protocol: " + getProtocolHandler().getProtocolVersion());
+                throw new RuntimeException("Unsupported version of the AMQP 
Protocol: " + getProtocolVersion());
             }
         }
     }
@@ -183,12 +191,13 @@
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting delivery tag:" + deliveryTag);
+                _logger.debug("Rejecting delivery tag:" + deliveryTag + 
":SessionHC:" + this.hashCode());
             }
 
-            AMQFrame basicRejectBody = 
getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag, 
requeue).generateFrame(_channelId);
+            BasicRejectBody body = 
getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
+            AMQFrame frame = body.generateFrame(_channelId);
 
-            _connection.getProtocolHandler().writeFrame(basicRejectBody);
+            _connection.getProtocolHandler().writeFrame(frame);
         }
     }
 
@@ -229,7 +238,6 @@
     public void sendConsume(BasicMessageConsumer consumer, AMQShortString 
queueName, AMQProtocolHandler protocolHandler, boolean nowait,
             String messageSelector, AMQShortString tag) throws AMQException, 
FailoverException
     {
-
         FieldTable arguments = FieldTableFactory.newFieldTable();
         if ((messageSelector != null) && !messageSelector.equals(""))
         {
@@ -246,18 +254,17 @@
             arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
         }
 
-        consumer.setConsumerTag(tag);
-        // we must register the consumer in the map before we actually start 
listening
-        _consumers.put(tag.toIntValue(), consumer);
-        // TODO: Be aware of possible changes to parameter order as versions 
change.
-        AMQFrame jmsConsume = 
getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
-                queueName,
-                tag,
-                consumer.isNoLocal(),
-                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
-                consumer.isExclusive(),
-                nowait,
-                arguments).generateFrame(_channelId);
+        BasicConsumeBody body = 
getMethodRegistry().createBasicConsumeBody(getTicket(),
+                                                                           
queueName,
+                                                                           tag,
+                                                                           
consumer.isNoLocal(),
+                                                                           
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+                                                                           
consumer.isExclusive(),
+                                                                           
nowait,
+                                                                           
arguments);
+
+
+        AMQFrame jmsConsume = body.generateFrame(_channelId);
 
         if (nowait)
         {
@@ -272,26 +279,28 @@
     public void sendExchangeDeclare(final AMQShortString name, final 
AMQShortString type, final AMQProtocolHandler protocolHandler,
             final boolean nowait) throws AMQException, FailoverException
     {
-        AMQFrame exchangeDeclare = 
getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
-                                            generateFrame(_channelId);
+        ExchangeDeclareBody body = 
getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
+        AMQFrame exchangeDeclare = body.generateFrame(_channelId);
 
         protocolHandler.syncWrite(exchangeDeclare, 
ExchangeDeclareOkBody.class);
     }
 
     public void sendQueueDeclare(final AMQDestination amqd, final 
AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
     {
-        AMQFrame queueDeclare = 
getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
+        QueueDeclareBody body = 
getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+
+        AMQFrame queueDeclare = body.generateFrame(_channelId);
 
         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
     public void sendQueueDelete(final AMQShortString queueName) throws 
AMQException, FailoverException
     {
-        QueueDeleteBody body = 
getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(),
-                queueName,
-                false,
-                false,
-                true);
+        QueueDeleteBody body = 
getMethodRegistry().createQueueDeleteBody(getTicket(),
+                                                                         
queueName,
+                                                                         false,
+                                                                         false,
+                                                                         true);
         AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
 
         getProtocolHandler().syncWrite(queueDeleteFrame, 
QueueDeleteOkBody.class);
@@ -299,9 +308,9 @@
 
     public void sendSuspendChannel(boolean suspend) throws AMQException, 
FailoverException
     {
-        
_connection.getProtocolHandler().syncWrite(_connection.getProtocolHandler().getMethodRegistry().
-                                                   
createChannelFlowBody(!suspend).generateFrame(_channelId),
-                                                   ChannelFlowOkBody.class);
+        ChannelFlowBody body = 
getMethodRegistry().createChannelFlowBody(!suspend);
+        AMQFrame channelFlowFrame = body.generateFrame(_channelId);
+        _connection.getProtocolHandler().syncWrite(channelFlowFrame, 
ChannelFlowOkBody.class);
     }
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination 
destination, final int prefetchHigh,
@@ -326,8 +335,9 @@
 
     public void sendRollback() throws AMQException, FailoverException
     {
-        
_connection.getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createTxRollbackBody().generateFrame(_channelId),
 
-                                                    TxRollbackOkBody.class);
+        TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
+        AMQFrame frame = body.generateFrame(getChannelId());
+        getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
     }
 
      public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -338,68 +348,109 @@
     }
 
     public  TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
-       {
+    {
 
-           checkNotClosed();
-           AMQTopic origTopic = checkValidTopic(topic);
-           AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
-           TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
-           if (subscriber != null)
-           {
-               if (subscriber.getTopic().equals(topic))
-               {
-                   throw new IllegalStateException("Already subscribed to 
topic " + topic + " with subscription exchange "
-                                                   + name);
-               }
-               else
-               {
-                   unsubscribe(name);
-               }
-           }
-           else
-           {
-               AMQShortString topicName;
-               if (topic instanceof AMQTopic)
-               {
-                   topicName = ((AMQTopic) topic).getRoutingKey();
-               }
-               else
-               {
-                   topicName = new AMQShortString(topic.getTopicName());
-               }
-
-               if (_strictAMQP)
-               {
-                   if (_strictAMQPFATAL)
-                   {
-                       throw new UnsupportedOperationException("JMS Durable 
not currently supported by AMQP.");
-                   }
-                   else
-                   {
-                       _logger.warn("Unable to determine if subscription 
already exists for '" + topicName + "' "
-                                    + "for creation durableSubscriber. 
Requesting queue deletion regardless.");
-                   }
-
-                   deleteQueue(dest.getAMQQueueName());
-               }
-               else
-               {
-                   // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
-                   // says we must trash the subscription.
-                   if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName())
-                       && !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
-                   {
-                       deleteQueue(dest.getAMQQueueName());
-                   }
-               }
-           }
+        checkNotClosed();
+        AMQTopic origTopic = checkValidTopic(topic);
+        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
+        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+        if (subscriber != null)
+        {
+            if (subscriber.getTopic().equals(topic))
+            {
+                throw new IllegalStateException("Already subscribed to topic " 
+ topic + " with subscription exchange "
+                                                + name);
+            }
+            else
+            {
+                unsubscribe(name);
+            }
+        }
+        else
+        {
+            AMQShortString topicName;
+            if (topic instanceof AMQTopic)
+            {
+                topicName = ((AMQTopic) topic).getRoutingKey();
+            }
+            else
+            {
+                topicName = new AMQShortString(topic.getTopicName());
+            }
 
-           subscriber = new TopicSubscriberAdaptor(dest, 
(BasicMessageConsumer) createConsumer(dest));
+            if (_strictAMQP)
+            {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + topicName + "' "
+                                 + "for creation durableSubscriber. Requesting 
queue deletion regardless.");
+                }
 
-           _subscriptions.put(name, subscriber);
-           _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+                deleteQueue(dest.getAMQQueueName());
+            }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName())
+                    && !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
+        }
+
+        subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest));
+
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
+    }
+
+    class QueueDeclareOkHandler extends SpecificMethodFrameListener
+    {
+
+        private long _messageCount;
+        private long _consumerCount;
 
-           return subscriber;
-       }
+        public QueueDeclareOkHandler()
+        {
+            super(getChannelId(), QueueDeclareOkBody.class);
+        }
+
+        public boolean processMethod(int channelId, AMQMethodBody frame) 
//throws AMQException
+        {
+            boolean matches = super.processMethod(channelId, frame);
+            if (matches)
+            {
+                QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+                _messageCount = declareOk.getMessageCount();
+                _consumerCount = declareOk.getConsumerCount();
+            }
+            return matches;
+        }
+
+    }
+
+    Long requestQueueDepth(AMQDestination amqd) throws AMQException, 
FailoverException
+    {
+        AMQFrame queueDeclare =
+            getMethodRegistry().createQueueDeclareBody(getTicket(),
+                                                       amqd.getAMQQueueName(),
+                                                       true,
+                                                       amqd.isDurable(),
+                                                       amqd.isExclusive(),
+                                                       amqd.isAutoDelete(),
+                                                       false,
+                                                       
null).generateFrame(_channelId);
+        QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, 
okHandler);
+        return okHandler._messageCount;
+    }
 
 }


Reply via email to