Author: aconway
Date: Tue Feb  6 13:38:30 2007
New Revision: 504305

URL: http://svn.apache.org/viewvc?view=rev&rev=504305
Log:
* cpp/lib/broker/BrokerQueue.cpp (): Centralized exceptions.
* cpp/lib/broker/BrokerAdapter.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerChannel.cpp (consume): Moved exceptions to Queue
* cpp/lib/broker/BrokerMessageBase.cpp:
 - Added getApplicationHeaders.
* cpp/lib/broker/BrokerMessageMessage.cpp:
 - Fixed exchangeName/destination mix-up.
 - Removed redundant constructor.
 - Added getApplicationHeaders
* cpp/lib/broker/MessageHandlerImpl.cpp:
 - Added missing acknowledgements
 - Replaced assert(0) with throw "unimplemented".
 - Moved exchange existence exceptions to ExchangeRegistry
 - Handle transfers with references.
* cpp/tests/Makefile.am (check): Don't run tests unless all libs built OK.
* cpp/tests/python_tests: Re-enabled python tests. Not all passing.

* python/tests/message.py (MessageTests.test_get): Replace get-ok with ok.

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/tests/python_tests
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb  6 13:38:30 2007
@@ -15,6 +15,8 @@
  * limitations under the License.
  *
  */
+#include <boost/format.hpp>
+
 #include "BrokerAdapter.h"
 #include "Connection.h"
 #include "Exception.h"
@@ -25,6 +27,7 @@
 namespace qpid {
 namespace broker {
 
+using boost::format;
 using namespace qpid;
 using namespace qpid::framing;
 
@@ -151,9 +154,11 @@
            }
        }
     }
-    if (exclusive && !queue->isExclusiveOwner(&connection)) {
-       throw ChannelException(405, "Cannot grant exclusive access to queue");
-    }
+    if (exclusive && !queue->isExclusiveOwner(&connection)) 
+       throw ChannelException(
+            405,
+            format("Cannot grant exclusive access to queue '%s'")
+            % queue->getName());
     if (!nowait) {
         string queueName = queue->getName();
         connection.client->getQueue().declareOk(context, queueName, 
queue->getMessageCount(), queue->getConsumerCount());
@@ -248,20 +253,14 @@
         throw ConnectionException(530, "Consumer tags must be unique");
     }
 
-    try{
-        string newTag = consumerTag;
-        channel.consume(
-            newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, 
&fields);
-
-        if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
-
-        //allow messages to be dispatched if required as there is now a 
consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive) throw ChannelException(403, "Exclusive access cannot be 
granted");
-        else throw ChannelException(403, "Access would violate previously 
granted exclusivity");
-    }
+    string newTag = consumerTag;
+    channel.consume(
+        newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+
+    if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
 
+    //allow messages to be dispatched if required as there is now a consumer:
+    queue->dispatch();
 } 
         
 void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const 
MethodContext& context, const string& consumerTag, bool nowait){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb  6 13:38:30 2007
@@ -82,10 +82,11 @@
     ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
     try{
         queue->consume(c, exclusive);//may throw exception
-        consumers[tag] = c;
-    }catch(ExclusiveAccessException& e){
+    consumers[tag] = c;
+    } catch(...) {
+        // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt.
         delete c;
-        throw e;
+        throw;
     }
 }
 
@@ -190,11 +191,11 @@
 void Channel::handleInlineTransfer(Message::shared_ptr& msg, 
Exchange::shared_ptr& exch){
     if(transactional){
         TxPublish* deliverable = new TxPublish(msg);
-        exch->route(*deliverable, msg->getRoutingKey(), 
&(msg->getHeaderProperties()->getHeaders()));
+        exch->route(*deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
         txBuffer.enlist(new DeletingTxOp(deliverable));
     }else{
         DeliverableMessage deliverable(msg);
-        exch->route(deliverable, msg->getRoutingKey(), 
&(msg->getHeaderProperties()->getHeaders()));
+        exch->route(deliverable, msg->getRoutingKey(), 
&(msg->getApplicationHeaders()));
     }
 }
 
@@ -227,12 +228,12 @@
     if(transactional) {
         std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
         exchange->route(*deliverable, msg->getRoutingKey(),
-                        &(msg->getHeaderProperties()->getHeaders()));
+                        &(msg->getApplicationHeaders()));
         txBuffer.enlist(new DeletingTxOp(deliverable.release()));
     } else {
         DeliverableMessage deliverable(msg);
         exchange->route(deliverable, msg->getRoutingKey(),
-                        &(msg->getHeaderProperties()->getHeaders()));
+                        &(msg->getApplicationHeaders()));
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Tue Feb  6 13:38:30 2007
@@ -18,6 +18,8 @@
  * under the License.
  *
  */
