Author: astitcher
Date: Tue Feb  6 07:53:33 2007
New Revision: 504182

URL: http://svn.apache.org/viewvc?view=rev&rev=504182
Log:
Inline transferred messages delivered

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb  
6 07:53:33 2007
@@ -334,12 +334,17 @@
     //no specific action required, generic response handling should be 
sufficient
 }
 
+
+//
+// Message class method handlers
+//
 void
 BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& 
context)
 {
     connection.client->getChannel().ok(context);
     connection.client->getChannel().pong(context);
 }
+
 
 void
 BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& 
context)

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb  
6 07:53:33 2007
@@ -187,6 +187,17 @@
     if(blocked) queue->dispatch();
 }
 
+void Channel::handleInlineTransfer(Message::shared_ptr& msg, 
Exchange::shared_ptr& exch){
+    if(transactional){
+        TxPublish* deliverable = new TxPublish(msg);
+        exch->route(*deliverable, msg->getRoutingKey(), 
&(msg->getHeaderProperties()->getHeaders()));
+        txBuffer.enlist(new DeletingTxOp(deliverable));
+    }else{
+        DeliverableMessage deliverable(msg);
+        exch->route(deliverable, msg->getRoutingKey(), 
&(msg->getHeaderProperties()->getHeaders()));
+    }
+}
+
 // FIXME aconway 2007-02-05: Drop exchange member, calculate from
 // message in ::complete().
 void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb  6 
07:53:33 2007
@@ -96,7 +96,9 @@
 
     boost::scoped_ptr<BrokerAdapter> adapter;
 
-    virtual void complete(Message::shared_ptr msg);
+       // completion handler for MessageBuilder
+    void complete(Message::shared_ptr msg);
+    
     void deliver(Message::shared_ptr& msg, const string& tag, 
Queue::shared_ptr& queue, bool ackExpected);            
     void cancel(consumer_iterator consumer);
     bool checkPrefetch(Message::shared_ptr& msg);
@@ -110,7 +112,9 @@
     
     ~Channel();
 
+       // For ChannelAdapter
     bool isOpen() const { return opened; }
+    
     void open() { opened = true; }
     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
     Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
@@ -134,6 +138,10 @@
     void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
     void handleContent(boost::shared_ptr<framing::AMQContentBody>);
     void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+    
+    void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& 
exchange);
+    
+    // For ChannelAdapter
     void handleMethodInContext(
         boost::shared_ptr<framing::AMQMethodBody> method,
         const framing::MethodContext& context);

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
Tue Feb  6 07:53:33 2007
@@ -18,15 +18,27 @@
  * under the License.
  *
  */
-#include <iostream>
 #include "BrokerMessageMessage.h"
+#include "ChannelAdapter.h"
 #include "MessageTransferBody.h"
 #include "MessageAppendBody.h"
 #include "Reference.h"
 
+#include <iostream>
+
 using namespace std;
 using namespace qpid::broker;
+using namespace qpid::framing;
        
+MessageMessage::MessageMessage(
+    const boost::shared_ptr<MessageTransferBody> _methodBody, 
+    const std::string& _exchange, const std::string& _routingKey, 
+    bool _mandatory, bool _immediate) :
+    Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+    methodBody(_methodBody)
+{
+}
+
 MessageMessage::MessageMessage(TransferPtr transfer_)
     : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
               transfer_->getMandatory(), transfer_->getImmediate(),
