Author: gsim
Date: Sun Aug  5 06:25:36 2007
New Revision: 562866

URL: http://svn.apache.org/viewvc?view=rev&rev=562866
Log:
Added first cut of generated client interface.
Old channel interface still supported; shares SessionCore with the new 
interface.
Todo: allow applications to signal completion of received commands; keyword 
args for interface.


Added:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/samples/Operations.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/samples/Proxy.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Sun Aug  5 06:25:36 2007
@@ -76,6 +76,10 @@
 class AmqpMethod < AmqpElement
   def initialize(xml, amqp) super; end
 
+  def content()
+    attributes["content"]
+  end
+
   def index() attributes["index"];  end
 
   def fields()
@@ -84,7 +88,7 @@
 
   # Responses to this method (0-9)
   def responses()
-    @cache_responses ||= elements.collect("response") { |el| new 
AmqpMethod(el,self) }
+    @cache_responses ||= elements.collect("response") { |el| 
AmqpMethod.new(el,self) }
   end
 
   # Methods this method responds to (0-9)
@@ -178,7 +182,7 @@
     if (@outdir != "-")         
       path=Pathname.new "[EMAIL PROTECTED]/#{file}"
       path.parent.mkpath
-        path.open('w') { |@out| yield }
+      path.open('w') { |@out| yield }
     end
   end
 

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb Sun Aug  5 06:25:36 2007
@@ -128,7 +128,7 @@
 
   # Write a .cpp file.
   def cpp_file(path)
-    file (path) do
+    file(path) do
       gen Copyright
       yield
     end

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/samples/Operations.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/samples/Operations.rb?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/samples/Operations.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/samples/Operations.rb Sun Aug  5 
06:25:36 2007
@@ -80,6 +80,6 @@
   end
 end
 
-OperationsGen.new("client",ARGV[0], amqp).generate()
-OperationsGen.new("server",ARGV[0], amqp).generate()
+OperationsGen.new("client",ARGV[0], Amqp).generate()
+OperationsGen.new("server",ARGV[0], Amqp).generate()
 

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/samples/Proxy.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/samples/Proxy.rb?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/samples/Proxy.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/samples/Proxy.rb Sun Aug  5 06:25:36 
2007
@@ -148,6 +148,6 @@
 end
 
 
-ProxyGen.new("client", ARGV[0], amqp).generate;
-ProxyGen.new("server", ARGV[0], amqp).generate;
+ProxyGen.new("client", ARGV[0], Amqp).generate;
+ProxyGen.new("server", ARGV[0], Amqp).generate;
     

Added: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (added)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,136 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+# 
+$: << '..'
+require 'cppgen'
+
+class SessionGen < CppGen
+
+  def initialize(outdir, amqp)
+    super(outdir, amqp)
+    @chassis="server"
+    @classname="Session"
+  end
+  
+  def declare_method (m)
+    gen "Response #{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" 
+    if (m.content())
+      params=m.signature + ["const MethodContent& content"]
+    else
+      params=m.signature
+    end
+    indent { gen params.join(",\n") }
+    gen ");\n\n"
+  end
+
+  def declare_class(c)
+    c.methods_on(@chassis).each { |m| declare_method(m) }
+  end
+
+  def define_method (m)
+    gen "Response Session::#{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" 
+    if (m.content())
+      params=m.signature + ["const MethodContent& content"]
+    else
+      params=m.signature
+    end
+    indent { gen params.join(",\n") }
+    gen "){\n\n"
+    indent (2) { 
+      gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}(" 
+      params = ["version"] + m.param_names
+      gen params.join(", ")
+      other_params=[]
+      if (m.content())
+        other_params << "content"
+      end
+      if m.responses().empty?
+        other_params << "false"
+      else 
+        other_params << "true"
+      end
+      gen ")), #{other_params.join(", ")});\n"
+    }
+    gen "}\n\n"
+  end
+
+  def define_class(c)
+    c.methods_on(@chassis).each { |m| define_method(m) }
+  end
+
+  def generate()
+    excludes = ["channel", "connection", "session", "execution"]
+
+    h_file("qpid/client/[EMAIL PROTECTED]") { 
+      gen <<EOS
+#include <sstream> 
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "ConnectionImpl.h"
+#include "Response.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+
+class [EMAIL PROTECTED] {
+  ConnectionImpl::shared_ptr parent;
+  SessionCore::shared_ptr impl;
+  framing::ProtocolVersion version;
+public:
+    [EMAIL PROTECTED](ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+    [EMAIL PROTECTED]();
+
+    ReceivedContent::shared_ptr get() { return impl->get(); }
+    void close() { impl->close(); parent->released(impl); }  
+
+EOS
+  indent { @amqp.classes.each { |c| declare_class(c) if 
!excludes.include?(c.name) } }
+  gen <<EOS
+}; /* class [EMAIL PROTECTED] */
+}
+}
+EOS
+}
+
+  # .cpp file
+  cpp_file("qpid/client/[EMAIL PROTECTED]") { 
+    gen <<EOS
+#include "[EMAIL PROTECTED]"
+#include "qpid/framing/AMQMethodBody.h"
+
+using std::string;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace client {
+
[EMAIL PROTECTED]::[EMAIL PROTECTED](ConnectionImpl::shared_ptr _parent, 
SessionCore::shared_ptr _impl) : parent(_parent), impl(_impl) {}
+
[EMAIL PROTECTED]::[EMAIL PROTECTED]()
+{
+    impl->stop();
+    parent->released(impl);
+}
+
+EOS
+
+  @amqp.classes.each { |c| define_class(c) if !excludes.include?(c.name)  }
+  
+  gen <<EOS
+}} // namespace qpid::client
+EOS
+  }
+
+  end
+end
+
+SessionGen.new(ARGV[0], Amqp).generate()
+

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sun Aug  5 06:25:36 2007
@@ -46,10 +46,10 @@
 rgen_script=$(rgen_dir)/generate
 rgen_cmd=ruby -I $(rgen_dir) $(rgen_script)
 
