Author: aconway
Date: Thu Feb 22 15:23:52 2007
New Revision: 510705

URL: http://svn.apache.org/viewvc?view=rev&rev=510705
Log:
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
  from Channel into separate Basic class.
* cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp?view=auto&rev=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp Thu Feb 22 
15:23:52 2007
@@ -0,0 +1,255 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "Basic.h"
+#include "AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "framing/FieldTable.h"
+#include "Connection.h"
+
+using namespace std;
+
+namespace qpid {
+namespace client {
+
+using namespace sys;
+using namespace framing;
+
+/**
+ * Create the Basic-class helper for the channel ch. The channel is held by
+ * reference; no returned-message handler is installed initially.
+ */
+Basic::Basic(Channel& ch)
+    : channel(ch),
+      returnsHandler(0)
+{
+}
+
+/**
+ * Send a Basic.Consume for the queue and register the listener under the
+ * consumer tag.
+ *
+ * When synch is true the tag is overwritten with the tag carried by the
+ * broker's ConsumeOk response. Raises a CLIENT_ERROR (via THROW_QPID_ERROR)
+ * if a consumer is already registered under the resulting tag.
+ */
+void Basic::consume(
+    Queue& queue, std::string& tag, MessageListener* listener,
+    AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+    // A null field table pointer is sent as an empty table.
+    const FieldTable arguments = fields ? *fields : FieldTable();
+    channel.sendAndReceiveSync<BasicConsumeOkBody>(
+        synch,
+        new BasicConsumeBody(
+            channel.version, 0, queue.getName(), tag, noLocal,
+            ackMode == NO_ACK, false, !synch, arguments));
+
+    if (synch) {
+        // Adopt the tag reported back by the broker.
+        tag = boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
+            channel.responses.getResponse())->getConsumerTag();
+    }
+
+    // FIXME aconway 2007-02-20: Race condition!
+    // We could receive the first message for the consumer
+    // before we create the consumer below.
+    // Move consumer creation to handler for BasicConsumeOkBody
+    Mutex::ScopedLock guard(lock);
+    if (consumers.find(tag) != consumers.end()) {
+        THROW_QPID_ERROR(CLIENT_ERROR,
+                         "Consumer already exists with tag="+tag);
+    }
+    Consumer& entry = consumers[tag];
+    entry.listener = listener;
+    entry.ackMode = ackMode;
+    entry.lastDeliveryTag = 0;
+}
+
+
+/**
+ * Remove the consumer registered under tag and send Basic.Cancel.
+ *
+ * Does nothing if no consumer is registered under tag. A LAZY_ACK consumer
+ * with an outstanding delivery tag has that tag acknowledged (multiple=true)
+ * before the cancel is sent.
+ */
+void Basic::cancel(const std::string& tag, bool synch) {
+    Consumer removed;
+    {
+        Mutex::ScopedLock guard(lock);
+        ConsumerMap::iterator pos = consumers.find(tag);
+        if (pos == consumers.end()) {
+            return;             // Unknown tag: nothing to cancel.
+        }
+        removed = pos->second;
+        consumers.erase(pos);
+    }
+
+    // Network traffic happens outside the lock.
+    const bool pendingLazyAck =
+        removed.ackMode == LAZY_ACK && removed.lastDeliveryTag > 0;
+    if (pendingLazyAck) {
+        channel.send(
+            new BasicAckBody(channel.version, removed.lastDeliveryTag, true));
+    }
+    channel.sendAndReceiveSync<BasicCancelOkBody>(
+        synch, new BasicCancelBody(channel.version, tag, !synch));
+}
+
+/**
+ * Drop every registered consumer and acknowledge outstanding deliveries.
+ *
+ * The consumer table is emptied under the lock; acks (multiple=true) are
+ * then sent for each LAZY_ACK or AUTO_ACK consumer that still has a
+ * non-zero lastDeliveryTag. No Basic.Cancel is sent from here.
+ */
+void Basic::cancelAll(){
+    ConsumerMap detached;
+    {
+        // Swapping with an empty map leaves `consumers` cleared.
+        Mutex::ScopedLock guard(lock);
+        detached.swap(consumers);
+    }
+    for (ConsumerMap::iterator it = detached.begin();
+         it != detached.end(); ++it)
+    {
+        const Consumer& c = it->second;
+        const bool libraryAcks =
+            c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK;
+        if (libraryAcks && c.lastDeliveryTag > 0) {
+            channel.send(
+                new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+        }
+    }
+}
+
+
+
+/**
+ * Synchronously fetch one message from the queue with Basic.Get.
+ *
+ * @param msg filled in by incoming.waitGet() when a message arrives.
+ * @param queue the queue to fetch from.
+ * @param ackMode acknowledgement mode for the fetched message.
+ * @return the result of incoming.waitGet(msg).
+ */
+bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
+    // Expect a message starting with a BasicGetOk
+    incoming.startGet();
+    // The last argument is the no-ack flag, computed exactly as consume()
+    // computes it. Passing ackMode itself converted NO_ACK (0) to false
+    // and every acknowledging mode to true, i.e. the flag was inverted.
+    // NOTE(review): BasicGetBody's signature is not visible here; this
+    // assumes its fourth parameter is the AMQP basic.get no-ack bit.
+    channel.send(
+        new BasicGetBody(
+            channel.version, 0, queue.getName(), ackMode == NO_ACK));
+    return incoming.waitGet(msg);
+}
+
+
+/**
+ * Publish a message: send Basic.Publish, then the message header, then the
+ * content split into one or more content bodies.
+ *
+ * Content is fragmented so that no body exceeds the connection's maximum
+ * frame size less 8 bytes of frame overhead. An empty payload sends no
+ * content body at all.
+ */
+void Basic::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    const string e = exchange.getName();
+    string key = routingKey;
+    channel.send(
+        new BasicPublishBody(
+            channel.version, 0, e, key, mandatory, immediate));
+    //break msg up into header frame and content frame(s) and send these
+    channel.send(msg.getHeader());
+    string data = msg.getData();
+    // Offsets use the string's own size type: the previous 32-bit
+    // counters truncated for payloads of 4GiB or more.
+    const string::size_type data_length = data.length();
+    if (data_length == 0)
+        return;
+
+    //frame itself uses 8 bytes
+    const u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+    if (data_length < frag_size) {
+        // Fits in a single content body.
+        channel.send(new AMQContentBody(data));
+        return;
+    }
+
+    string::size_type offset = 0;
+    while (offset < data_length) {
+        const string::size_type remaining = data_length - offset;
+        const string::size_type length =
+            remaining > frag_size ? frag_size : remaining;
+        string frag(data.substr(offset, length));
+        channel.send(new AMQContentBody(frag));
+        offset += length;
+    }
+}
+
+/**
+ * Accept an incoming Basic-class method from the channel.
+ *
+ * Deliver, Return, GetOk and GetEmpty are queued on `incoming`; any other
+ * method id raises Channel::UnknownMethod.
+ */
+void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
+    assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+    switch (method->amqpMethodId()) {
+      case BasicDeliverBody::METHOD_ID:
+      case BasicReturnBody::METHOD_ID:
+      case BasicGetOkBody::METHOD_ID:
+      case BasicGetEmptyBody::METHOD_ID:
+        incoming.add(method);
+        break;
+      default:
+        throw Channel::UnknownMethod();
+    }
+}
+
+/**
+ * Pass one message to the consumer's listener, then acknowledge it
+ * according to the consumer's AckMode (only while the channel is open).
+ *
+ * @param consumer consumer state; lastDeliveryTag and count are updated.
+ * @param msg the message to hand to consumer.listener.
+ */
+void Basic::deliver(Consumer& consumer, Message& msg){
+    //record delivery tag:
+    consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+    //allow registered listener to handle the message
+    consumer.listener->received(msg);
+
+    if(channel.isOpen()){
+        bool multiple(false);
+        switch(consumer.ackMode){
+          case LAZY_ACK: 
+            multiple = true;
+            if(++(consumer.count) < channel.getPrefetch())
+                break;
+            // Threshold reached: start a new batch. Without this reset
+            // the counter stayed at or above the prefetch and every
+            // later message was acknowledged individually.
+            consumer.count = 0;
+            //else drop-through
+          case AUTO_ACK:
+            // Nothing is outstanding once this ack has been sent.
+            consumer.lastDeliveryTag = 0;
+            channel.send(
+                new BasicAckBody(
+                    channel.version, msg.getDeliveryTag(), multiple));
+            break;
+          case NO_ACK:          // Nothing to do
+          case CLIENT_ACK:      // User code must ack.
+            break;
+            // TODO aconway 2007-02-22: Provide a way for user
+            // to ack!
+        }
+    }
+
+    //as it stands, transactionality is entirely orthogonal to ack
+    //mode, though the acks will not be processed by the broker under
+    //a transaction until it commits.
+}
+
+
+/**
+ * Dispatch loop: while the channel is open, wait for the next complete
+ * incoming message and route it.
+ *
+ * Returned messages go to the ReturnedMessageHandler (or are printed to
+ * stdout when none is set); all other messages are treated as deliveries
+ * and passed to the consumer registered under the message's consumer tag.
+ * Exceptions are caught inside the loop, so the loop only ends when
+ * channel.isOpen() becomes false.
+ */
+void Basic::run() {
+    while(channel.isOpen()) {
+        try {
+            Message msg = incoming.waitDispatch();
+            if(msg.getMethod()->isA<BasicReturnBody>()) {
+                ReturnedMessageHandler* handler=0;
+                {
+                    Mutex::ScopedLock l(lock);
+                    handler=returnsHandler;
+                }
+                if(handler == 0) {
+                    // TODO aconway 2007-02-20: proper logging.
+                    cout << "Message returned: " << msg.getData() << endl;
+                }
+                else 
+                    handler->returned(msg);
+            }
+            else {
+                BasicDeliverBody::shared_ptr deliverBody =
+                    boost::shared_polymorphic_downcast<BasicDeliverBody>(
+                        msg.getMethod());
+                std::string tag = deliverBody->getConsumerTag();
+                // Work on a copy so the listener callback runs without
+                // the lock held.
+                Consumer consumer;
+                {
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i == consumers.end()) 
+                        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+                                         "Unknown consumer tag=" + tag);
+                    consumer = i->second;
+                }
+                deliver(consumer, msg);
+                {
+                    // deliver() updated only the copy. Store the ack
+                    // bookkeeping back so the lazy-ack count accumulates
+                    // and cancel()/cancelAll() see the outstanding tag.
+                    // Skipped if the consumer was cancelled meanwhile.
+                    Mutex::ScopedLock l(lock);
+                    ConsumerMap::iterator i = consumers.find(tag);
+                    if(i != consumers.end()) {
+                        i->second.count = consumer.count;
+                        i->second.lastDeliveryTag = consumer.lastDeliveryTag;
+                    }
+                }
+            }
+        }
+        catch (const ShutdownException&) {
+            /* Orderly shutdown */
+        }
+        catch (const Exception& e) {
+            // FIXME aconway 2007-02-20: Report exception to user.
+            cout << "client::Basic::run() terminated by: " << e.toString()
+                 << "(" << typeid(e).name() << ")" << endl;
+        }
+    }
+}
+
+/**
+ * Install (or clear, with a null pointer) the handler that run() invokes
+ * for returned messages. The pointer is swapped under the lock.
+ */
+void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler)
+{
+    Mutex::ScopedLock guard(lock);
+    returnsHandler = handler;
+}
+
+/**
+ * Send Basic.Qos carrying the channel's prefetch value and wait for QosOk;
+ * for a transactional channel, follow it with Tx.Select and wait for
+ * SelectOk.
+ */
+void Basic::setQos(){
+    BasicQosBody* qos =
+        new BasicQosBody(channel.version, 0, channel.getPrefetch(), false);
+    channel.sendAndReceive<BasicQosOkBody>(qos);
+
+    if (!channel.isTransactional())
+        return;
+    channel.sendAndReceive<TxSelectOkBody>(
+        new TxSelectBody(channel.version));
+}
+
+
+// TODO aconway 2007-02-22: NOTES:
+// Move incoming to BasicChannel - check for uses.
+
+}} // namespace qpid::client

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h?view=auto&rev=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h Thu Feb 22 15:23:52 
2007
@@ -0,0 +1,195 @@
+#ifndef _client_Basic_h
+#define _client_Basic_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "IncomingMessage.h"
+#include "sys/Runnable.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class FieldTable;
+}
+
+namespace client {
+
+class Channel;
+class Message;
+class Queue;
+class Exchange;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * The acknowledgement modes a consumer (or a get) can use. The mode
+ * decides who sends Basic.Ack for a delivered message, and when.
+ * 
+ * \ingroup clientapi
+ */
+enum AckMode {
+    /** No acknowledgement is sent by the client; the broker can
+        discard a message as soon as it has been delivered to a
+        consumer using this mode. **/
+    NO_ACK     = 0,  
+    /** The library acknowledges each message individually, right
+        after the listener's received() call returns. **/
+    AUTO_ACK   = 1,  
+    /** The library acknowledges automatically but in batches: one
+        ack with the 'multiple' flag set once the consumer's count of
+        deliveries reaches the channel prefetch. **/
+    LAZY_ACK   = 2,
+    /** The library sends no acks; the application is responsible for
+        explicitly acknowledging messages. **/
+    CLIENT_ACK = 3 
+};
+
+
+/**
+ * Represents the AMQP Basic class for sending and receiving messages.
+ *
+ * A Basic is owned by a Channel and sends all of its frames through that
+ * channel. It is also a Runnable: run() is the dispatch loop that hands
+ * incoming messages to listeners.
+ */
+class Basic : public sys::Runnable
+{
+  public:
+    /** Create the Basic helper for the given parent channel. */
+    Basic(Channel& parent);
+    
+    /**
+     * Creates a 'consumer' for a queue. Messages in (or arriving
+     * at) that queue will be delivered to consumers
+     * asynchronously.
+     * 
+     * @param queue a Queue instance representing the queue to
+     * consume from
+     * 
+     * @param tag an identifier to associate with the consumer
+     * that can be used to cancel its subscription (if empty, this
+     * will be assigned by the broker)
+     * 
+     * @param listener a pointer to an instance of an
+     * implementation of the MessageListener interface. Messages
+     * received from this queue for this consumer will result in
+     * invocation of the received() method on the listener, with
+     * the message itself passed in.
+     * 
+     * @param ackMode the mode of acknowledgement that the broker
+     * should assume for this consumer. @see AckMode
+     * 
+     * @param noLocal if true, this consumer will not be sent any
+     * message published by this connection
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     *
+     * @param fields optional field table sent with the consume
+     * request; a null pointer sends an empty table
+     */
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener, 
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
+        
+    /**
+     * Cancels a subscription previously set up through a call to consume().
+     *
+     * @param tag the identifier used (or assigned) in the consume
+     * request that set up the subscription to be cancelled.
+     * 
+     * @param synch if true this call will block until a response
+     * is received from the broker
+     */
+    void cancel(const std::string& tag, bool synch = true);
+
+    /**
+     * Synchronous pull of a message from a queue.
+     * 
+     * @param msg a message object that will contain the message
+     * headers and content if the call completes.
+     * 
+     * @param queue the queue to consume from
+     * 
+     * @param ackMode the acknowledgement mode to use (@see
+     * AckMode)
+     * 
+     * @return true if a message was successfully dequeued from
+     * the queue, false if the queue was empty.
+     */
+    bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+    /**
+     * Publishes (i.e. sends a message to the broker).
+     * 
+     * @param msg the message to publish
+     * 
+     * @param exchange the exchange to publish the message to
+     * 
+     * @param routingKey the routing key to publish with
+     * 
+     * @param mandatory if true and the exchange to which this
+     * publish is directed has no matching bindings, the message
+     * will be returned (see setReturnedMessageHandler()).
+     * 
+     * @param immediate if true and there is no consumer to
+     * receive this message on publication, the message will be
+     * returned (see setReturnedMessageHandler()).
+     */
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey, 
+                 bool mandatory = false, bool immediate = false);
+
+    /**
+     * Set a handler for this channel that will process any
+     * returned messages
+     * 
+     * @see publish()
+     */
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+    /**
+     * Deliver messages from the broker to the appropriate MessageListener. 
+     * Loops for as long as the parent channel is open.
+     */
+    void run();
+
+
+  private:
+
+    /** Per-consumer bookkeeping, keyed by consumer tag in ConsumerMap. */
+    struct Consumer{
+        /** Listener whose received() is called for each delivery. */
+        MessageListener* listener;
+        /** How deliveries to this consumer are acknowledged. */
+        AckMode ackMode;
+        /** Deliveries counted towards the LAZY_ACK threshold. */
+        int count;
+        /** Tag of the latest delivery not yet acked by the library;
+            0 when nothing is outstanding. */
+        u_int64_t lastDeliveryTag;
+
+        /** Zero-initialise, so that a default-constructed Consumer
+            (as used for local copies) never holds indeterminate
+            values. */
+        Consumer() :
+            listener(0), ackMode(NO_ACK), count(0), lastDeliveryTag(0) {}
+    };
+
+    typedef std::map<std::string, Consumer> ConsumerMap;
+
+    /** Queue an incoming Basic-class method for dispatch; throws
+        Channel::UnknownMethod for methods it does not handle. */
+    void handle(boost::shared_ptr<framing::AMQMethodBody>);
+    /** Send Basic.Qos with the channel prefetch, plus Tx.Select when
+        the channel is transactional. */
+    void setQos();
+    /** Drop all consumers, sending any outstanding library acks. */
+    void cancelAll();
+    /** Hand msg to the consumer's listener and ack per its AckMode. */
+    void deliver(Consumer& consumer, Message& msg);
+    
+    /** Guards consumers and returnsHandler. */
+    sys::Mutex lock;
+    /** The owning channel, used to send frames. */
+    Channel& channel;
+    /** Incoming messages waiting for get() or for run(). */
+    IncomingMessage incoming;
+    /** Registered consumers by consumer tag. */
+    ConsumerMap consumers;
+    /** Handler for returned messages; may be null. */
+    ReturnedMessageHandler* returnsHandler;
+
+    // FIXME aconway 2007-02-22: Remove friendship.
+  friend class Channel;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!_client_Basic_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Basic.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Thu Feb 
22 15:23:52 2007
@@ -37,6 +37,7 @@
 using namespace qpid::sys;
 
 Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+    basic(*this),
     connection(0), 
     prefetch(_prefetch), 
     transactional(_transactional)
@@ -113,19 +114,16 @@
     
 bool Channel::isOpen() const { return connection; }
 
+void Channel::setQos() {
+    basic.setQos();
+    // FIXME aconway 2007-02-22: message
+}
+
 void Channel::setPrefetch(u_int16_t _prefetch){
     prefetch = _prefetch;
     setQos();
 }
 
-void Channel::setQos(){
-    sendAndReceive<BasicQosOkBody>(
-        new BasicQosBody(version, 0, prefetch, false));
-    if(transactional){
-        sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
-    }
-}
-
 void Channel::declareExchange(Exchange& exchange, bool synch){
     string name = exchange.getName();
     string type = exchange.getType();
@@ -177,114 +175,6 @@
         new QueueBindBody(version, 0, q, e, key,!synch, args));
 }
 