@@ -43,14 +55,36 @@
 {}
 
 void MessageMessage::deliver(
-    framing::ChannelAdapter& /*channel*/,
-    const std::string& /*consumerTag*/, 
+    framing::ChannelAdapter& channel, 
+    const std::string& consumerTag, 
     u_int64_t /*deliveryTag*/, 
     u_int32_t /*framesize*/)
 {
-    // FIXME aconway 2007-02-05:
-    cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
-         << " appends." << endl;
+    channel.send(
+       new MessageTransferBody(channel.getVersion(), 
+        methodBody->getTicket(),
+        consumerTag,
+        getRedelivered(),
+        methodBody->getImmediate(),
+        methodBody->getTtl(),
+        methodBody->getPriority(),
+        methodBody->getTimestamp(),
+        methodBody->getDeliveryMode(),
+        methodBody->getExpiration(),
+        getExchange(),
+        getRoutingKey(),
+        methodBody->getMessageId(),
+        methodBody->getCorrelationId(),
+        methodBody->getReplyTo(),
+        methodBody->getContentType(),
+        methodBody->getContentEncoding(),
+        methodBody->getUserId(),
+        methodBody->getAppId(),
+        methodBody->getTransactionId(),
+        methodBody->getSecurityToken(),
+        methodBody->getApplicationHeaders(),
+        methodBody->getBody(),
+        methodBody->getMandatory()));
 }
 
 void MessageMessage::sendGetOk(

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue 
Feb  6 07:53:33 2007
@@ -21,10 +21,12 @@
  * under the License.
  *
  */
-#include <vector>
 #include "BrokerMessageBase.h"
+#include "MessageTransferBody.h"
 #include "Reference.h"         
 
+#include <vector>
+
 namespace qpid {
 
 namespace framing {
@@ -36,11 +38,17 @@
 class Reference;
 
 class MessageMessage: public Message{
+    const boost::shared_ptr<framing::MessageTransferBody> methodBody;
+
   public:
     typedef Reference::TransferPtr TransferPtr;
     typedef Reference::AppendPtr AppendPtr;
-    typedef  Reference::Appends Appends;
+    typedef Reference::Appends Appends;
 
+    MessageMessage(
+        const boost::shared_ptr<framing::MessageTransferBody> methodBody, 
+        const std::string& exchange, const std::string& routingKey, 
+        bool mandatory, bool immediate);
     MessageMessage(TransferPtr transfer);
     MessageMessage(TransferPtr transfer, const Reference&);
             

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue 
Feb  6 07:53:33 2007
@@ -80,8 +80,6 @@
                             bool exclusive,
                             const qpid::framing::FieldTable& filter )
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-       
     Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); 
   
     if(!destination.empty() && channel.exists(destination)){
         throw ConnectionException(530, "Consumer tags must be unique");
@@ -139,7 +137,7 @@
 void
 MessageHandlerImpl::ok( const MethodContext& )
 {
-    // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
+    // TODO: Need to ack the transfers acknowledged so far for flow control 
purposes
 }
 
 void
@@ -156,8 +154,6 @@
                         u_int16_t prefetchCount,
                         bool /*global*/ )
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-    
     //TODO: handle global
     channel.setPrefetchSize(prefetchSize);
     channel.setPrefetchCount(prefetchCount);
@@ -196,14 +192,14 @@
                              u_int16_t /*ticket*/,
                              const string& /*destination*/,
                              bool /*redelivered*/,
-                             bool /* immediate */,
+                             bool immediate,
                              u_int64_t /*ttl*/,
                              u_int8_t /*priority*/,
                              u_int64_t /*timestamp*/,
                              u_int8_t /*deliveryMode*/,
                              u_int64_t /*expiration*/,
                              const string& exchangeName,
-                             const string& /* routingKey */,
+                             const string& routingKey,
                              const string& /*messageId*/,
                              const string& /*correlationId*/,
                              const string& /*replyTo*/,
@@ -215,22 +211,24 @@
                              const string& /*securityToken*/,
                              const qpid::framing::FieldTable& 
/*applicationHeaders*/,
                              qpid::framing::Content body,
-                             bool /* mandatory */ )
+                             bool mandatory)
 {
-    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
-    MessageTransferBody::shared_ptr transfer =
-        boost::shared_polymorphic_downcast<MessageTransferBody>(
-            context.methodBody);
-    // Verify the exchange exists, will throw if not.
-    broker.getExchanges().get(exchangeName);
-    if (body.isInline()) {
-        MessageMessage* msg = new MessageMessage(transfer);
-        // FIXME aconway 2007-02-05: Remove exchange parameter.
-        // use shared_ptr for message.
-        channel.handlePublish(msg, Exchange::shared_ptr());
-        sendOk(context);
-    } else {
-        references.get(body.getValue()).transfer(transfer);
+    Exchange::shared_ptr exchange = exchangeName.empty() ?
+        broker.getExchanges().getDefault() : 
broker.getExchanges().get(exchangeName);
+    boost::shared_ptr<MessageTransferBody> 
transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
+    if(exchange){
+       if (body.isInline()) {
+            Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
+                                   routingKey, mandatory, immediate));
+
+            channel.handleInlineTransfer(msg, exchange);
+        
+            connection.client->getMessageHandler()->ok(context);
+       } else {
+            references.get(body.getValue()).transfer(transfer);
+       }
+    }else{
+        throw ChannelException(404, "Exchange not found '" + exchangeName + 
"'");
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Tue Feb  6 07:53:33 
2007
@@ -24,7 +24,6 @@
   InMemoryContentTest  \
   LazyLoadedContentTest        \
   MessageBuilderTest   \
-  MessageHandlerTest   \
   MessageTest          \
   ReferenceTest         \
   QueueRegistryTest    \


Reply via email to