-rgen_templates=$(rgen_tdir)/frame_body_lists.rb
+rgen_templates=$(rgen_tdir)/frame_body_lists.rb $(rgen_tdir)/Session.rb
 
 rubygen.mk: $(rgen_script) $(specs) $(rgen_templates)
-       gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; 
echo "rgen_srcs=$$gen" > $@
+       gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; 
echo rgen_srcs=$$gen > $@
 
 $(rgen_srcs): rubygen.mk
 
@@ -228,6 +228,7 @@
   qpid/client/ClientChannel.cpp                        \
   qpid/client/ClientExchange.cpp               \
   qpid/client/ClientQueue.cpp                  \
+  qpid/client/ConnectionImpl.cpp               \
   qpid/client/Connector.cpp                    \
   qpid/client/MessageListener.cpp              \
   qpid/client/ResponseHandler.cpp              \
@@ -241,6 +242,8 @@
   qpid/client/FutureResponse.cpp               \
   qpid/client/FutureFactory.cpp                        \
   qpid/client/ReceivedContent.cpp              \
+  qpid/client/Session.cpp                      \
+  qpid/client/SessionCore.cpp                  \
   qpid/client/StateManager.cpp
 
 
@@ -319,6 +322,7 @@
   qpid/client/ClientMessage.h \
   qpid/client/ClientQueue.h \
   qpid/client/Connection.h \
+  qpid/client/ConnectionImpl.h \
   qpid/client/Connector.h \
   qpid/client/MessageChannel.h \
   qpid/client/MessageListener.h \
@@ -336,6 +340,9 @@
   qpid/client/FutureResponse.h \
   qpid/client/FutureFactory.h \
   qpid/client/ReceivedContent.h \
+  qpid/client/Response.h \
+  qpid/client/Session.h \
+  qpid/client/SessionCore.h \
   qpid/client/StateManager.h \
   qpid/framing/AMQBody.h \
   qpid/framing/AMQContentBody.h \
@@ -356,6 +363,7 @@
   qpid/framing/HeaderProperties.h \
   qpid/framing/InitiationHandler.h \
   qpid/framing/InputHandler.h \
+  qpid/framing/MethodContent.h \
   qpid/framing/MethodContext.h \
   qpid/framing/OutputHandler.h \
   qpid/framing/ProtocolInitiation.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Sun Aug  5 
06:25:36 2007
@@ -49,28 +49,19 @@
 }}
 
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
-    connection(0), prefetch(_prefetch), transactional(_transactional), 
errorCode(200), errorText("Ok"), running(false)
+    prefetch(_prefetch), transactional(_transactional), errorCode(200), 
errorText("Ok"), running(false)
 {
 }
 
-Channel::~Channel(){
-    closeInternal();
-}
+Channel::~Channel(){}
 
-void Channel::open(ChannelId id, Connection& con)
+void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
 {
     if (isOpen())
-        THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
-    connection = &con;
-    channelId = id;
-    //link up handlers:
-    channelHandler.out = boost::bind(&ConnectionHandler::outgoing, 
&(connection->handler), _1);
-    channelHandler.in = boost::bind(&ExecutionHandler::handle, 
&executionHandler, _1);
-    executionHandler.out = boost::bind(&ChannelHandler::outgoing, 
&channelHandler, _1);
-    //set up close notification:
-    channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
+        THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
 
-    channelHandler.open(id);
+    connection = c;
+    session = s;
 }
     
 bool Channel::isOpen() const { 
@@ -79,10 +70,10 @@
 }
 
 void Channel::setQos() {
-    executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, 
getPrefetch(), false)));
+    sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, 
getPrefetch(), false)));
     if(isTransactional()) {
         //I think this is wrong! should only send TxSelect once...
-        executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+        sendSync(false, make_shared_ptr(new TxSelectBody(version)));
     }
 }
 
@@ -133,63 +124,52 @@
 }
 
 void Channel::commit(){
-    executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
+    sendSync(false, make_shared_ptr(new TxCommitBody(version)));
 }
 
 void Channel::rollback(){
-    executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
+    sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
 }
 
 void Channel::close()
 {
-    channelHandler.close();
+    session->close();
     {
         Mutex::ScopedLock l(lock);
         if (connection);
         {
-            connection->erase(channelId);
-            connection = 0;
+            connection->released(session);
+            connection.reset();
         }
     }
     stop();
 }
 
-
 // Channel closed by peer.
 void Channel::peerClose(uint16_t code, const std::string& message) {
     assert(isOpen());
     //record reason:
     errorCode = code;
     errorText = message;
-    closeInternal();
     stop();
-    futures.close(code, message);
-}
-
-void Channel::closeInternal() {
-    Mutex::ScopedLock l(lock);
-    if (connection);
-    {
-        connection = 0;
-    }
 }
 
 AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr 
toSend, ClassId /*c*/, MethodId /*m*/)
 {
-
-    boost::shared_ptr<FutureResponse> fr(futures.createResponse());
-    executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), 
boost::bind(&FutureResponse::received, fr, _1));
-    return fr->getResponse();
+    session->setSync(true);
+    Response r = session->send(toSend, true);
+    session->setSync(false);
+    return r.getPtr();
 }
 
 void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
 {
     if(sync) {
-        boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
-        executionHandler.send(command, 
boost::bind(&FutureCompletion::completed, fc));
-        fc->waitForCompletion();
+        session->setSync(true);
+        session->send(command, false);
+        session->setSync(false);
     } else {
-        executionHandler.send(command);
+        session->send(command);
     }
 }
 