-void Channel::consume(
-    Queue& queue, std::string& tag, MessageListener* listener, 
-    int ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
-    sendAndReceiveSync<BasicConsumeOkBody>(
-        synch,
-        new BasicConsumeBody(
-            version, 0, queue.getName(), tag, noLocal,
-            ackMode == NO_ACK, false, !synch,
-            fields ? *fields : FieldTable()));
-    if (synch) {
-        BasicConsumeOkBody::shared_ptr response =
-            shared_polymorphic_downcast<BasicConsumeOkBody>(
-                responses.getResponse());
-        tag = response->getConsumerTag();
-    }
-    // FIXME aconway 2007-02-20: Race condition!
-    // We could receive the first message for the consumer
-    // before we create the consumer below.
-    // Move consumer creation to handler for BasicConsumeOkBody
-    {
-        Mutex::ScopedLock l(lock);
-        ConsumerMap::iterator i = consumers.find(tag);
-        if (i != consumers.end())
-            THROW_QPID_ERROR(CLIENT_ERROR,
-                             "Consumer already exists with tag="+tag);
-        Consumer& c = consumers[tag];
-        c.listener = listener;
-        c.ackMode = ackMode;
-        c.lastDeliveryTag = 0;
-    }
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
-    Consumer c;
-    {
-        Mutex::ScopedLock l(lock);
-        ConsumerMap::iterator i = consumers.find(tag);
-        if (i == consumers.end())
-            return;
-        c = i->second;
-        consumers.erase(i);
-    }
-    if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
-        send(new BasicAckBody(version, c.lastDeliveryTag, true));
-    sendAndReceiveSync<BasicCancelOkBody>(
-        synch, new BasicCancelBody(version, tag, !synch));
-}
-
-void Channel::cancelAll(){
-    ConsumerMap consumersCopy;
-    {
-        Mutex::ScopedLock l(lock);
-        consumersCopy = consumers;
-        consumers.clear();
-    }
-    for (ConsumerMap::iterator i=consumersCopy.begin();
-         i  != consumersCopy.end(); ++i)
-    {
-        Consumer& c = i->second;
-        if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
-            && c.lastDeliveryTag > 0)
-        {
-            send(new BasicAckBody(version, c.lastDeliveryTag, true));
-        }
-    }
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
-    // Expect a message starting with a BasicGetOk
-    incoming.startGet();
-    send(new BasicGetBody(version, 0, queue.getName(), ackMode));
-    return incoming.waitGet(msg);
-}
-
-    
-void Channel::publish(
-    const Message& msg, const Exchange& exchange,
-    const std::string& routingKey, bool mandatory, bool immediate)
-{
-    // FIXME aconway 2007-01-30: Rework for 0-9 message class.
-    const string e = exchange.getName();
-    string key = routingKey;
-
-    send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
-    //break msg up into header frame and content frame(s) and send these
-    send(msg.header);
-    string data = msg.getData();
-    u_int64_t data_length = data.length();
-    if(data_length > 0){
-        u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself 
uses 8 bytes
-        if(data_length < frag_size){
-            send(new AMQContentBody(data));
-        }else{
-            u_int32_t offset = 0;
-            u_int32_t remaining = data_length - offset;
-            while (remaining > 0) {
-                u_int32_t length = remaining > frag_size ? frag_size : 
remaining;
-                string frag(data.substr(offset, length));
-                send(new AMQContentBody(frag));                          
-                
-                offset += length;
-                remaining = data_length - offset;
-            }
-        }
-    }
-}
-    
 void Channel::commit(){
     sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
 }
