Author: aconway
Date: Mon Jan 15 13:56:23 2007
New Revision: 496511

URL: http://svn.apache.org/viewvc?view=rev&rev=496511
Log:

* Client & broker using Requester/Responder to manage request/response IDs.

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
    incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Mon Jan 15 
13:56:23 2007
@@ -23,6 +23,7 @@
 
 #include "AMQFrame.h"
 #include "DirectExchange.h"
+#include "TopicExchange.h"
 #include "FanOutExchange.h"
 #include "HeadersExchange.h"
 #include "MessageStoreModule.h"
@@ -102,3 +103,4 @@
 
 
 }} // namespace qpid::broker
+

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Mon Jan 15 
13:56:23 2007
@@ -29,6 +29,15 @@
 #include <SharedObject.h>
 #include <MessageStore.h>
 #include <AutoDelete.h>
+#include "Requester.h"
+#include "Responder.h"
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
 
 namespace qpid {
 namespace broker {
@@ -77,6 +86,8 @@
     u_int32_t getTimeout() { return timeout; }
     u_int64_t getStagingThreshold() { return stagingThreshold; }
     AutoDelete& getCleaner() { return cleaner; }
+    qpid::framing::Requester& getRequester() { return requester; }
+    qpid::framing::Responder& getResponder() { return responder; }
     
   private:
     Broker(const Configuration& config); 
@@ -89,6 +100,8 @@
     u_int64_t stagingThreshold;
     AutoDelete cleaner;
     SessionHandlerFactoryImpl factory;
+    qpid::framing::Requester requester;
+    qpid::framing::Responder responder;
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp Mon 
Jan 15 13:56:23 2007
@@ -19,11 +19,15 @@
  *
  */
 #include <iostream>
-#include <SessionHandlerImpl.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <TopicExchange.h>
-#include "assert.h"
+#include <assert.h>
+
+#include "SessionHandlerImpl.h" 
+
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+
+#include "Requester.h"
+#include "Responder.h"
 
 using namespace boost;
 using namespace qpid::sys;
@@ -42,6 +46,8 @@
     exchanges(broker.getExchanges()),
     cleaner(broker.getCleaner()),
     settings(broker.getTimeout(), broker.getStagingThreshold()),
+    requester(broker.getRequester()),
+    responder(broker.getResponder()),
     basicHandler(new BasicHandlerImpl(this)),
     channelHandler(new ChannelHandlerImpl(this)),
     connectionHandler(new ConnectionHandlerImpl(this)),
@@ -55,7 +61,7 @@
 
 SessionHandlerImpl::~SessionHandlerImpl(){
 
-   if (client != NULL)
+    if (client != NULL)
        delete client;
 
 }
@@ -87,51 +93,87 @@
     return exchanges.get(name);
 }
 
+void SessionHandlerImpl::handleMethod(
+    u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
+{
+    AMQMethodBody::shared_ptr method =
+        shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
+    try{
+        method->invoke(*this, channel);
+    }catch(ChannelException& e){
+        channels[channel]->close();
+        channels.erase(channel);
+        client->getChannel().close(
+            channel, e.code, e.text,
+            method->amqpClassId(), method->amqpMethodId());
+    }catch(ConnectionException& e){
+        client->getConnection().close(
+            0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+    }catch(std::exception& e){
+        client->getConnection().close(
+            0, 541/*internal error*/, e.what(),
+            method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
 void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
     u_int16_t channel = frame->getChannel();
     AMQBody::shared_ptr body = frame->getBody();
-    AMQMethodBody::shared_ptr method;
-
     switch(body->type())
     {
       case REQUEST_BODY:
-        //        responder.received(frame);
+        responder.received(AMQRequestBody::getData(body));
+        handleMethod(channel, body);
+        break;
       case RESPONSE_BODY:
-        // requester.received(frame);
-      case METHOD_BODY:         // 
-        method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
-        try{
-            method->invoke(*this, channel);
-        }catch(ChannelException& e){
-            channels[channel]->close();
-            channels.erase(channel);
-            client->getChannel().close(channel, e.code, e.text, 
method->amqpClassId(), method->amqpMethodId());
-        }catch(ConnectionException& e){
-            client->getConnection().close(0, e.code, e.text, 
method->amqpClassId(), method->amqpMethodId());
-        }catch(std::exception& e){
-            string error(e.what());
-            client->getConnection().close(0, 541/*internal error*/, error, 
method->amqpClassId(), method->amqpMethodId());
-        }
+        // Must process responses before marking them received.
+        handleMethod(channel, body);     
+        requester.processed(AMQResponseBody::getData(body));
+        break;
+        // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
+      case METHOD_BODY:    
+        handleMethod(channel, body);
+        break;
+      case HEADER_BODY:
+       handleHeader(
+            channel, shared_polymorphic_cast<AMQHeaderBody>(body));
        break;
 
-    case HEADER_BODY:
-       this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(body));
+      case CONTENT_BODY:
+       handleContent(
+            channel, shared_polymorphic_cast<AMQContentBody>(body));
        break;
 
-    case CONTENT_BODY:
-       this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, 
AMQBody>(body));
-       break;
-
-    case HEARTBEAT_BODY:
-        //channel must be 0
-       this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, 
AMQBody>(body));
+      case HEARTBEAT_BODY:
+        assert(channel == 0);
+       handleHeartbeat(
+            shared_polymorphic_cast<AMQHeartbeatBody>(body));
        break;
     }
 }
 
+/**
+ * An OutputHandler that does request/response processing before
+ * delegating to another OutputHandler.
+ */
+SessionHandlerImpl::Sender::Sender(
+    OutputHandler& oh, Requester& req, Responder& resp)
+    : out(oh), requester(req), responder(resp)
+{}
+
+void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
+    AMQBody::shared_ptr body =  frame->getBody();
+    u_int16_t type = body->type();
+    if (type == REQUEST_BODY)
+        requester.sending(AMQRequestBody::getData(body));
+    else if (type == RESPONSE_BODY)
+        responder.sending(AMQResponseBody::getData(body));
+    out.send(frame);
+}
+
 void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
 
-  if (client == NULL)
+    if (client == 0)
     {
        client = new qpid::framing::AMQP_ClientProxy(context, 
header->getMajor(), header->getMinor());
 
@@ -280,7 +322,7 @@
     const string& /*routingKey*/,
     const qpid::framing::FieldTable& /*arguments*/ )
 {
-        assert(0);            // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);            // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 
@@ -335,9 +377,9 @@
     Queue::shared_ptr queue = parent->getQueue(queueName, channel);
     Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
     if(exchange){
-// kpvdr - cannot use this any longer as routingKey is now const
-//        if(routingKey.empty() && queueName.empty()) routingKey = 
queue->getName();
-//        exchange->bind(queue, routingKey, &arguments);
+        // kpvdr - cannot use this any longer as routingKey is now const
+        //        if(routingKey.empty() && queueName.empty()) routingKey = 
queue->getName();
+        //        exchange->bind(queue, routingKey, &arguments);
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? 
queue->getName() : routingKey;
         exchange->bind(queue, exchangeRoutingKey, &arguments);
         if(!nowait) parent->client->getQueue().bindOk(channel);    
@@ -483,25 +525,25 @@
     const string& /*routingKey*/,
     const qpid::framing::FieldTable& /*arguments*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
 SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
 SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
 SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 void
@@ -509,148 +551,149 @@
     u_int16_t /*channel*/,
     const string& /*channelId*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+    assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 // Message class method handlers
 void
 SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
-                    const string& /*reference*/,
-                    const string& /*bytes*/ )
+                                                const string& /*reference*/,
+                                                const string& /*bytes*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 
 void
 SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
-                    const string& /*destination*/ )
+                                                const string& /*destination*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
-                    const string& /*reference*/,
-                    const string& /*identifier*/ )
+                                                    const string& 
/*reference*/,
+                                                    const string& 
/*identifier*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/,
-                    const string& /*reference*/ )
+                                               const string& /*reference*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
-                    u_int16_t /*ticket*/,
-                    const string& /*queue*/,
-                    const string& /*destination*/,
-                    bool /*noLocal*/,
-                    bool /*noAck*/,
-                    bool /*exclusive*/,
-                    const qpid::framing::FieldTable& /*filter*/ )
+                                                 u_int16_t /*ticket*/,
+                                                 const string& /*queue*/,
+                                                 const string& /*destination*/,
+                                                 bool /*noLocal*/,
+                                                 bool /*noAck*/,
+                                                 bool /*exclusive*/,
+                                                 const 
qpid::framing::FieldTable& /*filter*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
-                    u_int16_t /*ticket*/,
-                    const string& /*queue*/,
-                    const string& /*destination*/,
-                    bool /*noAck*/ )
+                                             u_int16_t /*ticket*/,
+                                             const string& /*queue*/,
+                                             const string& /*destination*/,
+                                             bool /*noAck*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/,
-                    u_int64_t /*value*/ )
+                                                u_int64_t /*value*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/,
-                    const string& /*reference*/ )
+                                              const string& /*reference*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
-                    u_int32_t /*prefetchSize*/,
-                    u_int16_t /*prefetchCount*/,
-                    bool /*global*/ )
+                                             u_int32_t /*prefetchSize*/,
+                                             u_int16_t /*prefetchCount*/,
+                                             bool /*global*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/,
-                    bool /*requeue*/ )
+                                                 bool /*requeue*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
-                    u_int16_t /*code*/,
-                    const string& /*text*/ )
+                                                u_int16_t /*code*/,
+                                                const string& /*text*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
-                    const string& /*reference*/,
-                    const string& /*identifier*/ )
+                                                const string& /*reference*/,
+                                                const string& /*identifier*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
 
 void
 SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
-                    u_int16_t /*ticket*/,
-                    const string& /*destination*/,
-                    bool /*redelivered*/,
-                    bool /*immediate*/,
-                    u_int64_t /*ttl*/,
-                    u_int8_t /*priority*/,
-                    u_int64_t /*timestamp*/,
-                    u_int8_t /*deliveryMode*/,
-                    u_int64_t /*expiration*/,
-                    const string& /*exchange*/,
-                    const string& /*routingKey*/,
-                    const string& /*messageId*/,
-                    const string& /*correlationId*/,
-                    const string& /*replyTo*/,
-                    const string& /*contentType*/,
-                    const string& /*contentEncoding*/,
-                    const string& /*userId*/,
-                    const string& /*appId*/,
-                    const string& /*transactionId*/,
-                    const string& /*securityToken*/,
-                    const qpid::framing::FieldTable& /*applicationHeaders*/,
-                    qpid::framing::Content /*body*/ )
+                                                  u_int16_t /*ticket*/,
+                                                  const string& 
/*destination*/,
+                                                  bool /*redelivered*/,
+                                                  bool /*immediate*/,
+                                                  u_int64_t /*ttl*/,
+                                                  u_int8_t /*priority*/,
+                                                  u_int64_t /*timestamp*/,
+                                                  u_int8_t /*deliveryMode*/,
+                                                  u_int64_t /*expiration*/,
+                                                  const string& /*exchange*/,
+                                                  const string& /*routingKey*/,
+                                                  const string& /*messageId*/,
+                                                  const string& 
/*correlationId*/,
+                                                  const string& /*replyTo*/,
+                                                  const string& 
/*contentType*/,
+                                                  const string& 
/*contentEncoding*/,
+                                                  const string& /*userId*/,
+                                                  const string& /*appId*/,
+                                                  const string& 
/*transactionId*/,
+                                                  const string& 
/*securityToken*/,
+                                                  const 
qpid::framing::FieldTable& /*applicationHeaders*/,
+                                                  qpid::framing::Content 
/*body*/ )
 {
-        assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
  
 }}
