Author: astitcher
Date: Mon Mar  5 10:01:22 2007
New Revision: 514751

URL: http://svn.apache.org/viewvc?view=rev&rev=514751
Log:
 [EMAIL PROTECTED]:  andrew | 2007-02-26 10:58:52 +0000
 Refactored message transfer to extract commmonality from deliver/get
 [EMAIL PROTECTED]:  andrew | 2007-03-05 17:54:44 +0000
 Turn oversize inline transfers into reference transfers

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Mon Mar  5 10:01:22 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1237
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1242

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
Mon Mar  5 10:01:22 2007
@@ -61,14 +61,13 @@
     reference(reference_)
 {}
 
-void MessageMessage::deliver(
+void MessageMessage::transferMessage(
     framing::ChannelAdapter& channel, 
     const std::string& consumerTag, 
-    u_int64_t /*deliveryTag*/, 
-    u_int32_t /*framesize*/)
-{
+    u_int32_t framesize)
+{      
        const framing::Content& body = transfer->getBody();
-       
+
        // Send any reference data
        if (!body.isInline()){
                // Open
@@ -81,8 +80,9 @@
                }
        }
        
-       // The the transfer
-    channel.send(
+       // The transfer
+       if ( transfer->size()<=framesize ) {
+       channel.send(
        new MessageTransferBody(channel.getVersion(), 
                                 transfer->getTicket(),
                                 consumerTag,
@@ -107,6 +107,44 @@
                                 transfer->getApplicationHeaders(),
                                 body,
                                 transfer->getMandatory()));
+       } else {
+               // Thing to do here is to construct a simple reference message 
then deliver that instead
+               // fragmentmentation will be taken care of in the delivery
+               // if necessary; problem is to invent a reference name to use
+               string content = body.getValue();
+               string refname = "dummy";
+               TransferPtr newTransfer(
+               new MessageTransferBody(channel.getVersion(), 
+                                       transfer->getTicket(),
+                                       consumerTag,
+                                       getRedelivered(),
+                                       transfer->getImmediate(),
+                                       transfer->getTtl(),
+                                       transfer->getPriority(),
+                                       transfer->getTimestamp(),
+                                       transfer->getDeliveryMode(),
+                                       transfer->getExpiration(),
+                                       getExchange(),
+                                       getRoutingKey(),
+                                       transfer->getMessageId(),
+                                       transfer->getCorrelationId(),
+                                       transfer->getReplyTo(),
+                                       transfer->getContentType(),
+                                       transfer->getContentEncoding(),
+                                       transfer->getUserId(),
+                                       transfer->getAppId(),
+                                       transfer->getTransactionId(),
+                                       transfer->getSecurityToken(),
+                                       transfer->getApplicationHeaders(),
+                                       framing::Content(REFERENCE, refname),
+                                       transfer->getMandatory()));
+               ReferencePtr newRef(new Reference(refname));
+               Reference::AppendPtr newAppend(new 
MessageAppendBody(channel.getVersion(), refname, content));
+               newRef->append(newAppend);
+               MessageMessage 
newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+               newMsg.transferMessage(channel, consumerTag, framesize);
+               return;
+       }
        // Close any reference data
        if (!body.isInline()){
                // Close
@@ -114,39 +152,24 @@
        }
 }
 
+void MessageMessage::deliver(
+    framing::ChannelAdapter& channel, 
+    const std::string& consumerTag, 
+    u_int64_t /*deliveryTag*/, 
+    u_int32_t framesize)
+{
+       transferMessage(channel, consumerTag, framesize);
+}
+
 void MessageMessage::sendGetOk(
     const framing::MethodContext& context,
        const std::string& destination,
     u_int32_t /*messageCount*/,
     u_int64_t /*deliveryTag*/, 
-    u_int32_t /*framesize*/)
+    u_int32_t framesize)
 {
        framing::ChannelAdapter* channel = context.channel;
-    channel->send(
-       new MessageTransferBody(channel->getVersion(), 
-                                transfer->getTicket(),
-                                destination,
-                                getRedelivered(),
-                                transfer->getImmediate(),
-                                transfer->getTtl(),
-                                transfer->getPriority(),
-                                transfer->getTimestamp(),
-                                transfer->getDeliveryMode(),
-                                transfer->getExpiration(),
-                                getExchange(),
-                                getRoutingKey(),
-                                transfer->getMessageId(),
-                                transfer->getCorrelationId(),
-                                transfer->getReplyTo(),
-                                transfer->getContentType(),
-                                transfer->getContentEncoding(),
-                                transfer->getUserId(),
-                                transfer->getAppId(),
-                                transfer->getTransactionId(),
-                                transfer->getSecurityToken(),
-                                transfer->getApplicationHeaders(),
-                                transfer->getBody(),
-                                transfer->getMandatory()));
+       transferMessage(*channel, destination, framesize);
 }
 
 bool MessageMessage::isComplete()

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Mon 
Mar  5 10:01:22 2007
@@ -76,6 +76,10 @@
     u_int64_t expectedContentSize();
 
   private:
+       void transferMessage(framing::ChannelAdapter& channel, 
+                                        const std::string& consumerTag, 
+                                        u_int32_t framesize);
+  
     framing::RequestId requestId;
     const TransferPtr transfer;
     const ReferencePtr reference;

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=514751&r1=514750&r2=514751
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Mon 
Mar  5 10:01:22 2007
@@ -187,12 +187,12 @@
 }
 
 void
-MessageHandlerImpl::reject(const MethodContext&,
+MessageHandlerImpl::reject(const MethodContext& /*context*/,
                            u_int16_t /*code*/,
                            const string& /*text*/ )
 {
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+    channel.ack();
+    // channel.requeue();
 }
 
 void


Reply via email to