@@ -294,46 +184,53 @@
 }
 
 void Channel::handleMethodInContext(
-    AMQMethodBody::shared_ptr body, const MethodContext&)
+    AMQMethodBody::shared_ptr method, const MethodContext&)
 {
-    //channel.flow, channel.close, basic.deliver, basic.return or a
-    //response to a synchronous request
     if(responses.isWaiting()) {
-        responses.signalResponse(body);
+        responses.signalResponse(method);
         return;
     }
-
-    if(body->isA<BasicDeliverBody>()
-       || body->isA<BasicReturnBody>()
-       || body->isA<BasicGetOkBody>()
-       || body->isA<BasicGetEmptyBody>())
-        
-    {
-        incoming.add(body);
-        return;
+    try {
+        switch (method->amqpClassId()) {
+          case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+          case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+          case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+          default: throw UnknownMethod();
+        }
     }
-    else if(body->isA<ChannelCloseBody>()) {
-        peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+    catch (const UnknownMethod&) {
+        connection->close(
+            504, "Unknown method",
+            method->amqpClassId(), method->amqpMethodId());
     }
-    else if(body->isA<ChannelFlowBody>()){
-        // TODO aconway 2007-01-24: not implemented yet.
+}
+
+void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+    switch (method->amqpMethodId()) {
+      case ChannelCloseBody::METHOD_ID:
+        peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
+        return;
+      case ChannelFlowBody::METHOD_ID:
+        // FIXME aconway 2007-02-22: Not yet implemented.
+        return;
     }
-    else if(body->isA<ConnectionCloseBody>()){
+    throw UnknownMethod();
+}
+
+void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
+    if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
         connection->close();
-    }
-    else {
-        connection->close(
-            504, "Unrecognised method",
-            body->amqpClassId(), body->amqpMethodId());
-    }
+        return;
+    } 
+    throw UnknownMethod();
 }
 
 void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