+

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h Mon 
Jan 15 13:56:23 2007
@@ -24,28 +24,20 @@
 #include <map>
 #include <sstream>
 #include <vector>
-#include <exception>
+
 #include <AMQFrame.h>
 #include <AMQP_ClientProxy.h>
 #include <AMQP_ServerOperations.h>
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <BrokerChannel.h>
-#include <ConnectionToken.h>
-#include <DirectExchange.h>
-#include <OutputHandler.h>
-#include <ProtocolInitiation.h>
-#include <QueueRegistry.h>
 #include <sys/SessionContext.h>
 #include <sys/SessionHandler.h>
 #include <sys/TimeoutHandler.h>
-#include <TopicExchange.h>
 #include "Broker.h"
+#include "Exception.h"
 
 namespace qpid {
 namespace broker {
 
-struct ChannelException : public std::exception {
+struct ChannelException : public qpid::Exception {
     u_int16_t code;
     string text;
     ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) 
{}
@@ -53,7 +45,7 @@
     const char* what() const throw() { return text.c_str(); }
 };
 
-struct ConnectionException : public std::exception {
+struct ConnectionException : public qpid::Exception {
     u_int16_t code;
     string text;
     ConnectionException(u_int16_t _code, string _text) : code(_code), 
text(_text) {}
@@ -75,13 +67,25 @@
 {
     typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
+    class Sender : public qpid::framing::OutputHandler {
+      public:
+        Sender(qpid::framing::OutputHandler&,
+               qpid::framing::Requester&, qpid::framing::Responder&);
+        void send(qpid::framing::AMQFrame* frame);
+      private:
+        OutputHandler& out;
+        qpid::framing::Requester& requester;
+        qpid::framing::Responder& responder;
+    };
+    
     qpid::sys::SessionContext* context;
     qpid::framing::AMQP_ClientProxy* client;
     QueueRegistry& queues;
     ExchangeRegistry& exchanges;
     AutoDelete& cleaner;
     Settings settings;
+    qpid::framing::Requester& requester;
+    qpid::framing::Responder& responder;
     std::auto_ptr<BasicHandler> basicHandler;
     std::auto_ptr<ChannelHandler> channelHandler;
     std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -98,6 +102,7 @@
 
     void handleHeader(u_int16_t channel, 
qpid::framing::AMQHeaderBody::shared_ptr body);
     void handleContent(u_int16_t channel, 
qpid::framing::AMQContentBody::shared_ptr body);
+    void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr 
body);
     void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
 
     Channel* getChannel(u_int16_t channel);
@@ -371,8 +376,6 @@
     virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, 
"Tunnel class not implemented"); } 
 };
 
-}
-}
-
+}}
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Mon Jan 15 
13:56:23 2007
@@ -32,10 +32,14 @@
 
 u_int16_t Connection::channelIdCounter;
 