+#include <boost/cast.hpp>
+
 #include <BrokerMessage.h>
 #include <iostream>
 
@@ -116,7 +118,12 @@
 }
 
 BasicHeaderProperties* BasicMessage::getHeaderProperties(){
-    return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+    return boost::polymorphic_downcast<BasicHeaderProperties*>(
+        header->getProperties());
+}
+
+const FieldTable& BasicMessage::getApplicationHeaders(){
+    return getHeaderProperties()->getHeaders();
 }
 
 const ConnectionToken* const BasicMessage::getPublisher(){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Tue Feb  6 13:38:30 2007
@@ -85,6 +85,7 @@
                    u_int32_t framesize);
 
     framing::BasicHeaderProperties* getHeaderProperties();
+    const framing::FieldTable& getApplicationHeaders();
     bool isPersistent();
     u_int64_t contentSize() const { return size; }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Tue Feb  6 13:38:30 2007
@@ -37,6 +37,7 @@
 class MethodContext;
 class ChannelAdapter;
 class BasicHeaderProperties;
+class FieldTable;
 }
        
 namespace broker {
@@ -114,7 +115,12 @@
     virtual bool isComplete() = 0;
             
     virtual u_int64_t contentSize() const = 0;
+    // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties
+    // at this level. Expose only generic properties available from both
+    // message types (e.g. getApplicationHeaders below).
+    // 
     virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+    virtual const framing::FieldTable& getApplicationHeaders() = 0;
     virtual bool isPersistent() = 0;
     virtual const ConnectionToken* const getPublisher() = 0;
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp Tue Feb  6 13:38:30 2007
@@ -18,11 +18,13 @@
  * under the License.
  *
  */
+#include "QpidError.h"
 #include "BrokerMessageMessage.h"
 #include "ChannelAdapter.h"
 #include "MessageTransferBody.h"
 #include "MessageAppendBody.h"
 #include "Reference.h"
+#include "framing/FieldTable.h"
 
 #include <iostream>
 
@@ -30,24 +32,15 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
        
-MessageMessage::MessageMessage(
-    const boost::shared_ptr<MessageTransferBody> _methodBody, 
-    const std::string& _exchange, const std::string& _routingKey, 
-    bool _mandatory, bool _immediate) :
-    Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
-    methodBody(_methodBody)
-{
-}
-
 MessageMessage::MessageMessage(TransferPtr transfer_)
-    : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+    : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
               transfer_->getMandatory(), transfer_->getImmediate(),
               transfer_),
       transfer(transfer_)
 {}
 
 MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
-    : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+    : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
               transfer_->getMandatory(), transfer_->getImmediate(),
               transfer_),
       transfer(transfer_),