@@ -199,7 +179,7 @@
     if(sync)
         return sendAndReceive(body, c, m);
     else {
-        executionHandler.send(body);
+        session->send(body);
         return AMQMethodBody::shared_ptr();
     }
 }
@@ -246,8 +226,8 @@
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
 
     AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, 
queue.getName(), ackMode));
-    AMQMethodBody::shared_ptr response = sendAndReceive(request);
-    if (response && response->isA<BasicGetEmptyBody>()) {
+    Response response = session->send(request, true);
+    if (response.isA<BasicGetEmptyBody>()) {
         return false;
     } else {
         ReceivedContent::shared_ptr content = gets.pop();
@@ -263,38 +243,7 @@
     const string e = exchange.getName();
     string key = routingKey;
 
-    executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 
0, e, key, mandatory, immediate)), 
-                                 msg, msg.getData(), 
connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
-    /*
-    // Make a header for the message
-    AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-    BasicHeaderProperties::copy(
-        *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
-    header->setContentSize(msg.getData().size());
-
-    executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, 
key, mandatory, immediate)));
-    executionHandler.sendContent(header);
-    string data = msg.getData();
-    u_int64_t data_length = data.length();
-    if(data_length > 0){
-        //frame itself uses 8 bytes
-        u_int32_t frag_size = connection->getMaxFrameSize() - 8;
-        if(data_length < frag_size){
-            executionHandler.sendContent(make_shared_ptr(new 
AMQContentBody(data)));
-        }else{
-            u_int32_t offset = 0;
-            u_int32_t remaining = data_length - offset;
-            while (remaining > 0) {
-                u_int32_t length = remaining > frag_size ? frag_size : 
remaining;
-                string frag(data.substr(offset, length));
-                executionHandler.sendContent(make_shared_ptr(new 
AMQContentBody(frag)));                          
-                
-                offset += length;
-                remaining = data_length - offset;
-            }
-        }
-    }
-    */
+    session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, 
mandatory, immediate)), msg, false);
 }
 
 void Channel::start(){
@@ -303,7 +252,7 @@
 }
 
 void Channel::stop() {
-    executionHandler.received.close();
+    session->stop();
     gets.close();
     Mutex::ScopedLock l(stopLock);
     if(running) {
@@ -315,7 +264,7 @@
 void Channel::run() {
     try {
         while (true) {
-            ReceivedContent::shared_ptr content = 
executionHandler.received.pop();
+            ReceivedContent::shared_ptr content = session->get();
             //need to dispatch this to the relevant listener:
             if (content->isA<BasicDeliverBody>()) {
                 ConsumerMap::iterator i = 
consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Sun Aug  5 
06:25:36 2007
@@ -26,9 +26,8 @@
 #include "ClientExchange.h"
 #include "ClientMessage.h"
 #include "ClientQueue.h"
-#include "ChannelHandler.h"
-#include "ExecutionHandler.h"
-#include "FutureFactory.h"
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
 #include "qpid/Exception.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
@@ -71,7 +70,6 @@
     typedef std::map<std::string, Consumer> ConsumerMap;
         
     mutable sys::Mutex lock;
-    Connection* connection;
     sys::Thread dispatcher;
 
     uint16_t prefetch;
@@ -85,11 +83,10 @@
     bool running;
 
     ConsumerMap consumers;
-    ExecutionHandler executionHandler;
-    ChannelHandler channelHandler;
+    ConnectionImpl::shared_ptr connection;
+    SessionCore::shared_ptr session;
     framing::ChannelId channelId;
     BlockingQueue<ReceivedContent::shared_ptr> gets;
-    FutureFactory futures;
 
     void stop();
 
@@ -121,7 +118,7 @@
                 sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
     }
 
-    void open(framing::ChannelId, Connection&);
+    void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
     void closeInternal();
     void peerClose(uint16_t, const std::string&);
 
@@ -256,9 +253,6 @@
     
     /** True if the channel is open */
     bool isOpen() const;
-
-    /** Get the connection associated with this channel */
-    Connection& getConnection() { return *connection; }
 
     /** Return the protocol version */
     framing::ProtocolVersion getVersion() const { return version ; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Sun Aug  
5 06:25:36 2007
@@ -41,31 +41,20 @@
 namespace qpid {
 namespace client {
 
-const std::string Connection::OK("OK");
-
-Connection::Connection(
-    bool _debug, uint32_t _max_frame_size,
-    framing::ProtocolVersion _version
-    ) : channelIdCounter(0), version(_version), 
max_frame_size(_max_frame_size),
-    defaultConnector(version, _debug, _max_frame_size),
-    isOpen(false), debug(_debug)
-{
-    setConnector(defaultConnector);
-
-    handler.maxFrameSize = _max_frame_size;
-}
+Connection::Connection(bool _debug, uint32_t _max_frame_size, 
framing::ProtocolVersion _version) : 
+    channelIdCounter(0), version(_version), 
+    max_frame_size(_max_frame_size), 
+    impl(new ConnectionImpl(boost::shared_ptr<Connector>(new 
Connector(_version, _debug)))),
+    isOpen(false) {}
+
+Connection::Connection(boost::shared_ptr<Connector> c) : 
+    channelIdCounter(0), version(framing::highestProtocolVersion), 
+    max_frame_size(65536), 
+    impl(new ConnectionImpl(c)),
+    isOpen(false) {}
 
 Connection::~Connection(){}
 
-void Connection::setConnector(Connector& con)
-{
-    connector = &con;
-    connector->setInputHandler(&handler);
-    connector->setTimeoutHandler(this);
-    connector->setShutdownHandler(this);
-    out = connector->getOutputHandler();
-}
-
 void Connection::open(
     const std::string& host, int port,
     const std::string& uid, const std::string& pwd, const std::string& vhost)
@@ -73,97 +62,28 @@
     if (isOpen)
         THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
 
-    //wire up the handler:
-    handler.in = boost::bind(&Connection::received, this, _1);
-    handler.out = boost::bind(&Connector::send, connector, _1);
-    handler.onClose = boost::bind(&Connection::closeChannels, this);
-
-    handler.uid = uid;
-    handler.pwd = pwd;
-    handler.vhost = vhost;
-
-    connector->connect(host, port);
-    connector->init();
-    handler.waitForOpen();
+    impl->open(host, port, uid, pwd, vhost);
     isOpen = true;
 }
 
-void Connection::shutdown() {
-    //this indicates that the socket to the server has closed we do
-    //not want to send a close request (or any other requests)
-    if(markClosed()) {
-        QPID_LOG(info, "Connection to peer closed!");
-        closeChannels();
-    }
-}
-        
-void Connection::close(
-    ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId 
/*methodId*/
-)
-{
-    if(markClosed()) {
-        try {
-            handler.close();
-        } catch (const std::exception& e) {
-            QPID_LOG(error, "Exception closing channel: " << e.what());
-        }
-        closeChannels();
-        connector->close();
-    }
-}
-
-bool Connection::markClosed()
-{
-    Mutex::ScopedLock locker(shutdownLock);
-    if (isOpen) {
-        isOpen = false;
-        return true;
-    } else {
-        return false; 
-    }
-}
-
-void Connection::closeChannels()
-{
-    using boost::bind;
-    for_each(channels.begin(), channels.end(),
-             bind(&Channel::closeInternal,
-                  bind(&ChannelMap::value_type::second, _1)));
-    channels.clear();
-}
-
 void Connection::openChannel(Channel& channel) {
     ChannelId id = ++channelIdCounter;
-    assert (channels.find(id) == channels.end());
-    assert(out);
-    channels[id] = &channel;
-    channel.open(id, *this);
-}
-
-void Connection::erase(ChannelId id) {
-    channels.erase(id);
+    SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+    impl->allocated(session);
+    channel.open(impl, session);
+    session->open();
 }
 
-void Connection::received(AMQFrame& frame){
-    ChannelId id = frame.getChannel();
-    Channel* channel = channels[id];
-    if (channel == 0) {
-        throw ConnectionException(504, (boost::format("Invalid channel number 
%g") % id).str());
-    }
-    channel->channelHandler.incoming(frame);
-}
-
-void Connection::send(AMQFrame& frame) {
-    out->send(frame);
-}
-
-void Connection::idleIn(){
-    connector->close();
+Session Connection::newSession() {
+    ChannelId id = ++channelIdCounter;
+    SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+    impl->allocated(session);
+    return Session(impl, session);
 }
 
-void Connection::idleOut(){
-    AMQFrame frame(version, 0, new AMQHeartbeatBody());
-    out->send(frame);
+void Connection::close()
+{
+    impl->close();
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Sun Aug  5 
06:25:36 2007
@@ -23,6 +23,7 @@
  */
 #include <string>
 #include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MethodContent.h"
 
 namespace qpid {
 namespace client {
@@ -35,11 +36,11 @@
  */
 // FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
 // basic header properties.
-class Message : public framing::BasicHeaderProperties {
+class Message : public framing::BasicHeaderProperties, public 
framing::MethodContent {
   public:
     Message(const std::string& data_=std::string()) : data(data_) {}
 
-    std::string getData() const { return data; }
+    const std::string& getData() const { return data; }
     void setData(const std::string& _data) { data = _data; }
 
     std::string getDestination() const { return destination; }
@@ -51,6 +52,8 @@
 
     bool isRedelivered() const { return redelivered; }
     void setRedelivered(bool _redelivered){  redelivered = _redelivered; }
+
+    const HeaderProperties& getMethodHeaders() const { return *this; }
 
   private:
     std::string data;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Sun Aug  5 
06:25:36 2007
@@ -25,11 +25,8 @@
 #include <string>
 #include "qpid/QpidError.h"
 #include "ClientChannel.h"
-#include "Connector.h"
-#include "ConnectionHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/ShutdownHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
+#include "ConnectionImpl.h"
+#include "Session.h"
 
 
 namespace qpid {
@@ -42,22 +39,6 @@
  */
 namespace client {
 
-/**
- * \internal provide access to selected private channel functions
- * for the Connection without making it a friend of the entire channel.
- */
-class ConnectionForChannel :
-        public framing::InputHandler,
-        public framing::OutputHandler,
-        public sys::TimeoutHandler, 
-        public sys::ShutdownHandler
-        
-{
-  private:
-  friend class Channel;
-    virtual void erase(framing::ChannelId) = 0;
-};
-
 
 /**
  * \defgroup clientapi Application API for an AMQP client
@@ -70,30 +51,17 @@
  * 
  * \ingroup clientapi
  */
-class Connection : public ConnectionForChannel
+class Connection
 {
-    typedef std::map<framing::ChannelId, Channel*> ChannelMap;
-
     framing::ChannelId channelIdCounter;
-    static const std::string OK;
-
     framing::ProtocolVersion version;
     const uint32_t max_frame_size;
-    ChannelMap channels;
-    ConnectionHandler handler;
-    Connector defaultConnector;
-    Connector* connector;
-    framing::OutputHandler* out;
+    ConnectionImpl::shared_ptr impl;
     bool isOpen;
-    sys::Mutex shutdownLock;
     bool debug;
-        
-    void erase(framing::ChannelId);
-    void closeChannels();
-    bool markClosed();
 
     // TODO aconway 2007-01-26: too many friendships, untagle these classes.
-  friend class Channel;
+    friend class Channel;
     
   public:
     /**
@@ -111,6 +79,7 @@
      */
     Connection(bool debug = false, uint32_t max_frame_size = 65536,
                framing::ProtocolVersion=framing::highestProtocolVersion);
+    Connection(boost::shared_ptr<Connector>);
     ~Connection();
 
     /**
@@ -136,13 +105,12 @@
               const std::string& virtualhost = "/");
 
     /**
-     * Close the connection with optional error information for the peer.
+     * Close the connection
      *
      * Any further use of this connection (without reopening it) will
      * not succeed.
      */
-    void close(framing::ReplyCode=200, const std::string& msg=OK,
-               framing::ClassId = 0, framing::MethodId = 0);
+    void close();
 
     /**
      * Associate a Channel with this connection and open it for use.
@@ -156,24 +124,7 @@
      */
     void openChannel(Channel&);
 
-
-    // TODO aconway 2007-01-26: can these be private?
-    void send(framing::AMQFrame&);
-    void received(framing::AMQFrame&);
-    void idleOut();
-    void idleIn();
-    void shutdown();
-    
-    /**\internal used for testing */
-    void setConnector(Connector& connector);
-    
-    /**
-     * @return the maximum frame size in use on this connection
-     */
-    inline uint32_t getMaxFrameSize(){ return max_frame_size; }
-
-    /** @return protocol version in use on this connection. */ 
-    //framing::ProtocolVersion getVersion() const { return version; }
+    Session newSession();
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Sun Aug 
 5 06:25:36 2007
@@ -175,8 +175,9 @@
         if (method->isA<ConnectionCloseBody>()) {
             send(make_shared_ptr(new ConnectionCloseOkBody(version)));
             setState(CLOSED);
-            if (onClose) {
-                onClose();
+            if (onError) {
+                ConnectionCloseBody::shared_ptr 
c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+                onError(c->getReplyCode(), c->getReplyText());
             }
         } else {
             error(503, "Unexpected method on channel zero.", 
method->amqpClassId(), method->amqpMethodId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Sun Aug  
5 06:25:36 2007
@@ -61,6 +61,7 @@
 
 public:
     typedef boost::function<void()> CloseListener;    
+    typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; 
   
 
     ConnectionHandler();
 
@@ -73,6 +74,7 @@
     void close();
 
     CloseListener onClose;
+    ErrorListener onError;
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+/**
+ * Wires the connection-level handler to the given connector.
+ *
+ * Incoming frames accepted by the handler are routed to incoming(); frames
+ * the handler emits are passed to Connector::send. This object is also
+ * registered as the connector's timeout and shutdown handler.
+ *
+ * @param c connector used for all I/O on this connection
+ */
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
+{
+    handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
+    handler.out = boost::bind(&Connector::send, connector, _1);
+    handler.onClose = boost::bind(&ConnectionImpl::closed, this);
+    // ConnectionHandler reports a peer-initiated ConnectionClose through
+    // onError (with the reply code and text) rather than onClose, so it must
+    // be bound as well or the sessions are never told the connection closed.
+    // The cast selects the two-argument overload of closed().
+    handler.onError = boost::bind(
+        static_cast<void (ConnectionImpl::*)(uint16_t, const std::string&)>(&ConnectionImpl::closed),
+        this, _1, _2);
+    connector->setInputHandler(&handler);
+    connector->setTimeoutHandler(this);
+    connector->setShutdownHandler(this);
+}
+
+// Registers a session in the session map under session->getId().
+// Throws Exception("Id already in use.") if the map already has an entry
+// for that id.
+void ConnectionImpl::allocated(SessionCore::shared_ptr session)
+{
+    if (sessions.find(session->getId()) != sessions.end()) {
+        throw Exception("Id already in use.");
+    }
+    sessions[session->getId()] = session;
+}
+
+// Removes the session map entry for session->getId().
+// Throws Exception("Id not in use.") if there is no entry for that id.
+void ConnectionImpl::released(SessionCore::shared_ptr session)
+{
+    SessionMap::iterator i = sessions.find(session->getId());
+    if (i == sessions.end()) {
+        throw Exception("Id not in use.");
+    }
+    sessions.erase(i);
+}
+
+// FrameHandler entry point: hands the frame to the connection handler's
+// outgoing path (handler.outgoing).
+void ConnectionImpl::handle(framing::AMQFrame& frame)
+{
+    handler.outgoing(frame);
+}
+
+/**
+ * Dispatches an incoming frame to the session registered for its channel.
+ *
+ * @param frame frame received from the peer; its channel number selects
+ *              the session
+ * @throws ConnectionException (504) if no session is registered for the
+ *         frame's channel
+ */
+void ConnectionImpl::incoming(framing::AMQFrame& frame)
+{
+    uint16_t id = frame.getChannel();
+    // Look up with find(): operator[] would insert a null entry for an unknown
+    // channel, which closed()/shutdown() would later dereference and which
+    // would make allocated() reject the id as already in use.
+    SessionMap::iterator i = sessions.find(id);
+    if (i == sessions.end() || !i->second) {
+        throw ConnectionException(504, (boost::format("Invalid channel number %d") % id).str());
+    }
+    // Hold our own reference so the session outlives the call even if
+    // handling the frame removes it from the map.
+    SessionCore::shared_ptr session = i->second;
+    session->handle(frame);
+}
+
+// Opens the connection: stores the user id, password and virtual host on the
+// connection handler, connects and initialises the connector for host:port,
+// then calls handler.waitForOpen() (presumably blocks until the open
+// handshake completes — confirm against ConnectionHandler).
+void ConnectionImpl::open(const std::string& host, int port,
+                          const std::string& uid, const std::string& pwd, 
+                          const std::string& vhost)
+{
+    //TODO: better management of connection properties
+    handler.uid = uid;
+    handler.pwd = pwd;
+    handler.vhost = vhost;
+
+    connector->connect(host, port);
+    connector->init();
+    handler.waitForOpen();
+}
+
+// Requests a close of the connection by delegating to handler.close().
+void ConnectionImpl::close()
+{
+    handler.close();
+}
+
+// No-argument close notification (bound to handler.onClose in the
+// constructor); forwards to closed(code, text) with code 200 and text "OK".
+void ConnectionImpl::closed()
+{
+    closed(200, "OK");
+}
+
+// Tells every registered session that the connection has closed, passing on
+// the given code and text, then empties the session map and closes the
+// connector.
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
+{
+    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
+        i->second->closed(code, text);
+    }
+    sessions.clear();
+    connector->close();
+}
+
+// Closes the connector. Presumably the sys::TimeoutHandler callback for an
+// inbound idle timeout (this object is registered via setTimeoutHandler) —
+// confirm against TimeoutHandler.
+void ConnectionImpl::idleIn()
+{
+    connector->close();
+}
+
+// Sends a heartbeat frame (AMQHeartbeatBody) on channel 0 directly through
+// the connector. Presumably the sys::TimeoutHandler callback for an outbound
+// idle timeout — confirm against TimeoutHandler.
+void ConnectionImpl::idleOut()
+{
+    AMQFrame frame(version, 0, new AMQHeartbeatBody());
+    connector->send(frame);
+}
+
+/**
+ * Shutdown callback: the socket to the server has closed.
+ *
+ * Notifies every registered session with code 0 and an explanatory text,
+ * then empties the session map. Unlike closed(), the connector is not
+ * closed here.
+ */
+void ConnectionImpl::shutdown() {
+    //this indicates that the socket to the server has closed
+    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
+        i->second->closed(0, "Unexpected socket closure.");
+    }
+    sessions.clear();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ConnectionImpl_
+#define _ConnectionImpl_
+
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "ConnectionHandler.h"
+#include "Connector.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Client-side connection implementation: owns the connector and the
+ * connection-level handler, and keeps a map from channel id to session so
+ * that incoming frames can be dispatched to the right SessionCore.
+ */
+class ConnectionImpl : public framing::FrameHandler,
+        public sys::TimeoutHandler, 
+        public sys::ShutdownHandler
+
+{
+    typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap;
+    // Sessions registered via allocated(), keyed by SessionCore::getId().
+    SessionMap sessions; 
+    // Connection-level handler; its in/out/onClose callbacks are bound in the constructor.
+    ConnectionHandler handler;
+    boost::shared_ptr<Connector> connector;
+    // Protocol version used when building heartbeat frames in idleOut().
+    framing::ProtocolVersion version;
+
+    // Dispatches a frame to the session registered for its channel.
+    void incoming(framing::AMQFrame& frame);    
+    // Close notifications; the no-argument form forwards 200/"OK".
+    void closed();
+    void closed(uint16_t, const std::string&);
+    void idleOut();
+    void idleIn();
+    void shutdown();
+public:
+    typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
+
+    ConnectionImpl(boost::shared_ptr<Connector> c);
+    // Register / unregister a session under its id; both throw Exception on
+    // a duplicate or unknown id respectively.
+    void allocated(SessionCore::shared_ptr);
+    void released(SessionCore::shared_ptr);
+    void open(const std::string& host, int port = 5672, 
+              const std::string& uid = "guest",
+              const std::string& pwd = "guest", 
+              const std::string& virtualhost = "/");
+    void close();
+    // Outgoing frames: forwarded to the connection handler.
+    void handle(framing::AMQFrame& frame);    
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Sun Aug  
5 06:25:36 2007
@@ -50,7 +50,8 @@
     return body->type() == METHOD_BODY && 
shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
 }
 
-ExecutionHandler::ExecutionHandler() : 
version(framing::highestProtocolVersion) {}
+ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : 
+    version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {}
 
 //incoming:
 void ExecutionHandler::handle(AMQFrame& frame)
@@ -97,6 +98,12 @@
     //make_shared_ptr(new ExecutionCompleteBody(getVersion(), 
incoming.hwm.getValue(), SequenceNumberSet())));
 }
 
+// Builds a frame carrying an ExecutionFlushBody (channel field set to 0) and
+// passes it to the out handler.
+void ExecutionHandler::sendFlush()
+{
+    AMQFrame frame(version, 0, make_shared_ptr(new 
ExecutionFlushBody(version)));
+    out(frame);        
+}
+
 void ExecutionHandler::send(AMQBody::shared_ptr command, 
CompletionTracker::Listener f, Correlator::Listener g)
 {
     //allocate id:
@@ -111,21 +118,9 @@
 
     AMQFrame frame(version, 0/*id will be filled in be channel handler*/, 
command);
     out(frame);
-
-    if (f) {
-        AMQFrame frame(version, 0, make_shared_ptr(new 
ExecutionFlushBody(version)));
-        out(frame);        
-    }
-}
-
-void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
-{
-    AMQFrame frame(version, 0/*id will be filled in be channel handler*/, 
content);
-    out(frame);
 }
 
 void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const 
BasicHeaderProperties& headers, const std::string& data, 
-                                   uint64_t frameSize,
                                    CompletionTracker::Listener f, 
Correlator::Listener g)
 {
     send(command, f, g);
@@ -139,7 +134,7 @@
     u_int64_t data_length = data.length();
     if(data_length > 0){
         //frame itself uses 8 bytes
-        u_int32_t frag_size = frameSize - 8;
+        u_int32_t frag_size = maxFrameSize - 8;
         if(data_length < frag_size){
             AMQFrame frame(version, 0, make_shared_ptr(new 
AMQContentBody(data)));
             out(frame);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Sun Aug  5 
06:25:36 2007
@@ -43,6 +43,7 @@
     Correlator correlation;
     CompletionTracker completion;
     framing::ProtocolVersion version;
+    uint64_t maxFrameSize;
 
     void complete(uint32_t mark, framing::SequenceNumberSet range);    
     void flush();
@@ -50,7 +51,9 @@
 public:
     BlockingQueue<ReceivedContent::shared_ptr> received; 
 
-    ExecutionHandler();
+    ExecutionHandler(uint64_t maxFrameSize = 65536);
+
+    void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
 
     void handle(framing::AMQFrame& frame);
     void send(framing::AMQBody::shared_ptr command, 
@@ -58,11 +61,9 @@
               Correlator::Listener g = Correlator::Listener());
     void sendContent(framing::AMQBody::shared_ptr command, 
                      const framing::BasicHeaderProperties& headers, const 
std::string& data, 
-                     uint64_t frameSize,
                      CompletionTracker::Listener f = 
CompletionTracker::Listener(), 
                      Correlator::Listener g = Correlator::Listener());
-
-    void sendContent(framing::AMQBody::shared_ptr content);
+    void sendFlush();
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Response_
+#define _Response_
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureResponse.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Lightweight handle on the outcome of a sent command, backed by a shared
+ * FutureResponse.
+ */
+class Response
+{
+    boost::shared_ptr<FutureResponse> future;
+
+public:
+    Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+
+    /**
+     * Returns the response as type T.
+     *
+     * shared_polymorphic_cast operates on the shared_ptr (not on the
+     * referenced object) and throws std::bad_cast if the response is not
+     * a T. NOTE(review): the returned reference is only valid while the
+     * FutureResponse keeps the response alive — confirm it retains it.
+     */
+    template <class T> T& as() 
+    {
+        framing::AMQMethodBody::shared_ptr response(future->getResponse());
+        return *boost::shared_polymorphic_cast<T>(response);
+    }
+
+    /** @return the result of isA<T>() on the response. */
+    template <class T> bool isA() 
+    {
+        return future->getResponse()->isA<T>();
+    }
+    
+    /** Calls waitForCompletion() on the underlying future. */
+    void sync()
+    {
+        future->waitForCompletion();
+    }
+
+    //TODO: only exposed for old channel class, may want to hide this eventually
+    framing::AMQMethodBody::shared_ptr getPtr()
+    {
+        return future->getResponse();
+    }
+};
+
+}}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionCore.h"
+#include <boost/bind.hpp>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+// Builds the session's handler chain. The execution handler (l3) is
+// constructed with maxFrameSize; the callbacks are then wired as follows:
+//   l2.out     -> out->handle      (frames leaving the session)
+//   l2.in      -> l3.handle        (frames arriving from the channel handler)
+//   l3.out     -> l2.outgoing      (frames produced by the execution handler)
+//   l2.onClose -> this->closed(code, text)
+// Sync mode starts off.
+SessionCore::SessionCore(uint16_t _id, 
boost::shared_ptr<framing::FrameHandler> out, 
+                         uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), 
sync(false)
+{
+    l2.out = boost::bind(&FrameHandler::handle, out, _1);
+    l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
+    l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1);
+    l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
+}
+
+// Opens the channel handler (l2) using this session's id.
+void SessionCore::open()
+{
+    l2.open(id);
+}
+
+// Asks the execution handler (l3) to send a flush.
+void SessionCore::flush()
+{
+    l3.sendFlush();
+}
+
+// Sends a method through the execution handler and returns a Response
+// wrapping a newly created FutureResponse.
+// The future's completed() is always registered as the completion listener;
+// its received() is registered as the response listener only when
+// expectResponse is true. In sync mode a flush is sent and the call waits
+// for completion before returning.
+Response SessionCore::send(AMQMethodBody::shared_ptr method, bool 
expectResponse)
+{
+    boost::shared_ptr<FutureResponse> f(futures.createResponse());
+    if (expectResponse) {
+        l3.send(method, boost::bind(&FutureResponse::completed, f), 
boost::bind(&FutureResponse::received, f, _1));    
+    } else {
+        l3.send(method, boost::bind(&FutureResponse::completed, f));    
+    }
+    if (sync) {
+        flush();
+        f->waitForCompletion();
+    }
+    return Response(f);
+}
+
+// Sends a method together with content (headers and data taken from the
+// MethodContent) through the execution handler's sendContent and returns a
+// Response wrapping a newly created FutureResponse.
+// The content's method headers are dynamic_cast to BasicHeaderProperties by
+// reference, so a different header type raises std::bad_cast.
+// Listener registration and sync-mode handling match the content-less send().
+Response SessionCore::send(AMQMethodBody::shared_ptr method, const 
MethodContent& content, bool expectResponse)
+{
+    //TODO: lots of duplication between these two send methods; refactor
+    boost::shared_ptr<FutureResponse> f(futures.createResponse());
+    if (expectResponse) {
+        l3.sendContent(method, dynamic_cast<const 
BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), 
+                       boost::bind(&FutureResponse::completed, f), 
boost::bind(&FutureResponse::received, f, _1));    
+    } else {
+        l3.sendContent(method, dynamic_cast<const 
BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), 
+                       boost::bind(&FutureResponse::completed, f));    
+    }
+    if (sync) {
+        flush();
+        f->waitForCompletion();
+    }
+    return Response(f);
+}
+
+// Pops the next received content item from the execution handler's received
+// queue (a BlockingQueue; presumably blocks while empty — confirm).
+ReceivedContent::shared_ptr SessionCore::get()
+{
+    return l3.received.pop();
+}
+
+// Turns sync mode on or off; when on, send() flushes and waits for
+// completion before returning.
+void SessionCore::setSync(bool s)
+{
+    sync = s;
+}
+
+// Returns whether sync mode is currently on.
+bool SessionCore::isSync()
+{
+    return sync;
+}
+
+// Closes the channel handler (l2), then closes the received-content queue.
+void SessionCore::close()
+{
+    l2.close();
+    l3.received.close();
+}
+
+// Closes only the received-content queue; the channel handler is left as is.
+void SessionCore::stop()
+{
+    l3.received.close();
+}
+
+// Entry point for incoming frames: passes the frame to the channel
+// handler's incoming path (l2.incoming).
+void SessionCore::handle(AMQFrame& frame)
+{
+    l2.incoming(frame);
+}
+
+// Close notification: closes the received-content queue and closes the
+// future factory with the given code and text.
+void SessionCore::closed(uint16_t code, const std::string& text)
+{
+    l3.received.close();
+    futures.close(code, text);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _SessionCore_
+#define _SessionCore_
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/MethodContent.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
+#include "ReceivedContent.h"
+#include "Response.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Core of a client session: combines an execution handler (l3) and a
+ * channel handler (l2), and hands out Response objects for sent commands.
+ * Per the commit log it is shared by the old channel interface and the new
+ * generated session interface.
+ */
+class SessionCore : public framing::FrameHandler
+{
+    ExecutionHandler l3;
+    ChannelHandler l2;
+    // Creates the FutureResponse objects returned (wrapped) by send().
+    FutureFactory futures;
+    const uint16_t id;
+    // When true, send() flushes and waits for completion before returning.
+    bool sync;
+    
+public:    
+    typedef boost::shared_ptr<SessionCore> shared_ptr;
+
+    SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, 
uint64_t maxFrameSize);
+    // Send a method, optionally with content; expectResponse additionally
+    // registers a listener for the response.
+    Response send(framing::AMQMethodBody::shared_ptr method, bool 
expectResponse = false);
+    Response send(framing::AMQMethodBody::shared_ptr method, const 
framing::MethodContent& content, bool expectResponse = false);
+    // Pops the next received content item.
+    ReceivedContent::shared_ptr get();
+    uint16_t getId() const { return id; } 
+    void setSync(bool);
+    bool isSync();
+    void flush();
+    void open();
+    void close();
+    void stop();
+    // Close notification carrying the close code and text.
+    void closed(uint16_t code, const std::string& text);
+    
+    //for incoming frames:
+    void handle(framing::AMQFrame& frame);    
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h?view=auto&rev=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h Sun Aug  5 
06:25:36 2007
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MethodContent_
+#define _MethodContent_
+
+#include "HeaderProperties.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Abstract interface for content that accompanies a method: exposes the
+ * content's header properties and its data as a string.
+ */
+class MethodContent
+{
+public:
+    virtual ~MethodContent() {}
+    //TODO: rethink this interface
+    // Header properties describing the content.
+    virtual const HeaderProperties& getMethodHeaders() const = 0;
+    // The content data.
+    virtual const std::string& getData() const = 0;
+};
+
+}}
+#endif  

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp Sun Aug  5 06:25:36 
2007
@@ -33,6 +33,7 @@
 #include "qpid/framing/Responder.h"
 #include "InProcessBroker.h"
 #include "qpid/client/Connection.h"
+#include "qpid/client/Connector.h"
 #include "qpid/client/ClientExchange.h"
 #include "qpid/client/ClientQueue.h"
 #include "qpid/framing/Correlator.h"
@@ -386,9 +387,8 @@
     CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), 
boost::lexical_cast<string>(frame))
 
     void testRequestResponseRoundtrip() {
-        broker::InProcessBroker ibroker(version);
-        client::Connection clientConnection;
-        clientConnection.setConnector(ibroker);
+        boost::shared_ptr<broker::InProcessBroker> ibroker(new 
broker::InProcessBroker(version));
+        client::Connection 
clientConnection(boost::static_pointer_cast<client::Connector>(ibroker));
         clientConnection.open("");
         client::Channel c;
         clientConnection.openChannel(c);
@@ -399,7 +399,9 @@
         c.declareExchange(exchange);
         c.declareQueue(queue);
         c.bind(exchange, queue, "MyTopic", framing::FieldTable());
-        broker::InProcessBroker::Conversation::const_iterator i = 
ibroker.conversation.begin();
+        c.close();
+        clientConnection.close();
+        broker::InProcessBroker::Conversation::const_iterator i = 
ibroker->conversation.begin();
         ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: 
versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; 
locales=en_US]", *i++);
         ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: 
clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; 
locale=en_US]", *i++);
         ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: 
channelMax=32767; frameMax=65536; heartbeat=0]", *i++);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?view=diff&rev=562866&r1=562865&r2=562866
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Sun Aug  5 
06:25:36 2007
@@ -134,30 +134,7 @@
 }
 
 } // namespace broker
-
-
-namespace client {
-/** An in-process client+broker all in one. */
-class InProcessBrokerClient : public client::Connection {
-  public:
-    broker::InProcessBroker broker;
-    broker::InProcessBroker::Conversation& conversation;
-    
-    /** Constructor creates broker and opens client connection. */
-    InProcessBrokerClient(
-        u_int32_t max_frame_size=65536,
-        framing::ProtocolVersion version= framing::highestProtocolVersion
-    ) : client::Connection(false, max_frame_size, version),
-        broker(version),
-        conversation(broker.conversation)
-    {
-        setConnector(broker);
-        open("");
-    }
-};
-
-
-}} // namespace qpid::client
+} // namespace qpid
 
 
 #endif // _tests_InProcessBroker_h


Reply via email to