-Connection::Connection(  bool debug, u_int32_t _max_frame_size, 
qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), 
closed(true),
+Connection::Connection(
+    bool debug, u_int32_t _max_frame_size,
+    qpid::framing::ProtocolVersion* _version
+) : max_frame_size(_max_frame_size), closed(true),
     version(_version->getMajor(),_version->getMinor())
 {
-    connector = new Connector(version, debug, _max_frame_size);
+    connector = new Connector(
+        version, requester, responder, debug, _max_frame_size);
 }
 
 Connection::~Connection(){
@@ -152,6 +156,16 @@
 }
 
 void Connection::received(AMQFrame* frame){
+    AMQBody::shared_ptr body = frame->getBody();
+    u_int8_t type = body->type();
+    if (type == REQUEST_BODY) 
+        responder.received(AMQRequestBody::getData(body));
+    handleFrame(frame);
+    if (type == RESPONSE_BODY)
+        requester.processed(AMQResponseBody::getData(body));
+}
+
+void Connection::handleFrame(AMQFrame* frame){
     u_int16_t channelId = frame->getChannel();
 
     if(channelId == 0){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Mon Jan 15 
13:56:23 2007
@@ -37,6 +37,8 @@
 #include <ClientQueue.h>
 #include <ResponseHandler.h>
 #include <AMQP_HighestVersion.h>
+#include "Requester.h"
+#include "Responder.h"
 
 namespace qpid {
 
@@ -79,6 +81,8 @@
        ResponseHandler responses;
         volatile bool closed;
         qpid::framing::ProtocolVersion version;
+        qpid::framing::Requester requester;
+        qpid::framing::Responder responder;
 
         void channelException(Channel* channel, qpid::framing::AMQMethodBody* 
body, QpidError& e);
         void error(int code, const std::string& msg, int classid = 0, int 
methodid = 0);
@@ -89,6 +93,7 @@
        virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr 
body);
        virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr 
body);
        virtual void 
handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+        void handleFrame(qpid::framing::AMQFrame* frame);
 
     public:
         /**

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp Mon Jan 15 
13:56:23 2007
@@ -22,13 +22,17 @@
 #include <QpidError.h>
 #include <sys/Time.h>
 #include "Connector.h"
+#include "Requester.h"
+#include "Responder.h"
 
 using namespace qpid::sys;
 using namespace qpid::client;
 using namespace qpid::framing;
 using qpid::QpidError;
 
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool 
_debug, u_int32_t buffer_size) :
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
+                     Requester& req, Responder& resp,
+                     bool _debug, u_int32_t buffer_size) :
     debug(_debug),
     receive_buffer_size(buffer_size),
     send_buffer_size(buffer_size),
@@ -40,7 +44,10 @@
     timeoutHandler(0),
     shutdownHandler(0),
     inbuf(receive_buffer_size), 
-    outbuf(send_buffer_size){ }
+    outbuf(send_buffer_size),
+    requester(req),
+    responder(resp)
+{ }
 
 Connector::~Connector(){ }
 
@@ -75,7 +82,13 @@
 }
 
 void Connector::send(AMQFrame* frame){
-    writeBlock(frame);    
+    AMQBody::shared_ptr body = frame->getBody();
+    u_int8_t type = body->type();
+    if (type == REQUEST_BODY)
+        requester.sending(AMQRequestBody::getData(body));
+    else if (type == RESPONSE_BODY)
+        responder.sending(AMQResponseBody::getData(body));
+    writeBlock(frame);
     if(debug) std::cout << "SENT: " << *frame << std::endl; 
     delete frame;
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h Mon Jan 15 
13:56:23 2007
@@ -34,60 +34,73 @@
 #include <sys/Socket.h>
 
 namespace qpid {
+
+namespace framing {
+
+class Requester;
+class Responder;
+
+} // namespace framing
+
 namespace client {
 
-    class Connector : public qpid::framing::OutputHandler, 
-                      private qpid::sys::Runnable
-    {
-        const bool debug;
-       const int receive_buffer_size;
-       const int send_buffer_size;
-       qpid::framing::ProtocolVersion version;
-
-       bool closed;
-
-        int64_t lastIn;
-        int64_t lastOut;
-        int64_t timeout;
-        u_int32_t idleIn;
-        u_int32_t idleOut;
-
-        qpid::sys::TimeoutHandler* timeoutHandler;
-        qpid::sys::ShutdownHandler* shutdownHandler;
-       qpid::framing::InputHandler* input;
-       qpid::framing::InitiationHandler* initialiser;
-       qpid::framing::OutputHandler* output;
+class Connector : public qpid::framing::OutputHandler, 
+                  private qpid::sys::Runnable
+{
+    const bool debug;
+    const int receive_buffer_size;
+    const int send_buffer_size;
+    qpid::framing::ProtocolVersion version;
+
+    bool closed;
+
+    int64_t lastIn;
+    int64_t lastOut;
+    int64_t timeout;
+    u_int32_t idleIn;
+    u_int32_t idleOut;
+
+    qpid::sys::TimeoutHandler* timeoutHandler;
+    qpid::sys::ShutdownHandler* shutdownHandler;
+    qpid::framing::InputHandler* input;
+    qpid::framing::InitiationHandler* initialiser;
+    qpid::framing::OutputHandler* output;
        
-       qpid::framing::Buffer inbuf;
-       qpid::framing::Buffer outbuf;
+    qpid::framing::Buffer inbuf;
+    qpid::framing::Buffer outbuf;
+
+    qpid::sys::Mutex writeLock;
+    qpid::sys::Thread receiver;
 
-        qpid::sys::Mutex writeLock;
-       qpid::sys::Thread receiver;
+    qpid::sys::Socket socket;
 
-       qpid::sys::Socket socket;
+    qpid::framing::Requester& requester;
+    qpid::framing::Responder& responder;
         
-        void checkIdle(ssize_t status);
-       void writeBlock(qpid::framing::AMQDataBlock* data);
-       void writeToSocket(char* data, size_t available);
-        void setSocketTimeout();
-
-       void run();
-       void handleClosed();
-
-    public:
-       Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = 
false, u_int32_t buffer_size = 1024);
-       virtual ~Connector();
-       virtual void connect(const std::string& host, int port);
-       virtual void init(qpid::framing::ProtocolInitiation* header);
-       virtual void close();
-       virtual void setInputHandler(qpid::framing::InputHandler* handler);
-       virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
-       virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
-       virtual qpid::framing::OutputHandler* getOutputHandler();
-       virtual void send(qpid::framing::AMQFrame* frame);
-        virtual void setReadTimeout(u_int16_t timeout);
-        virtual void setWriteTimeout(u_int16_t timeout);
-    };
+    void checkIdle(ssize_t status);
+    void writeBlock(qpid::framing::AMQDataBlock* data);
+    void writeToSocket(char* data, size_t available);
+    void setSocketTimeout();
+
+    void run();
+    void handleClosed();
+
+  public:
+    Connector(const qpid::framing::ProtocolVersion& pVersion,
+              qpid::framing::Requester& req, qpid::framing::Responder& resp,
+              bool debug = false, u_int32_t buffer_size = 1024);
+    virtual ~Connector();
+    virtual void connect(const std::string& host, int port);
+    virtual void init(qpid::framing::ProtocolInitiation* header);
+    virtual void close();
+    virtual void setInputHandler(qpid::framing::InputHandler* handler);
+    virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+    virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+    virtual qpid::framing::OutputHandler* getOutputHandler();
+    virtual void send(qpid::framing::AMQFrame* frame);
+    virtual void setReadTimeout(u_int16_t timeout);
+    virtual void setWriteTimeout(u_int16_t timeout);
+};
 
 }
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h Mon Jan 
15 13:56:23 2007
@@ -21,7 +21,8 @@
  * under the License.
  *
  */
-/*#include <qpid/framing/amqp_methods.h>*/
+#include <boost/cast.hpp>
+
 #include <amqp_types.h>
 #include <AMQBody.h>
 #include <AMQDataBlock.h>
@@ -49,6 +50,12 @@
     virtual u_int32_t size() const;
     u_int16_t getChannel();
     AMQBody::shared_ptr getBody();
+
+    /** Downcast the body to T and return it; the dynamic type is only checked by assert(). */
+    template <class T> boost::shared_ptr<T> castBody() {
+        assert(dynamic_cast<T*>(getBody().get()));
+        return boost::static_pointer_cast<T>(getBody());
+    }
 
     u_int32_t decodeHead(Buffer& buffer); 
     void decodeBody(Buffer& buffer, uint32_t size); 

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp 
Mon Jan 15 13:56:23 2007
@@ -55,4 +55,10 @@
     return AMQRequestBody::shared_ptr(body);
 }
 