-    incoming.add(body);
+    basic.incoming.add(body);
 }
     
 void Channel::handleContent(AMQContentBody::shared_ptr body){
-    incoming.add(body);
+    basic.incoming.add(body);
 }
 
 void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -341,82 +238,7 @@
 }
 
 void Channel::start(){
-    dispatcher = Thread(this);
-}
-
-void Channel::deliver(Consumer& consumer, Message& msg){
-    //record delivery tag:
-    consumer.lastDeliveryTag = msg.getDeliveryTag();
-
-    //allow registered listener to handle the message
-    consumer.listener->received(msg);
-
-    if(isOpen()){
-        bool multiple(false);
-        switch(consumer.ackMode){
-          case LAZY_ACK: 
-            multiple = true;
-            if(++(consumer.count) < prefetch) break;
-            //else drop-through
-          case AUTO_ACK:
-            send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
-            consumer.lastDeliveryTag = 0;
-        }
-    }
-
-    //as it stands, transactionality is entirely orthogonal to ack
-    //mode, though the acks will not be processed by the broker under
-    //a transaction until it commits.
-}
-
-void Channel::run() {
-    while(isOpen()) {
-        try {
-            Message msg = incoming.waitDispatch();
-            if(msg.getMethod()->isA<BasicReturnBody>()) {
-                ReturnedMessageHandler* handler=0;
-                {
-                    Mutex::ScopedLock l(lock);
-                    handler=returnsHandler;
-                }
-                if(handler == 0) {
-                    // TODO aconway 2007-02-20: proper logging.
-                    cout << "Message returned: " << msg.getData() << endl;
-                }
-                else 
-                    handler->returned(msg);
-            }
-            else {
-                BasicDeliverBody::shared_ptr deliverBody =
-                    boost::shared_polymorphic_downcast<BasicDeliverBody>(
-                        msg.getMethod());
-                std::string tag = deliverBody->getConsumerTag();
-                Consumer consumer;
-                {
-                    Mutex::ScopedLock l(lock);
-                    ConsumerMap::iterator i = consumers.find(tag);
-                    if(i == consumers.end()) 
-                        THROW_QPID_ERROR(PROTOCOL_ERROR+504,
-                                         "Unknown consumer tag=" + tag);
-                    consumer = i->second;
-                }
-                deliver(consumer, msg);
-            }
-        }
-        catch (const ShutdownException&) {
-            /* Orderly shutdown */
-        }
-        catch (const Exception& e) {
-            // FIXME aconway 2007-02-20: Report exception to user.
-            cout << "client::Channel::run() terminated by: " << e.toString()
-                 << "(" << typeid(e).name() << ")" << endl;
-        }
-    }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
-    Mutex::ScopedLock l(lock);
-    returnsHandler = handler;
+    basicDispatcher = Thread(basic);
 }
 
 // Close called by local application.