@@ -62,29 +55,29 @@
 {
     channel.send(
        new MessageTransferBody(channel.getVersion(), 
-        methodBody->getTicket(),
+        transfer->getTicket(),
         consumerTag,
         getRedelivered(),
-        methodBody->getImmediate(),
-        methodBody->getTtl(),
-        methodBody->getPriority(),
-        methodBody->getTimestamp(),
-        methodBody->getDeliveryMode(),
-        methodBody->getExpiration(),
+        transfer->getImmediate(),
+        transfer->getTtl(),
+        transfer->getPriority(),
+        transfer->getTimestamp(),
+        transfer->getDeliveryMode(),
+        transfer->getExpiration(),
         getExchange(),
         getRoutingKey(),
-        methodBody->getMessageId(),
-        methodBody->getCorrelationId(),
-        methodBody->getReplyTo(),
-        methodBody->getContentType(),
-        methodBody->getContentEncoding(),
-        methodBody->getUserId(),
-        methodBody->getAppId(),
-        methodBody->getTransactionId(),
-        methodBody->getSecurityToken(),
-        methodBody->getApplicationHeaders(),
-        methodBody->getBody(),
-        methodBody->getMandatory()));
+        transfer->getMessageId(),
+        transfer->getCorrelationId(),
+        transfer->getReplyTo(),
+        transfer->getContentType(),
+        transfer->getContentEncoding(),
+        transfer->getUserId(),
+        transfer->getAppId(),
+        transfer->getTransactionId(),
+        transfer->getSecurityToken(),
+        transfer->getApplicationHeaders(),
+        transfer->getBody(),
+        transfer->getMandatory()));
 }
 
 void MessageMessage::sendGetOk(
@@ -98,11 +91,12 @@
 
 bool MessageMessage::isComplete()
 {
-    return true;               // FIXME aconway 2007-02-05: 
+    return true;
 }
 
 u_int64_t MessageMessage::contentSize() const
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
@@ -110,33 +104,45 @@
 {
     return 0;               // FIXME aconway 2007-02-05: 
 }
+
+const FieldTable& MessageMessage::getApplicationHeaders()
+{
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+    return transfer->getApplicationHeaders();
+}
 bool MessageMessage::isPersistent()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return false;               // FIXME aconway 2007-02-05: 
 }
 
 const ConnectionToken* const MessageMessage::getPublisher()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedSize()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedHeaderSize()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int32_t MessageMessage::encodedContentSize()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 
 u_int64_t MessageMessage::expectedContentSize()
 {
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
     return 0;               // FIXME aconway 2007-02-05: 
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue Feb  6 13:38:30 2007
@@ -38,17 +38,11 @@
 class Reference;
 
 class MessageMessage: public Message{
-    const boost::shared_ptr<framing::MessageTransferBody> methodBody;
-
   public:
     typedef Reference::TransferPtr TransferPtr;
     typedef Reference::AppendPtr AppendPtr;
     typedef Reference::Appends Appends;
 
-    MessageMessage(
-        const boost::shared_ptr<framing::MessageTransferBody> methodBody, 
-        const std::string& exchange, const std::string& routingKey, 
-        bool mandatory, bool immediate);
     MessageMessage(TransferPtr transfer);
     MessageMessage(TransferPtr transfer, const Reference&);
             
@@ -67,7 +61,8 @@
     bool isComplete();
 
     u_int64_t contentSize() const;
-    qpid::framing::BasicHeaderProperties* getHeaderProperties();
+    framing::BasicHeaderProperties* getHeaderProperties();
+    const framing::FieldTable& getApplicationHeaders();
     bool isPersistent();
     const ConnectionToken* const getPublisher();
             

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.cpp Tue Feb  6 13:38:30 2007
@@ -18,6 +18,9 @@
  * under the License.
  *
  */
+
+#include <boost/format.hpp>
+
 #include <BrokerQueue.h>
 #include <MessageStore.h>
 #include <sys/Monitor.h>
@@ -27,6 +30,7 @@
 using namespace qpid::broker;
 using namespace qpid::sys;
 using namespace qpid::framing;
+using boost::format;
 
 Queue::Queue(const string& _name, u_int32_t _autodelete, 
              MessageStore* const _store,
@@ -128,12 +132,17 @@
 
 void Queue::consume(Consumer* c, bool requestExclusive){
     Mutex::ScopedLock locker(lock);
-    if(exclusive) throw ExclusiveAccessException();
-    if(requestExclusive){
-        if(!consumers.empty()) throw ExclusiveAccessException();
+    if(exclusive) 
+        throw ChannelException(
+            403, format("Queue '%s' has an exclusive consumer."
+                        " No more consumers allowed.") % getName());
+    if(requestExclusive) {
+        if(!consumers.empty())
+            throw ChannelException(
+                403, format("Queue '%s' already has conumers."
+                            "Exclusive access denied.") %getName());
         exclusive = c;
     }
-
     if(autodelete && consumers.empty()) lastUsed = 0;
     consumers.push_back(c);
 }

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.h?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerQueue.h Tue Feb  6 13:38:30 2007
@@ -21,7 +21,6 @@
  * under the License.
  *
  */
-
 #include <vector>
 #include <memory>
 #include <queue>
@@ -35,6 +34,9 @@
 #include <sys/Monitor.h>
 #include <QueuePolicy.h>
 
+// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
+// enforce ownership of Consumers.
+
 namespace qpid {
     namespace broker {
         class MessageStore;
@@ -42,8 +44,6 @@
         /**
          * Thrown when exclusive access would be violated.
          */
-        struct ExclusiveAccessException{};
-
         using std::string;
 
         /**

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue Feb  6 13:38:30 2007
@@ -1,4 +1,3 @@
-
 /*
  *
  * Copyright (c) 2006 The Apache Software Foundation
@@ -17,6 +16,7 @@
  *
  */
 
+#include "QpidError.h"
 #include "MessageHandlerImpl.h"
 #include "BrokerChannel.h"
 #include "FramingContent.h"
@@ -31,6 +31,11 @@
 
 using namespace framing;
 
+MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
+    : channel(ch), connection(c), broker(b), references(ch),
+      client(connection.client->getMessage())
+{}
+
 //
 // Message class method handlers
 //
@@ -42,7 +47,7 @@
     references.get(reference).append(
         boost::shared_polymorphic_downcast<MessageAppendBody>(
             context.methodBody));
-    sendOk(context);
+    client.ok(context);
 }
 
 
@@ -51,7 +56,7 @@
                            const string& destination )
 {
     channel.cancel(destination);
-    sendOk(context);
+    client.ok(context);
 }
 
 void
@@ -59,7 +64,8 @@
                                const string& /*reference*/,
                                const string& /*identifier*/ )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
 }
 
 void
@@ -67,7 +73,7 @@
                           const string& reference)
 {
     references.get(reference).close();
-    sendOk(context);
+    client.ok(context);
 }
 
 void
@@ -80,32 +86,23 @@
                             bool exclusive,
                             const qpid::framing::FieldTable& filter )
 {
-    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); 
   