+void AMQRequestBody::printPrefix(std::ostream& out) const { // request-id / response-mark tag
+    out << "request(id=" << data.requestId;
+    out << ",mark=" << data.responseMark << "): ";
+}
+
 }} // namespace qpid::framing
+

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h 
Mon Jan 15 13:56:23 2007
@@ -42,6 +42,10 @@
         ResponseId responseMark;
     };
 
+    static Data& getData(const AMQBody::shared_ptr& body) { // body must be non-null
+        return dynamic_cast<AMQRequestBody&>(*body).getData(); // std::bad_cast if not a request
+    }
+
     static shared_ptr create(
         AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
         Buffer& buffer);
@@ -52,6 +56,7 @@
     u_int8_t type() const { return REQUEST_BODY; }
     void encode(Buffer& buffer) const;
 
+    Data& getData() { return data; } // mutable reference to this body's Data (requestId, responseMark)
     RequestId  getRequestId() const { return data.requestId; }
     void setRequestId(RequestId id) { data.requestId=id; }
     ResponseId getResponseMark() const { return data.responseMark; }
@@ -59,6 +64,7 @@
 
   protected:
     static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
+    void printPrefix(std::ostream& out) const;
     
   private:
     Data data;

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp 
Mon Jan 15 13:56:23 2007
@@ -56,5 +56,10 @@
     return AMQResponseBody::shared_ptr(body);
 }
 
