Author: gsim
Date: Wed Aug 15 10:26:43 2007
New Revision: 566274

URL: http://svn.apache.org/viewvc?view=rev&rev=566274
Log:
Altered old client channel to use new generated session interface (primarily to 
reduce the number of places where method bodies are constructed).


Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb?view=diff&rev=566274&r1=566273&r2=566274
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/Session.rb Wed Aug 15 
10:26:43 2007
@@ -13,7 +13,7 @@
   end
   
   def declare_method (m)
-    gen "Response #{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" 
+    gen "Response #{m.amqp_parent.name.lcaps}#{m.name.caps}(" 
     if (m.content())
       params=m.signature + ["const MethodContent& content"]
     else
@@ -28,7 +28,7 @@
   end
 
   def define_method (m)
-    gen "Response Session::#{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" 
+    gen "Response Session::#{m.amqp_parent.name.lcaps}#{m.name.caps}(" 
     if (m.content())
       params=m.signature + ["const MethodContent& content"]
     else
@@ -89,8 +89,8 @@
     [EMAIL PROTECTED]();
 
     ReceivedContent::shared_ptr get() { return impl->get(); }
-    void close() { impl->close(); parent->released(impl); }  
-
+    void setSynchronous(bool sync) { impl->setSync(sync); } 
+    void close();
 EOS
   indent { @amqp.amqp_classes.each { |c| declare_class(c) if 
!excludes.include?(c.name) } }
   gen <<EOS
@@ -117,7 +117,19 @@
 [EMAIL PROTECTED]::[EMAIL PROTECTED]()
 {
     impl->stop();
-    parent->released(impl);
+    if (parent) { 
+        parent->released(impl);
+        parent.reset();
+    }
+}
+
+void [EMAIL PROTECTED]::close()
+{
+    impl->close(); 
+    if (parent) { 
+        parent->released(impl);
+        parent.reset();
+    }
 }
 
 EOS

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=566274&r1=566273&r2=566274
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Wed Aug 15 
10:26:43 2007
@@ -46,6 +46,14 @@
 
 const std::string empty;
 
+class ScopedSync
+{
+    Session& session;
+public:
+    ScopedSync(Session& s, bool enabled = true) : session(s) { 
session.setSynchronous(enabled); }
+    ~ScopedSync() { session.setSynchronous(false); }
+};
+
 }}
 
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
@@ -64,7 +72,8 @@
         THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
 
     connection = c;
-    session = s;
+    sessionCore = s;
+    session = auto_ptr<Session>(new Session(c, s));
 }
     
 bool Channel::isOpen() const { 
@@ -73,10 +82,10 @@
 }
 
 void Channel::setQos() {
-    sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, 
getPrefetch(), false)));
+    session->basicQos(0, getPrefetch(), false);
     if(isTransactional()) {
         //I think this is wrong! should only send TxSelect once...
-        sendSync(false, make_shared_ptr(new TxSelectBody(version)));
+        session->txSelect();
     }
 }
 
@@ -86,52 +95,46 @@
 }
 
 void Channel::declareExchange(Exchange& exchange, bool synch){
-    string name = exchange.getName();
-    string type = exchange.getType();
     FieldTable args;
-    sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, 
type, empty, false, false, false, args)));
+    ScopedSync s(*session, synch);
+    session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, 
false, false, false, args);
 }
 
 void Channel::deleteExchange(Exchange& exchange, bool synch){
-    string name = exchange.getName();
-    sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, 
false)));
+    ScopedSync s(*session, synch);
+    session->exchangeDelete(0, exchange.getName(), false);
 }
 
 void Channel::declareQueue(Queue& queue, bool synch){
-    string name = queue.getName();
     FieldTable args;
-    QueueDeclareOkBody::shared_ptr response =
-        sendAndReceiveSync<QueueDeclareOkBody>(
-            synch,
-            make_shared_ptr(new QueueDeclareBody(
-                version, 0, name, empty, false/*passive*/, queue.isDurable(),
-                queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
+    ScopedSync s(*session, synch);
+    Response r = session->queueDeclare(0, queue.getName(), empty, 
false/*passive*/, queue.isDurable(),
+                                       queue.isExclusive(), 
queue.isAutoDelete(), !synch, args);
+    
     if(synch) {
         if(queue.getName().length() == 0)
-            queue.setName(response->getQueue());
+            queue.setName(r.as<QueueDeclareOkBody>().getQueue());
     }
 }
 
 void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool 
synch){
-    //ticket, queue, ifunused, ifempty, nowait
-    string name = queue.getName();
-    sendAndReceiveSync<QueueDeleteOkBody>(
-        synch,
-        make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, 
ifempty, !synch)));
+    ScopedSync s(*session, synch);
+    session->queueDelete(0, queue.getName(), ifunused, ifempty, !synch);
 }
 
 void Channel::bind(const Exchange& exchange, const Queue& queue, const 
std::string& key, const FieldTable& args, bool synch){
     string e = exchange.getName();
     string q = queue.getName();
-    sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, 
args)));
+    ScopedSync s(*session, synch);
+    session->queueBind(0, q, e, key, args);
 }
 
 void Channel::commit(){
-    sendSync(false, make_shared_ptr(new TxCommitBody(version)));
+    session->txCommit();
 }
 
 void Channel::rollback(){
-    sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
+    session->txRollback();
 }
 
 void Channel::close()
@@ -141,43 +144,14 @@
         Mutex::ScopedLock l(lock);
         if (connection);
         {
-            connection->released(session);
+            session.reset();
+            sessionCore.reset();
             connection.reset();
         }
     }
     stop();
 }
 
-AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr 
toSend, ClassId /*c*/, MethodId /*m*/)
-{
-    session->setSync(true);
-    Response r = session->send(toSend, true);
-    session->setSync(false);
-    return r.getPtr();
-}
-
-void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
-{
-    if(sync) {
-        session->setSync(true);
-        session->send(command, false);
-        session->setSync(false);
-    } else {
-        session->send(command);
-    }
-}
-
-AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
-    bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
-{
-    if(sync)
-        return sendAndReceive(body, c, m);
-    else {
-        session->send(body);
-        return AMQMethodBody::shared_ptr();
-    }
-}
-
 void Channel::consume(
     Queue& queue, const std::string& tag, MessageListener* listener, 
     AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
@@ -195,12 +169,10 @@
         c.ackMode = ackMode;
         c.lastDeliveryTag = 0;
     }
-    sendAndReceiveSync<BasicConsumeOkBody>(
-            synch,
-            make_shared_ptr(new BasicConsumeBody(
-                version, 0, queue.getName(), tag, noLocal,
-                ackMode == NO_ACK, false, !synch,
-                fields ? *fields : FieldTable())));
+    ScopedSync s(*session, synch);
+    session->basicConsume(0, queue.getName(), tag, noLocal,
+                          ackMode == NO_ACK, false, !synch,
+                          fields ? *fields : FieldTable());
 }
         
 void Channel::cancel(const std::string& tag, bool synch) {
@@ -213,16 +185,13 @@
         c = i->second;
         consumers.erase(i);
     }
-    sendAndReceiveSync<BasicCancelOkBody>(
-        synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch)));
+    ScopedSync s(*session, synch);
+    session->basicCancel(tag, !synch);
 }
 
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-
-    AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, 
queue.getName(), ackMode));
-    
-    Response response = session->send(request, true);
-    session->flush();
+    Response response = session->basicGet(0, queue.getName(), ackMode == 
NO_ACK);
+    sessionCore->flush();//TODO: need to expose the ability to request 
completion info through session
     if (response.isA<BasicGetEmptyBody>()) {
         return false;
     } else {
@@ -239,7 +208,7 @@
     const string e = exchange.getName();
     string key = routingKey;
 
-    session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, 
mandatory, immediate)), msg, false);
+    session->basicPublish(0, e, key, mandatory, immediate, msg);
 }
 
 void Channel::start(){
@@ -248,7 +217,7 @@
 }
 
 void Channel::stop() {
-    session->stop();
+    //session->stop();
     gets.close();
     join();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=566274&r1=566273&r2=566274
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Wed Aug 15 
10:26:43 2007
@@ -21,13 +21,14 @@
  * under the License.
  *
  */
+#include <memory>
 #include <boost/scoped_ptr.hpp>
 #include "qpid/framing/amqp_framing.h"
 #include "ClientExchange.h"
 #include "ClientMessage.h"
 #include "ClientQueue.h"
 #include "ConnectionImpl.h"
-#include "SessionCore.h"
+#include "Session.h"
 #include "qpid/Exception.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
@@ -58,9 +59,6 @@
 class Channel : private sys::Runnable
 {
   private:
-    struct UnknownMethod {};
-    typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
-
     struct Consumer{
         MessageListener* listener;
         AckMode ackMode;
@@ -81,40 +79,14 @@
 
     ConsumerMap consumers;
     ConnectionImpl::shared_ptr connection;
-    SessionCore::shared_ptr session;
+    std::auto_ptr<Session> session;
+    SessionCore::shared_ptr sessionCore;
     framing::ChannelId channelId;
     BlockingQueue<ReceivedContent::shared_ptr> gets;
 
     void stop();
 
     void setQos();
-    
-    framing::AMQMethodBody::shared_ptr sendAndReceive(
-        framing::AMQMethodBody::shared_ptr,
-        framing::ClassId = 0, framing::MethodId = 0);
-
-    framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
-        bool sync,
-        framing::AMQMethodBody::shared_ptr,
-        framing::ClassId, framing::MethodId);
-
-    void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body);
-
-
-    template <class BodyType>
-    boost::shared_ptr<BodyType> 
sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
-        return boost::shared_polymorphic_downcast<BodyType>(
-            sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
-    }
-
-    template <class BodyType>
-    boost::shared_ptr<BodyType> sendAndReceiveSync(
-        bool sync, framing::AMQMethodBody::shared_ptr body) {
-        return boost::shared_polymorphic_downcast<BodyType>(
-            sendAndReceiveSync(
-                sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
-    }
-
     void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
     void closeInternal();
     void join();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?view=diff&rev=566274&r1=566273&r2=566274
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Aug 15 
10:26:43 2007
@@ -47,10 +47,9 @@
 void ConnectionImpl::released(SessionCore::shared_ptr session)
 {
     SessionMap::iterator i = sessions.find(session->getId());
-    if (i == sessions.end()) {
-        throw Exception("Id not in use.");
+    if (i != sessions.end()) {
+        sessions.erase(i);
     }
-    sessions.erase(i);
 }
 
 void ConnectionImpl::handle(framing::AMQFrame& frame)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h?view=diff&rev=566274&r1=566273&r2=566274
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Response.h Wed Aug 15 
10:26:43 2007
@@ -39,7 +39,7 @@
     template <class T> T& as() 
     {
         framing::AMQMethodBody::shared_ptr response(future->getResponse());
-        return boost::shared_polymorphic_cast<T>(*response);
+        return dynamic_cast<T&>(*response);
     }
     template <class T> bool isA() 
     {
@@ -50,12 +50,6 @@
     void sync()
     {
         return future->waitForCompletion();
-    }
-
-    //TODO: only exposed for old channel class, may want to hide this 
eventually
-    framing::AMQMethodBody::shared_ptr getPtr()
-    {
-        return future->getResponse();
     }
 };
 


Reply via email to