-    if(!destination.empty() && channel.exists(destination)){
+    Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+    if(!destination.empty() && channel.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
-    }
-
-    try{
-        string newTag = destination;
-        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? 
&connection : 0, &filter);
-
-        sendOk(context);
-
-        //allow messages to be dispatched if required as there is now a 
consumer:
-        queue->dispatch();
-    }catch(ExclusiveAccessException& e){
-        if(exclusive)
-            throw ChannelException(403, "Exclusive access cannot be granted");
-        else
-            throw ChannelException(
-                403, "Access would violate previously granted exclusivity");
-    }
+    string tag = destination;
+    channel.consume(
+        tag, queue, !noAck, exclusive,
+        noLocal ? &connection : 0, &filter);
+    client.ok(context);
+    // Dispatch messages as there is now a consumer.
+    queue->dispatch();
 }
 
 void
 MessageHandlerImpl::empty( const MethodContext& )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
 }
 
 void
@@ -121,17 +118,18 @@
         connection.getQueue(queueName, context.channel->getId());
     
     // FIXME: get is probably Basic specific
-    if(!channel.get(queue, !noAck)){
-        connection.client->getMessageHandler()->empty(context);
-    }
-    
+    if(channel.get(queue, !noAck))
+        client.ok(context);
+    else 
+        client.empty(context);
 }
 
 void
 MessageHandlerImpl::offset(const MethodContext&,
                            u_int64_t /*value*/ )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
 }
 
 void
@@ -145,7 +143,7 @@
                          const string& reference)
 {
     references.open(reference);
-    sendOk(context);
+    client.ok(context);
 }
 
 void
@@ -157,18 +155,17 @@
     //TODO: handle global
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
-    
-    sendOk(context);
+    client.ok(context);
 }
 
 void
-MessageHandlerImpl::recover(const MethodContext&,
-                            bool requeue )
+MessageHandlerImpl::recover(const MethodContext& context,
+                            bool requeue)
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
+    // FIXME aconway 2007-02-06: Call to recover hangs client.
     channel.recover(requeue);
-    
+    client.ok(context);
 }
 
 void
@@ -176,7 +173,8 @@
                            u_int16_t /*code*/,
                            const string& /*text*/ )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
 }
 
 void