+void AMQResponseBody::printPrefix(std::ostream& out) const { // response/request/batch tag
+    out << "response(id=" << data.responseId << ",request=" << data.requestId;
+    out << ",batch=" << data.batchOffset << "): ";
+}
+
 
 }} // namespace qpid::framing

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h 
Mon Jan 15 13:56:23 2007
@@ -46,6 +46,10 @@
         u_int32_t batchOffset;
     };
 
+    static Data& getData(const AMQBody::shared_ptr& body) { // body must be non-null
+        return dynamic_cast<AMQResponseBody&>(*body).getData(); // std::bad_cast if not a response
+    }
+
     static shared_ptr create(
         AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
         Buffer& buffer);
@@ -57,12 +61,15 @@
     u_int8_t type() const { return RESPONSE_BODY; }
     void encode(Buffer& buffer) const;
 
+    Data& getData() { return data; } // mutable reference to this body's Data (responseId, requestId, batchOffset)
     ResponseId getResponseId() { return data.responseId; }
     RequestId getRequestId() { return data.requestId; }
     BatchOffset getBatchOffset() { return data.batchOffset; }
 
   protected:
     static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+    void printPrefix(std::ostream& out) const;
+
   private:
     Data data;
 };

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
 Mon Jan 15 13:56:23 2007
