Author: gsim
Date: Mon Jul  9 03:07:26 2007
New Revision: 554590

URL: http://svn.apache.org/viewvc?view=rev&rev=554590
Log:
refactoring: 

* separated the connection-level method handling from the semantic level 
(session/channel-level handling should also be separated)
* reduced coupling between Connection and Channel


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Jul  9 03:07:26 2007
@@ -193,6 +193,7 @@
   qpid/broker/BrokerMessageMessage.cpp \
   qpid/broker/BrokerQueue.cpp \
   qpid/broker/Connection.cpp \
+  qpid/broker/ConnectionAdapter.cpp \
   qpid/broker/ConnectionFactory.cpp \
   qpid/broker/Daemon.cpp \
   qpid/broker/DeliverableMessage.cpp \
@@ -289,6 +290,7 @@
   qpid/broker/BrokerMessageMessage.h \
   qpid/broker/BrokerSingleton.h \
   qpid/broker/Connection.h \
+  qpid/broker/ConnectionAdapter.h \
   qpid/broker/ConnectionFactory.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Content.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Mon Jul  9 03:07:26 2007
@@ -38,7 +38,6 @@
     connection(c),
     basicHandler(*this),
     channelHandler(*this),
-    connectionHandler(*this),
     exchangeHandler(*this),
     bindingHandler(*this),
     messageHandler(*this),
@@ -51,47 +50,6 @@
 ProtocolVersion BrokerAdapter::getVersion() const {
     return connection.getVersion();
 }