@@ -452,13 +274,13 @@
 void Channel::closeInternal() {
     if (isOpen());
     {
-        cancelAll();
-        incoming.shutdown();
+        basic.cancelAll();
+        basic.incoming.shutdown();
         connection = 0;
         // A 0 response means we are closed.
         responses.signalResponse(AMQMethodBody::shared_ptr());
     }
-    dispatcher.join();        
+    basicDispatcher.join();        
 }
 
 void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Thu Feb 22 
15:23:52 2007
@@ -21,23 +21,15 @@
  * under the License.
  *
  */
-#include <map>
-#include <string>
-#include <queue>
-#include <boost/scoped_ptr.hpp>
 #include "sys/types.h"
-
 #include <framing/amqp_framing.h>
 #include <ClientExchange.h>
-#include <IncomingMessage.h>
 #include <ClientMessage.h>
-#include <MessageListener.h>
 #include <ClientQueue.h>
 #include <ResponseHandler.h>
-#include <ReturnedMessageHandler.h>
-#include "Runnable.h"
 #include "ChannelAdapter.h"
 #include "Thread.h"
+#include "Basic.h"
 
 namespace qpid {
 
@@ -50,27 +42,6 @@
 
 class Connection;
 
-/**
- * The available acknowledgements modes
- * 
- * \ingroup clientapi
- */
-enum ack_modes {
-    /** No acknowledgement will be sent, broker can
-        discard messages as soon as they are delivered
-        to a consumer using this mode. **/
-    NO_ACK     = 0,  
-    /** Each message will be automatically
-        acknowledged as soon as it is delivered to the
-        application **/  
-    AUTO_ACK   = 1,  
-    /** Acknowledgements will be sent automatically,
-        but not for each message. **/
-    LAZY_ACK   = 2,
-    /** The application is responsible for explicitly
-        acknowledging messages. **/  
-    CLIENT_ACK = 3 
-};
 
 /**
  * Represents an AMQP channel, i.e. loosely a session of work. It
@@ -79,41 +50,34 @@
  * 
  * \ingroup clientapi
  */
-class Channel : public framing::ChannelAdapter,
-                public sys::Runnable
+class Channel : public framing::ChannelAdapter
 {
-    struct Consumer{
-        MessageListener* listener;
-        int ackMode;
-        int count;
-        u_int64_t lastDeliveryTag;
-    };
-    typedef std::map<std::string, Consumer> ConsumerMap;
+  private:
+    // TODO aconway 2007-02-22: Remove friendship.
+  friend class Basic;
+    // FIXME aconway 2007-02-22: friend class Message;
     
+    struct UnknownMethod {};
+        
     sys::Mutex lock;
+    Basic basic;
     Connection* connection;
-    sys::Thread dispatcher;
-    IncomingMessage incoming;
+    sys::Thread basicDispatcher;
     ResponseHandler responses;
-    ConsumerMap consumers;
-    ReturnedMessageHandler* returnsHandler;
 
     u_int16_t prefetch;
     const bool transactional;
     framing::ProtocolVersion version;
 
-    void retrieve(Message& msg);
-    void deliver(Consumer& consumer, Message& msg);
-        
     void handleHeader(framing::AMQHeaderBody::shared_ptr body);
     void handleContent(framing::AMQContentBody::shared_ptr body);
     void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
-        
     void handleMethodInContext(
-        framing::AMQMethodBody::shared_ptr,
-        const framing::MethodContext& method);
+        framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
+    void handleChannel(framing::AMQMethodBody::shared_ptr method);
+    void handleConnection(framing::AMQMethodBody::shared_ptr method);
+
     void setQos();
-    void cancelAll();
 
     void protocolInit(
         const std::string& uid, const std::string& pwd,
@@ -148,18 +112,18 @@
   public:
 
     /**
-         * Creates a channel object.
-         * 
-         * @param transactional if true, the publishing and acknowledgement
-         * of messages will be transactional and can be committed or
-         * aborted in atomic units (@see commit(), @see rollback())
-         * 
-         * @param prefetch specifies the number of unacknowledged
-         * messages the channel is willing to have sent to it
-         * asynchronously
+     * Creates a channel object.
+     * 
+     * @param transactional if true, the publishing and acknowledgement
+     * of messages will be transactional and can be committed or
+     * aborted in atomic units (@see commit(), @see rollback())
+     * 
+     * @param prefetch specifies the number of unacknowledged
+     * messages the channel is willing to have sent to it
+     * asynchronously
      */
-       Channel(bool transactional = false, u_int16_t prefetch = 500);
-       ~Channel();
+    Channel(bool transactional = false, u_int16_t prefetch = 500);
+    ~Channel();
 
     /**
      * Declares an exchange.
@@ -221,85 +185,16 @@
      * @param synch if true this call will block until a response
      * is received from the broker
      */
-    void bind(const Exchange& exchange, const Queue& queue, const std::string& key, 
-              const framing::FieldTable& args, bool synch = true);
-    /**
-     * Creates a 'consumer' for a queue. Messages in (or arriving
-     * at) that queue will be delivered to consumers
-     * asynchronously.
-     * 
-     * @param queue a Queue instance representing the queue to
-     * consume from
-     * 
-     * @param tag an identifier to associate with the consumer
-     * that can be used to cancel its subscription (if empty, this
-     * will be assigned by the broker)
-     * 
-     * @param listener a pointer to an instance of an
-     * implementation of the MessageListener interface. Messages
-     * received from this queue for this consumer will result in
-     * invocation of the received() method on the listener, with
-     * the message itself passed in.
-     * 
-     * @param ackMode the mode of acknowledgement that the broker
-     * should assume for this consumer. @see ack_modes
-     * 
-     * @param noLocal if true, this consumer will not be sent any
-     * message published by this connection
-     * 
-     * @param synch if true this call will block until a response
-     * is received from the broker
-     */
-    void consume(
-        Queue& queue, std::string& tag, MessageListener* listener, 
-        int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
-        const framing::FieldTable* fields = 0);
-        
-    /**
-     * Cancels a subscription previously set up through a call to consume().
-     *
-     * @param tag the identifier used (or assigned) in the consume
-     * request that set up the subscription to be cancelled.
-     * 
-     * @param synch if true this call will block until a response
-     * is received from the broker
-     */
-    void cancel(const std::string& tag, bool synch = true);
-    /**
-     * Synchronous pull of a message from a queue.
-     * 
-     * @param msg a message object that will contain the message
-     * headers and content if the call completes.
-     * 
-     * @param queue the queue to consume from
-     * 
-     * @param ackMode the acknowledgement mode to use (@see
-     * ack_modes)
-     * 
-     * @return true if a message was succcessfully dequeued from
-     * the queue, false if the queue was empty.
-     */
-    bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+    void bind(const Exchange& exchange, const Queue& queue,
+              const std::string& key, const framing::FieldTable& args,
+              bool synch = true);
+
     /**
-     * Publishes (i.e. sends a message to the broker).
-     * 
-     * @param msg the message to publish
-     * 
-     * @param exchange the exchange to publish the message to
-     * 
-     * @param routingKey the routing key to publish with
-     * 
-     * @param mandatory if true and the exchange to which this
-     * publish is directed has no matching bindings, the message
-     * will be returned (see setReturnedMessageHandler()).
-     * 
-     * @param immediate if true and there is no consumer to
-     * receive this message on publication, the message will be
-     * returned (see setReturnedMessageHandler()).
+     * Get a Basic object which provides functions to send and
+     * receive messages using the AMQP 0-8 Basic class methods.
+     *@see Basic
      */
-    void publish(const Message& msg, const Exchange& exchange,
-                 const std::string& routingKey, 
-                 bool mandatory = false, bool immediate = false);
+    Basic& getBasic() { return basic; }
 
     /**
      * For a transactional channel this will commit all
@@ -314,6 +209,7 @@
      * object is created (@see Channel()).
      */
     void commit();
+    
     /**
      * For a transactional channel, this will rollback any
      * publications or acknowledgements. It will be as if the
@@ -327,33 +223,25 @@
      */
     void setPrefetch(u_int16_t prefetch);
 
+    u_int16_t getPrefetch() { return prefetch; }
+
     /**
      * Start message dispatching on a new thread
      */
     void start();
 
-    // TODO aconway 2007-01-26: Can it be private?
-    /**
-     * Dispatch messages on this channel in the calling thread.
-     */
-    void run();
-
     /**
      * Close the channel with optional error information.
      * Closing a channel that is not open has no effect.
      */
     void close(
         framing::ReplyCode = 200, const std::string& ="OK",
-         framing::ClassId = 0, framing::MethodId  = 0);
-
-    /**
-     * Set a handler for this channel that will process any
-     * returned messages
-     * 
-     * @see publish()
-     */
-    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+        framing::ClassId = 0, framing::MethodId  = 0);
 
+    /** True if the channel is transactional */
+    bool isTransactional() { return transactional; }
+    
+    /** True if the channel is open */
     bool isOpen() const;
 };
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h Thu Feb 22 
15:23:52 2007
@@ -112,7 +112,8 @@
     { return method; }
         
     void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
-    boost::shared_ptr<framing::AMQHeaderBody> getHeader();
+    boost::shared_ptr<framing::AMQHeaderBody> getHeader() const
+    { return header; }
 
     // TODO aconway 2007-02-15: remove friendships.
   friend class IncomingMessage;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Thu Feb 22 
15:23:52 2007
@@ -31,6 +31,7 @@
 #include <sys/ShutdownHandler.h>
 #include <sys/TimeoutHandler.h>
 
+
 #include "framing/amqp_types.h"
 #include <framing/amqp_framing.h>
 #include <ClientExchange.h>

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Makefile.am Thu Feb 22 
15:23:52 2007
@@ -13,6 +13,7 @@
   ClientExchange.cpp                           \
   ClientMessage.cpp                            \
   ClientQueue.cpp                              \
+  Basic.cpp                                    \
   Connection.cpp                               \
   Connector.cpp                                        \
   IncomingMessage.cpp                          \
@@ -24,6 +25,7 @@
   ClientExchange.h                             \
   ClientMessage.h                              \
   ClientQueue.h                                        \
+  Basic.h                                      \
   Connection.h                                 \
   Connector.h                                  \
   IncomingMessage.h                            \

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/ThreadSafeQueue.h Thu 
Feb 22 15:23:52 2007
@@ -33,7 +33,6 @@
 class ThreadSafeQueue
 {
   public:
-    struct QueueStoppedException : public Exception {};
 
     ThreadSafeQueue() {}
 
@@ -47,7 +46,7 @@
     }
 
     /** Pop a value from the front of the queue. Waits till value is available.
-     *@throw QueueStoppedException if queue is stopped while waiting.
+     *@throw ShutdownException if queue is stopped while waiting.
      */
     T pop() {
         ProducerConsumer::ConsumerLock consumer(pc);
@@ -57,7 +56,7 @@
             container.pop_front();
             return value;
         }
-        throw QueueStoppedException();
+        throw ShutdownException();
     }
 
     /**

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp Thu Feb 22 
15:23:52 2007
@@ -76,21 +76,21 @@
     void testPublishGet() {
         Message pubMsg(data);
         pubMsg.getHeaders().setString("hello", "world");
-        channel.publish(pubMsg, exchange, qname);
+        channel.getBasic().publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
         CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
-        CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
+        CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
     }
 
     void testGetNoContent() {
         Message pubMsg;
         pubMsg.getHeaders().setString("hello", "world");
-        channel.publish(pubMsg, exchange, qname);
+        channel.getBasic().publish(pubMsg, exchange, qname);
         Message getMsg;
-        CPPUNIT_ASSERT(channel.get(getMsg, queue));
+        CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
         CPPUNIT_ASSERT(getMsg.getData().empty());
         CPPUNIT_ASSERT_EQUAL(string("world"),
                              getMsg.getHeaders().getString("hello"));
@@ -98,10 +98,10 @@
 
     void testConsumeCancel() {
         string tag;             // Broker assigned
-        channel.consume(queue, tag, &listener);
+        channel.getBasic().consume(queue, tag, &listener);
         channel.start();
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
-        channel.publish(Message("a"), exchange, qname);
+        channel.getBasic().publish(Message("a"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +112,8 @@
         CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
         CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
             
-        channel.publish(Message("b"), exchange, qname);
-        channel.publish(Message("c"), exchange, qname);
+        channel.getBasic().publish(Message("b"), exchange, qname);
+        channel.getBasic().publish(Message("c"), exchange, qname);
         {
             Mutex::ScopedLock l(listener.monitor);
             while (listener.messages.size() != 3) {
@@ -124,15 +124,15 @@
         CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
         CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
     
-        channel.cancel(tag);
-        channel.publish(Message("d"), exchange, qname);
+        channel.getBasic().cancel(tag);
+        channel.getBasic().publish(Message("d"), exchange, qname);
         CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
         {
             Mutex::ScopedLock l(listener.monitor);
             CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
         }
         Message msg;
-        CPPUNIT_ASSERT(channel.get(msg, queue));
+        CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
         CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
     }
 
@@ -140,9 +140,9 @@
     void testConsumePublished() {
         Message pubMsg("x");
         pubMsg.getHeaders().setString("y", "z");
-        channel.publish(pubMsg, exchange, qname);
+        channel.getBasic().publish(pubMsg, exchange, qname);
         string tag;
-        channel.consume(queue, tag, &listener);
+        channel.getBasic().consume(queue, tag, &listener);
         CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
         channel.start();
         {

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/client_test.cpp Thu Feb 22 
15:23:52 2007
@@ -101,7 +101,7 @@
        Monitor monitor;
        SimpleListener listener(&monitor);
        string tag("MyTag");
-       channel.consume(queue, tag, &listener);
+       channel.getBasic().consume(queue, tag, &listener);
        if (verbose) std::cout << "Registered consumer." << std::endl;
 
         //we need to enable the message dispatching for this channel
@@ -114,7 +114,7 @@
        Message msg;
        string data("MyMessage");
        msg.setData(data);
-       channel.publish(msg, exchange, "MyTopic");
+       channel.getBasic().publish(msg, exchange, "MyTopic");
        if (verbose) std::cout << "Published message: " << data << std::endl;
 
        {

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/echo_service.cpp Thu Feb 22 
15:23:52 2007
@@ -116,7 +116,7 @@
             //Consume from the response queue, logging all echoed message to console:
             LoggingListener listener;
             std::string tag;
-            channel.consume(response, tag, &listener);
+            channel.getBasic().consume(response, tag, &listener);
 
             //Process incoming requests on a new thread
             channel.start();
@@ -129,7 +129,7 @@
                 Message msg;
                 msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
                 msg.setData(text);
-                channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
+                channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
                 
                 std::cout << "Enter text to send:" << std::endl;
             }
@@ -158,10 +158,10 @@
             //Consume from the request queue, echoing back all messages received to the client that sent them
             EchoServer server(&channel);
             std::string tag = "server_tag";
-            channel.consume(request, tag, &server);
+            channel.getBasic().consume(request, tag, &server);
 
             //Process incoming requests on the main thread
-            channel.run();
+            channel.getBasic().run();
             
             connection.close();
         } catch(qpid::QpidError error) {
@@ -184,7 +184,7 @@
         std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
         
         //'echo' the message back:
-        channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
+        channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/topic_listener.cpp Thu Feb 22 
15:23:52 2007
@@ -71,7 +71,7 @@
 class Args{
     string host;
     int port;
-    int ackMode;
+    AckMode ackMode;
     bool transactional;
     int prefetch;
     bool trace;
@@ -81,13 +81,13 @@
     void parse(int argc, char** argv);
     void usage();
 
-    inline const string& getHost() const { return host;}
-    inline int getPort() const { return port; }
-    inline int getAckMode(){ return ackMode; }
-    inline bool getTransactional() const { return transactional; }
-    inline int getPrefetch(){ return prefetch; }
-    inline bool getTrace() const { return trace; }
-    inline bool getHelp() const { return help; }
+    const string& getHost() const { return host;}
+    int getPort() const { return port; }
+    AckMode getAckMode(){ return ackMode; }
+    bool getTransactional() const { return transactional; }
+    int getPrefetch(){ return prefetch; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
 };
 
 /**
@@ -119,9 +119,9 @@
             //set up listener
             Listener listener(&channel, response.getName(), args.getTransactional());
             string tag;
-            channel.consume(control, tag, &listener, args.getAckMode());
+            channel.getBasic().consume(control, tag, &listener, args.getAckMode());
             cout << "topic_listener: Consuming." << endl;
-            channel.run();
+            channel.getBasic().run();
             connection.close();
             cout << "topic_listener: normal exit" << endl;
             return 0;
@@ -166,7 +166,7 @@
               << time/TIME_MSEC << " ms.";
     Message msg(reportstr.str());
     msg.getHeaders().setString("TYPE", "REPORT");
-    channel->publish(msg, string(), responseQueue);
+    channel->getBasic().publish(msg, string(), responseQueue);
     if(transactional){
         channel->commit();
     }
@@ -184,7 +184,7 @@
         }else if("-port" == name){
             port = atoi(argv[++i]);
         }else if("-ack_mode" == name){
-            ackMode = atoi(argv[++i]);
+            ackMode = AckMode(atoi(argv[++i]));
         }else if("-transactional" == name){
             transactional = true;
         }else if("-prefetch" == name){

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp?view=diff&rev=510705&r1=510704&r2=510705
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/topic_publisher.cpp Thu Feb 22 
15:23:52 2007
@@ -80,7 +80,7 @@
     int port;
     int messages;
     int subscribers;
-    int ackMode;
+    AckMode ackMode;
     bool transactional;
     int prefetch;
     int batches;
@@ -96,18 +96,18 @@
     void parse(int argc, char** argv);
     void usage();
 
-    inline const string& getHost() const { return host;}
-    inline int getPort() const { return port; }
-    inline int getMessages() const { return messages; }
-    inline int getSubscribers() const { return subscribers; }
-    inline int getAckMode(){ return ackMode; }
-    inline bool getTransactional() const { return transactional; }
-    inline int getPrefetch(){ return prefetch; }
-    inline int getBatches(){ return batches; }
-    inline int getDelay(){ return delay; }
-    inline int getSize(){ return size; }
-    inline bool getTrace() const { return trace; }
-    inline bool getHelp() const { return help; }
+    const string& getHost() const { return host;}
+    int getPort() const { return port; }
+    int getMessages() const { return messages; }
+    int getSubscribers() const { return subscribers; }
+    AckMode getAckMode(){ return ackMode; }
+    bool getTransactional() const { return transactional; }
+    int getPrefetch(){ return prefetch; }
+    int getBatches(){ return batches; }
+    int getDelay(){ return delay; }
+    int getSize(){ return size; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
 };
 
 int main(int argc, char** argv) {
@@ -129,7 +129,7 @@
             //set up listener
             Publisher publisher(&channel, "topic_control", args.getTransactional());
             std::string tag("mytag");
-            channel.consume(response, tag, &publisher, args.getAckMode());
+            channel.getBasic().consume(response, tag, &publisher, args.getAckMode());
             channel.start();
 
             int batchSize(args.getBatches());
@@ -187,12 +187,12 @@
     {
         Monitor::ScopedLock l(monitor);
         for(int i = 0; i < msgs; i++){
-            channel->publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+            channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
         }
         //send report request
         Message reportRequest;
         reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
-        channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+        channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
         if(transactional){
             channel->commit();
         }
@@ -216,7 +216,7 @@
     //send termination request
     Message terminationRequest;
     terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
-    channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+    channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
     if(transactional){
         channel->commit();
     }
@@ -237,7 +237,7 @@
         }else if("-subscribers" == name){
             subscribers = atoi(argv[++i]);
         }else if("-ack_mode" == name){
-            ackMode = atoi(argv[++i]);
+            ackMode = AckMode(atoi(argv[++i]));
         }else if("-transactional" == name){
             transactional = true;
         }else if("-prefetch" == name){


Reply via email to