@@ -27,12 +27,10 @@
 #include <string>
 #include <vector>
 
-namespace qpid
-{
-namespace framing
-{
+namespace qpid {
+namespace framing {
 
-class ProtocolVersionException : virtual public qpid::Exception
+class ProtocolVersionException : public qpid::Exception
 {
 protected:
     ProtocolVersion versionFound;
@@ -49,7 +47,6 @@
     virtual std::string toString() const throw();
 }; // class ProtocolVersionException
 
-} // namespace framing
-} // namespace qpid
+}} // namespace qpid::framing
 
 #endif //ifndef _ProtocolVersionException_

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Mon Jan 
15 13:56:23 2007
@@ -45,7 +45,7 @@
 
     /** Called after processing a response. */
     void processed(const AMQResponseBody::Data&);
-    
+
   private:
     std::set<RequestId> requests; /** Sent but not responded to */
     RequestId lastId;

Modified: 
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl 
(original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl 
Mon Jan 15 13:56:23 2007
@@ -69,7 +69,8 @@
     
     inline void print(std::ostream& out) const
     {
-           out << "${CLASS}${METHOD}: ";
+            printPrefix(out); // e.g. AMQRequestBody/AMQResponseBody write their IDs here
+            out << "${CLASS}${METHOD}: ";
 %{FLIST} ${mb_field_print}
     }
 


Reply via email to