-
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
-    const MethodContext&, const FieldTable& /*clientProperties*/,
-    const string& /*mechanism*/, 
-    const string& /*response*/, const string& /*locale*/)
-{
-    client.tune(
-        CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
-}
-        
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(
-    const MethodContext&, const string& /*response*/){}
-        
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
-    const MethodContext&, uint16_t /*channelmax*/,
-    uint32_t framemax, uint16_t heartbeat)
-{
-    connection.setFrameMax(framemax);
-    connection.setHeartbeat(heartbeat);
-}
-        
-void BrokerAdapter::ConnectionHandlerImpl::open(
-    const MethodContext& context, const string& /*virtualHost*/,
-    const string& /*capabilities*/, bool /*insist*/)
-{
-    string knownhosts;
-    client.openOk(
-        knownhosts, context.getRequestId());
-}
-        
-void BrokerAdapter::ConnectionHandlerImpl::close(
-    const MethodContext& context, uint16_t /*replyCode*/, const string& 
/*replyText*/, 
-    uint16_t /*classId*/, uint16_t /*methodId*/)
-{
-    client.closeOk(context.getRequestId());
-    connection.getOutput().close();
-} 
-        
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
-    connection.getOutput().close();
-} 
               
 void BrokerAdapter::ChannelHandlerImpl::open(
     const MethodContext& context, const string& /*outOfBand*/){
@@ -208,7 +166,7 @@
                                               bool autoDelete, bool nowait, 
const qpid::framing::FieldTable& arguments){
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-       queue = connection.getQueue(name, channel.getId());
+       queue = getQueue(name);
     } else {
        std::pair<Queue::shared_ptr, bool> queue_created =  
             broker.getQueues().declare(
@@ -249,7 +207,7 @@
                                            const string& exchangeName, const 
string& routingKey, bool nowait, 
                                            const FieldTable& arguments){
 
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+    Queue::shared_ptr queue = getQueue(queueName);
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
     if(exchange){
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? 
queue->getName() : routingKey;
@@ -275,7 +233,7 @@
     const string& routingKey,
     const qpid::framing::FieldTable& arguments )
 {
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+    Queue::shared_ptr queue = getQueue(queueName);
     if (!queue.get()) throw ChannelException(404, "Unbind failed. No such 
exchange: " + exchangeName);
 
     Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -290,7 +248,7 @@
         
 void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, 
uint16_t /*ticket*/, const string& queueName, bool nowait){
 
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+    Queue::shared_ptr queue = getQueue(queueName);
     int count = queue->purge();
     if(!nowait) client.purgeOk( count, context.getRequestId());
 } 
@@ -299,7 +257,7 @@
                                               bool ifUnused, bool ifEmpty, 
bool nowait){
     ChannelException error(0, "");
     int count(0);
-    Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
+    Queue::shared_ptr q = getQueue(queue);
     if(ifEmpty && q->getMessageCount() > 0){
         throw ChannelException(406, "Queue not empty.");
     }else if(ifUnused && q->getConsumerCount() > 0){
@@ -337,7 +295,7 @@
     bool nowait, const FieldTable& fields)
 {
     
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); 
   
+    Queue::shared_ptr queue = getQueue(queueName);    
     if(!consumerTag.empty() && channel.exists(consumerTag)){
         throw ConnectionException(530, "Consumer tags must be unique");
     }
@@ -377,8 +335,8 @@
 } 
         
 void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, 
uint16_t /*ticket*/, const string& queueName, bool noAck){
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); 
   
-    if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
+    Queue::shared_ptr queue = getQueue(queueName);    
+    if(!channel.get(queue, "", !noAck)){
         string clusterId;//not used, part of an imatix hack
 
         client.getEmpty(clusterId, context.getRequestId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Mon Jul  9 03:07:26 2007
@@ -60,7 +60,6 @@
 
     framing::ProtocolVersion getVersion() const;
     ChannelHandler* getChannelHandler() { return &channelHandler; }
-    ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
     BasicHandler* getBasicHandler() { return &basicHandler; }
     ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
     BindingHandler* getBindingHandler() { return &bindingHandler; }
@@ -81,35 +80,14 @@
     DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
     DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
 
+    ConnectionHandler* getConnectionHandler() { 
+        throw ConnectionException(503, "Can't access connection class on 
non-zero channel!");        
+    }
+
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
 
   private:
 
-    class ConnectionHandlerImpl :
-        public ConnectionHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Connection>
-    {
-      public:
-        ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) 
{}
-
-        void startOk(const framing::MethodContext& context,
-                     const qpid::framing::FieldTable& clientProperties,
-                     const std::string& mechanism, const std::string& response,
-                     const std::string& locale); 
-        void secureOk(const framing::MethodContext& context,
-                      const std::string& response); 
-        void tuneOk(const framing::MethodContext& context,
-                    uint16_t channelMax,
-                    uint32_t frameMax, uint16_t heartbeat); 
-        void open(const framing::MethodContext& context,
-                  const std::string& virtualHost,
-                  const std::string& capabilities, bool insist); 
-        void close(const framing::MethodContext& context, uint16_t replyCode,
-                   const std::string& replyText,
-                   uint16_t classId, uint16_t methodId); 
-        void closeOk(const framing::MethodContext& context); 
-    };
-
     class ChannelHandlerImpl :
         public ChannelHandler,
         public HandlerImpl<framing::AMQP_ClientProxy::Channel>
@@ -231,7 +209,6 @@
     Connection& connection;
     BasicHandlerImpl basicHandler;
     ChannelHandlerImpl channelHandler;
-    ConnectionHandlerImpl connectionHandler;
     ExchangeHandlerImpl exchangeHandler;
     BindingHandlerImpl bindingHandler;
     MessageHandlerImpl messageHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Mon Jul  9 03:07:26 2007
@@ -50,22 +50,17 @@
 using namespace qpid::sys;
 
 
-Channel::Channel(
-    Connection& con, ChannelId id,
-    uint32_t _framesize, MessageStore* const _store,
-    uint64_t _stagingThreshold
-) :
+Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
     ChannelAdapter(),
     connection(con),
     currentDeliveryTag(1),
     prefetchSize(0),
     prefetchCount(0),
-    framesize(_framesize),
     tagGenerator("sgen"),
     dtxSelected(false),
     accumulatedAck(0),
     store(_store),
-    messageBuilder(this, _store, _stagingThreshold),
+    messageBuilder(this, _store, connection.getStagingThreshold()),
     opened(id == 0),//channel 0 is automatically open, other must be 
explicitly opened
     flowActive(true),
     adapter(new BrokerAdapter(*this, con, con.broker))
@@ -215,7 +210,7 @@
         outstanding.count++;
     }
     //send deliver method, header and content(s)
-    msg->deliver(*this, consumerTag, deliveryTag, framesize);
+    msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -378,7 +373,7 @@
         msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
                                   destination,
                        queue->getMessageCount() + 1, myDeliveryTag,
-                       framesize);
+                       connection.getFrameMax());
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -391,7 +386,7 @@
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
                       uint64_t deliveryTag)
 {
-    msg->deliver(*this, consumerTag, deliveryTag, framesize);
+    msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
 }
 
 void Channel::handleMethodInContext(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Mon Jul  9 03:07:26 2007
@@ -86,7 +86,6 @@
     uint32_t prefetchSize;    
     uint16_t prefetchCount;    
     Prefetch outstanding;
-    uint32_t framesize;
     NameGenerator tagGenerator;
     std::list<DeliveryRecord> unacked;
     sys::Mutex deliveryLock;
@@ -110,12 +109,7 @@
     void checkDtxTimeout();
         
   public:
-    Channel(Connection& parent,
-            framing::ChannelId id,
-            uint32_t framesize, 
-            MessageStore* const _store = 0,
-            uint64_t stagingThreshold = 0);
-    
+    Channel(Connection& parent, framing::ChannelId id, MessageStore* const 
store = 0);    
     ~Channel();
 
     bool isOpen() const { return opened; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Mon Jul  9 03:07:26 2007
@@ -41,51 +41,34 @@
     framemax(65536), 
     heartbeat(0),
     client(0),
-    stagingThreshold(broker.getStagingThreshold())
+    stagingThreshold(broker.getStagingThreshold()),
+    adapter(*this)
 {}
 
 
-Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){
-    Queue::shared_ptr queue;
-    if (name.empty()) {
-        queue = getChannel(channel).getDefaultQueue();
-        if (!queue) throw ConnectionException( 530, "Queue must be specified 
or previously declared" );
-    } else {
-        queue = broker.getQueues().find(name);
-        if (queue == 0) {
-            throw ChannelException( 404, "Queue not found: " + name);
-        }
-    }
-    return queue;
-}
-
-
 Exchange::shared_ptr Connection::findExchange(const string& name){
     return broker.getExchanges().get(name);
 }
 
 
 void Connection::received(framing::AMQFrame& frame){
-    getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+    if (frame.getChannel() == 0) {
+        adapter.handle(frame);
+    } else {
+        getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+    }
 }
 
 void Connection::close(
     ReplyCode code, const string& text, ClassId classId, MethodId methodId)
 {
-    client->close(code, text, classId, methodId);
+    adapter.close(code, text, classId, methodId);
     getOutput().close();
 }
 
 void Connection::initiated(const framing::ProtocolInitiation& header) {
     version = ProtocolVersion(header.getMajor(), header.getMinor());
-    FieldTable properties;
-    string mechanisms("PLAIN");
-    string locales("en_US");
-    getChannel(0).init(0, *out, getVersion());
-    client = &getChannel(0).getAdapter().getProxy().getConnection();
-    client->start(
-        header.getMajor(), header.getMinor(),
-        properties, mechanisms, locales);
+    adapter.init(header);
 }
 
 void Connection::idleOut(){}
@@ -117,10 +100,7 @@
 Channel& Connection::getChannel(ChannelId id) {
     ChannelMap::iterator i = channels.find(id);
     if (i == channels.end()) {
-        i = channels.insert(
-            id, new Channel(
-                *this, id, framemax, broker.getQueues().getStore(),
-                broker.getStagingThreshold())).first;
+        i = channels.insert(id, new Channel(*this, id, 
&broker.getStore())).first;
     }        
     return *i;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon Jul  9 03:07:26 2007
@@ -29,6 +29,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/ChannelAdapter.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
@@ -36,6 +37,7 @@
 #include "Broker.h"
 #include "qpid/Exception.h"
 #include "BrokerChannel.h"
+#include "ConnectionAdapter.h"
 
 namespace qpid {
 namespace broker {
@@ -66,14 +68,8 @@
 
     void setFrameMax(uint32_t fm) { framemax = fm; }
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+    void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
     
-    /**
-     * Get named queue, never returns 0.
-     * @return: named queue or default queue for channel if name=""
-     * @exception: ChannelException if no queue of that name is found.
-     * @exception: ConnectionException if name="" and channel has no default.
-     */
-    Queue::shared_ptr getQueue(const string& name, uint16_t channel);
 
     Broker& broker;
     std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -97,7 +93,8 @@
     uint32_t framemax;
     uint16_t heartbeat;
     framing::AMQP_ClientProxy::Connection* client;
-    const uint64_t stagingThreshold;
+    uint64_t stagingThreshold;
+    ConnectionAdapter adapter;
 
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp?view=auto&rev=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp Mon Jul  9 03:07:26 2007
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionAdapter.h"
+#include "Connection.h"
+#include "qpid/framing/MethodContext.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using qpid::framing::ReplyCode;
+using qpid::framing::ClassId;
+using qpid::framing::MethodId;
+using qpid::framing::MethodContext;
+using qpid::framing::FieldTable;
+
+void ConnectionAdapter::init(const framing::ProtocolInitiation& header) {
+    ChannelAdapter::init(0, handler->connection.getOutput(), 
handler->connection.getVersion());
+    FieldTable properties;
+    string mechanisms("PLAIN");
+    string locales("en_US");
+    handler->client.start(header.getMajor(), header.getMinor(), properties, 
mechanisms, locales);
+}
+
+void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId 
classId, MethodId methodId)
+{
+    handler->client.close(code, text, classId, methodId);
+}
+
+void ConnectionAdapter::handleMethodInContext(
+    boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+    const MethodContext& context
+)
+{
+    try{
+        method->invoke(*this, context);
+    }catch(ConnectionException& e){
+        handler->client.close(e.code, e.toString(), method->amqpClassId(), 
method->amqpMethodId());
+    }catch(std::exception& e){
+        handler->client.close(541/*internal error*/, e.what(), 
method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+framing::AMQP_ServerOperations::ConnectionHandler* 
ConnectionAdapter::getConnectionHandler() 
+{ 
+    return handler.get(); 
+}
+
+framing::ProtocolVersion ConnectionAdapter::getVersion() const 
+{ 
+    return handler->connection.getVersion(); 
+}
+
+void ConnectionAdapter::handle(framing::AMQFrame& frame)
+{
+    getHandlers().in->handle(frame);
+}
+
+ConnectionAdapter::ConnectionAdapter(Connection& connection) 
+{
+    handler = std::auto_ptr<Handler>(new Handler(connection, *this));
+}
+
+Handler::Handler(Connection& c, ConnectionAdapter& a) : 
+    proxy(a), client(proxy.getConnection()), connection(c) {}
+
+
+void Handler::startOk(
+    const MethodContext&, const FieldTable& /*clientProperties*/,
+    const string& /*mechanism*/, 
+    const string& /*response*/, const string& /*locale*/)
+{
+    client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 
connection.getHeartbeat());
+}
+        
+void Handler::secureOk(
+    const MethodContext&, const string& /*response*/){}
+        
+void Handler::tuneOk(
+    const MethodContext&, uint16_t /*channelmax*/,
+    uint32_t framemax, uint16_t heartbeat)
+{
+    connection.setFrameMax(framemax);
+    connection.setHeartbeat(heartbeat);
+}
+        
+void Handler::open(
+    const MethodContext& context, const string& /*virtualHost*/,
+    const string& /*capabilities*/, bool /*insist*/)
+{
+    string knownhosts;
+    client.openOk(
+        knownhosts, context.getRequestId());
+}
+
+        
+void Handler::close(
+    const MethodContext& context, uint16_t /*replyCode*/, const string& 
/*replyText*/, 
+    uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+    client.closeOk(context.getRequestId());
+    connection.getOutput().close();
+} 
+        
+void Handler::closeOk(const MethodContext&){
+    connection.getOutput().close();
+} 

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h?view=auto&rev=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h Mon Jul  9 03:07:26 2007
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionAdapter_
+#define _ConnectionAdapter_
+
+#include <memory>
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+struct Handler;
+
+class ConnectionAdapter : public framing::ChannelAdapter, public 
framing::AMQP_ServerOperations
+{
+    std::auto_ptr<Handler> handler;
+public:
+    ConnectionAdapter(Connection& connection);
+    void init(const framing::ProtocolInitiation& header);
+    void close(framing::ReplyCode code, const std::string& text, 
framing::ClassId classId, framing::MethodId methodId);
+    void handle(framing::AMQFrame& frame);
+
+    //ChannelAdapter virtual methods:
+    void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> 
method, 
+                               const qpid::framing::MethodContext& context);
+    bool isOpen() const { return true; } //channel 0 is always open
+    //never needed:
+    void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
+    void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {}
+    void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {}
+
+    //AMQP_ServerOperations:
+    ConnectionHandler* getConnectionHandler();
+    ChannelHandler* getChannelHandler() { throw ConnectionException(503, 
"Class can't be accessed over channel 0"); }
+    BasicHandler* getBasicHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    ExchangeHandler* getExchangeHandler() { throw ConnectionException(503, 
"Class can't be accessed over channel 0"); }
+    BindingHandler* getBindingHandler() { throw ConnectionException(503, 
"Class can't be accessed over channel 0"); }
+    QueueHandler* getQueueHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    TxHandler* getTxHandler() { throw ConnectionException(503, "Class can't be 
accessed over channel 0"); }
+    MessageHandler* getMessageHandler() { throw ConnectionException(503, 
"Class can't be accessed over channel 0"); }
+    AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    FileHandler* getFileHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't 
be accessed over channel 0"); }
+    TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class 
can't be accessed over channel 0"); }
+    DtxCoordinationHandler* getDtxCoordinationHandler() { throw 
ConnectionException(503, "Class can't be accessed over channel 0"); }
+    DtxDemarcationHandler* getDtxDemarcationHandler() { throw 
ConnectionException(503, "Class can't be accessed over channel 0"); }
+    framing::ProtocolVersion getVersion() const;
+};
+
+struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+{
+    framing::AMQP_ClientProxy proxy;
+    framing::AMQP_ClientProxy::Connection client;
+    Connection& connection;
+    
+    Handler(Connection& connection, ConnectionAdapter& adapter);
+    void startOk(const framing::MethodContext& context,
+                 const qpid::framing::FieldTable& clientProperties,
+                 const std::string& mechanism, const std::string& response,
+                 const std::string& locale); 
+    void secureOk(const framing::MethodContext& context,
+                  const std::string& response); 
+    void tuneOk(const framing::MethodContext& context,
+                uint16_t channelMax,
+                uint32_t frameMax, uint16_t heartbeat); 
+    void open(const framing::MethodContext& context,
+              const std::string& virtualHost,
+                  const std::string& capabilities, bool insist); 
+    void close(const framing::MethodContext& context, uint16_t replyCode,
+               const std::string& replyText,
+               uint16_t classId, uint16_t methodId); 
+    void closeOk(const framing::MethodContext& context); 
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionAdapter.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Mon Jul  9 03:07:26 2007
@@ -19,6 +19,7 @@
  *
  */
 
+#include "Broker.h"
 #include "BrokerChannel.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 
@@ -30,8 +31,7 @@
 
 namespace broker {
 
-class Broker;
-class Channel;
+    //class Channel;
 class Connection;
 
 /**
@@ -47,6 +47,28 @@
     Connection& connection;
     Broker& broker;
     framing::AMQP_ClientProxy proxy;
+
+    /**
+     * Get named queue, never returns 0.
+     * @return: named queue or default queue for channel if name=""
+     * @exception: ChannelException if no queue of that name is found.
+     * @exception: ConnectionException if name="" and channel has no default.
+     */
+    Queue::shared_ptr getQueue(const string& name) {
+        //Note: this can be removed soon as the default queue for channels is 
scrapped in 0-10
+        Queue::shared_ptr queue;
+        if (name.empty()) {
+            queue = channel.getDefaultQueue();
+            if (!queue) throw ConnectionException( 530, "Queue must be 
specified or previously declared" );
+        } else {
+            queue = broker.getQueues().find(name);
+            if (queue == 0) {
+                throw ChannelException( 404, "Queue not found: " + name);
+            }
+        }
+        return queue;
+    }
+
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Mon Jul  9 03:07:26 2007
@@ -123,7 +123,7 @@
                             bool exclusive,
                             const framing::FieldTable& filter )
 {
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+    Queue::shared_ptr queue = getQueue(queueName);
     if(!destination.empty() && channel.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
     string tag = destination;
@@ -142,8 +142,7 @@
                          const string& destination,
                          bool noAck )
 {
-    Queue::shared_ptr queue =
-        connection.getQueue(queueName, context.channel->getId());
+    Queue::shared_ptr queue = getQueue(queueName);
     
     if(channel.get(queue, destination, !noAck))
         client.ok(context.getRequestId());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?view=diff&rev=554590&r1=554589&r2=554590
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Mon Jul  9 03:07:26 2007
@@ -154,7 +154,7 @@
 
     void testConsumerMgmt(){
         Queue::shared_ptr queue(new Queue("my_queue"));
-        Channel channel(connection, 0, 0, 0);
+        Channel channel(connection, 0, 0);
         channel.open();
         CPPUNIT_ASSERT(!channel.exists("my_consumer"));
 
@@ -179,7 +179,7 @@
     }
 
     void testDeliveryNoAck(){
-        Channel channel(connection, 7, 10000);
+        Channel channel(connection, 7);
         channel.open();
         const string data("abcdefghijklmn");
         Message::shared_ptr msg(
@@ -209,7 +209,7 @@
     }
 
     void testDeliveryAndRecovery(){
-        Channel channel(connection, 7, 10000);
+        Channel channel(connection, 7);
         channel.open();
         const string data("abcdefghijklmn");
 
@@ -241,8 +241,9 @@
 
     void testStaging(){
         MockMessageStore store;
-        Channel channel(
-            connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+        connection.setFrameMax(1000);
+        connection.setStagingThreshold(10);
+        Channel channel(connection, 1, &store);
         const string data[] = {"abcde", "fghij", "klmno"};
         
         Message* msg = new BasicMessage(
@@ -335,7 +336,7 @@
     }
 
     void testFlow(){
-        Channel channel(connection, 7, 10000);
+        Channel channel(connection, 7);
         channel.open();
         //there will always be a connection-start frame
         CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());


Reply via email to