@@ -184,22 +182,23 @@
                            const string& /*reference*/,
                            const string& /*identifier*/ )
 {
-    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
+    // FIXME astitcher 2007-01-11: 0-9 feature
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
 }
 
 void
 MessageHandlerImpl::transfer(const MethodContext& context,
                              u_int16_t /*ticket*/,
-                             const string& /*destination*/,
+                             const string& destination,
                              bool /*redelivered*/,
-                             bool immediate,
+                             bool /*immediate*/,
                              u_int64_t /*ttl*/,
                              u_int8_t /*priority*/,
                              u_int64_t /*timestamp*/,
                              u_int8_t /*deliveryMode*/,
                              u_int64_t /*expiration*/,
-                             const string& exchangeName,
-                             const string& routingKey,
+                             const string& /*exchangeName*/,
+                             const string& /*routingKey*/,
                              const string& /*messageId*/,
                              const string& /*correlationId*/,
                              const string& /*replyTo*/,
@@ -211,30 +210,23 @@
                              const string& /*securityToken*/,
                              const qpid::framing::FieldTable& 
/*applicationHeaders*/,
                              qpid::framing::Content body,
-                             bool mandatory)
+                             bool /*mandatory*/)
 {
-    Exchange::shared_ptr exchange = exchangeName.empty() ?
-        broker.getExchanges().getDefault() : 
broker.getExchanges().get(exchangeName);
-    boost::shared_ptr<MessageTransferBody> 
transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
-    if(exchange){
-       if (body.isInline()) {
-            Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
-                                   routingKey, mandatory, immediate));
-
-            channel.handleInlineTransfer(msg, exchange);
-        
-            connection.client->getMessageHandler()->ok(context);
-       } else {
-            references.get(body.getValue()).transfer(transfer);
-       }
-    }else{
-        throw ChannelException(404, "Exchange not found '" + exchangeName + 
"'");
+    Exchange::shared_ptr exchange(
+        broker.getExchanges().get(destination)); 
+    MessageTransferBody::shared_ptr transfer(
+        boost::shared_polymorphic_downcast<MessageTransferBody>(
+            context.methodBody));
+    if (body.isInline()) {
+        Message::shared_ptr msg(new MessageMessage(transfer));
+        channel.handleInlineTransfer(msg, exchange);
     }
+    else {
+        // Add to reference.
+        references.get(body.getValue()).transfer(transfer);
+    }
+    client.ok(context);
 }
 
-
-void MessageHandlerImpl::sendOk(const MethodContext& context) {
-    connection.client->getMessageHandler()->ok(context);
-}
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.h Tue Feb  6 13:38:30 2007
@@ -22,6 +22,7 @@
 #include <memory>
 
 #include "AMQP_ServerOperations.h"
+#include "AMQP_ClientProxy.h"
 #include "Reference.h"
 #include "BrokerChannel.h"
 
@@ -36,8 +37,7 @@
         public framing::AMQP_ServerOperations::MessageHandler
 {
   public:
-    MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
-        : channel(ch), connection(c), broker(b), references(ch) {}
+    MessageHandlerImpl(Channel& ch, Connection& c, Broker& b);
 
     void append(const framing::MethodContext&,
                  const std::string& reference,
@@ -119,12 +119,11 @@
                    framing::Content body,
                    bool mandatory );
   private:
-    void sendOk(const framing::MethodContext&);
-    
     Channel& channel;
     Connection& connection;
     Broker& broker;
     ReferenceRegistry references;
+    framing::AMQP_ClientProxy::Message& client;
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Tue Feb  6 13:38:30 2007
@@ -98,3 +98,6 @@
        )                                       \
        > [EMAIL PROTECTED]
        mv [EMAIL PROTECTED] $@
+
+
+check: $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker)

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/python_tests
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/python_tests?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/python_tests (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/python_tests Tue Feb  6 13:38:30 2007
@@ -1,8 +1,4 @@
 #!/bin/sh
-# FIXME aconway 2007-01-09: Re-enable.
-echo "*** WARNING: PYTHON TESTS DISABLED till branch is functioning on 0-9."
-exit
-
 # Run the python tests.
 if test -d ../../python ;  then
     cd ../../python && ./run-tests -v -I cpp_failing.txt

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=504305&r1=504304&r2=504305
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Tue Feb  6 13:38:30 2007
@@ -364,7 +364,7 @@
         for i in range(1, 11):
             reply = channel.message_get(no_ack=True)
             self.assertEqual(reply.method.klass.name, "message")
-            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual(reply.method.name, "ok")
             self.assertEqual("Message %d" % i, reply.body)
 
         reply = channel.message_get(no_ack=True)
@@ -378,7 +378,7 @@
         for i in range(11, 21):
             reply = channel.message_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "message")
-            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual(reply.method.name, "ok")
             self.assertEqual("Message %d" % i, reply.body)
             reply.ok()
 
@@ -399,7 +399,7 @@
         for i in [14, 16, 18, 20]:
             reply = channel.message_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "message")
-            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual(reply.method.name, "ok")
             self.assertEqual("Message %d" % i, reply.body)
             reply.ok()
             #channel.message_ack(delivery_tag=reply.delivery_tag)


Reply via email to