Author: astitcher
Date: Tue Feb 20 16:23:25 2007
New Revision: 509834

URL: http://svn.apache.org/viewvc?view=rev&rev=509834
Log:
 [EMAIL PROTECTED]:  andrew | 2007-02-17 21:14:42 +0000
 More support for references (and transfers of reference content)
 [EMAIL PROTECTED]:  andrew | 2007-02-21 00:22:53 +0000
 Working version of delivering Message Transfers by reference

Modified:
    incubator/qpid/branches/qpid.0-9/   (props changed)
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Propchange: incubator/qpid/branches/qpid.0-9/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Tue Feb 20 16:23:25 2007
@@ -1 +1 @@
-8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1127
+8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1220

Modified: 
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp 
Tue Feb 20 16:23:25 2007
@@ -22,12 +22,14 @@
 #include "BrokerMessageMessage.h"
 #include "ChannelAdapter.h"
 #include "MessageTransferBody.h"
+#include "MessageOpenBody.h"
+#include "MessageCloseBody.h"
 #include "MessageAppendBody.h"
 #include "Reference.h"
 #include "framing/FieldTable.h"
 #include "framing/BasicHeaderProperties.h"
 
-#include <iostream>
+#include <algorithm>
 
 using namespace std;
 using namespace qpid::framing;
@@ -36,21 +38,50 @@
 namespace broker {
        
 MessageMessage::MessageMessage(
-    ConnectionToken* publisher, TransferPtr transfer_
+    ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
 ) : Message(publisher, transfer_->getDestination(),
             transfer_->getRoutingKey(),
             transfer_->getMandatory(),
             transfer_->getImmediate(),
             transfer_),
+    requestId(requestId_),
     transfer(transfer_)
 {}
 
+MessageMessage::MessageMessage(
+    ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
+    ReferencePtr reference_
+) : Message(publisher, transfer_->getDestination(),
+            transfer_->getRoutingKey(),
+            transfer_->getMandatory(),
+            transfer_->getImmediate(),
+            transfer_),
+    requestId(requestId_),
+    transfer(transfer_),
+    reference(reference_)
+{}
+
 void MessageMessage::deliver(
     framing::ChannelAdapter& channel, 
     const std::string& consumerTag, 
     u_int64_t /*deliveryTag*/, 
     u_int32_t /*framesize*/)
 {
+       const framing::Content& body = transfer->getBody();
+       
+       // Send any reference data
+       if (!body.isInline()){
+               // Open
+               channel.send(new MessageOpenBody(channel.getVersion(), 
reference->getId()));
+               // Appends
+               for(Reference::Appends::const_iterator a = 
reference->getAppends().begin();
+                       a != reference->getAppends().end();
+                       ++a) {
+                       channel.send(new MessageAppendBody(*a->get()));
+               }
+       }
+       
+       // The the transfer
     channel.send(
        new MessageTransferBody(channel.getVersion(), 
                                 transfer->getTicket(),
@@ -74,8 +105,13 @@
                                 transfer->getTransactionId(),
                                 transfer->getSecurityToken(),
                                 transfer->getApplicationHeaders(),
-                                transfer->getBody(),
+                                body,
                                 transfer->getMandatory()));
+       // Close any reference data
+       if (!body.isInline()){
+               // Close
+               channel.send(new MessageCloseBody(channel.getVersion(), 
reference->getId()));
+       }
 }
 
 void MessageMessage::sendGetOk(
@@ -120,11 +156,10 @@
 
 u_int64_t MessageMessage::contentSize() const
 {
-       // FIXME astitcher 2007-2-7 only works for inline content
        if (transfer->getBody().isInline())
            return transfer->getBody().getValue().size();
        else
-       THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");          
+       return reference->getSize();             
 }
 
 qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue 
Feb 20 16:23:25 2007
@@ -23,7 +23,7 @@
  */
 #include "BrokerMessageBase.h"
 #include "MessageTransferBody.h"
-#include "Reference.h"         
+#include "amqp_types.h"
 
 #include <vector>
 
@@ -31,7 +31,6 @@
 
 namespace framing {
 class MessageTransferBody;
-class MessageApppendBody;
 }
        
 namespace broker {
@@ -42,17 +41,16 @@
   public:
     typedef boost::shared_ptr<MessageMessage> shared_ptr;
     typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
-    typedef Reference::AppendPtr AppendPtr;
-    typedef Reference::Appends Appends;
+    typedef boost::shared_ptr<Reference> ReferencePtr;
 
-    MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
+    MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr 
transfer);
+    MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr 
transfer, ReferencePtr reference);
             
     // Default destructor okay
 
+    framing::RequestId getRequestId() {return requestId; }
     TransferPtr getTransfer() { return transfer; }
-
-    const Appends& getAppends() { return appends; }
-    void setAppends(const Appends& appends_) { appends = appends_; }
+    ReferencePtr getReference() { return reference; }
     
     void deliver(framing::ChannelAdapter& channel, 
                  const std::string& consumerTag, 
@@ -78,9 +76,9 @@
     u_int64_t expectedContentSize();
 
   private:
-
+    framing::RequestId requestId;
     const TransferPtr transfer;
-    Appends appends;
+    const ReferencePtr reference;
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue 
Feb 20 16:23:25 2007
@@ -33,46 +33,84 @@
 using namespace framing;
 
 MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
-    : HandlerImplType(parent), references(channel) {}
+    : HandlerImplType(parent) {}
 
 //
 // Message class method handlers
 //
+
+void
+MessageHandlerImpl::cancel(const MethodContext& context,
+                           const string& destination )
+{
+    channel.cancel(destination);
+    client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::open(const MethodContext& context,
+                         const string& reference)
+{
+    references.open(reference);
+    client.ok(context.getRequestId());
+}
+
 void
 MessageHandlerImpl::append(const MethodContext& context,
                            const string& reference,
                            const string& /*bytes*/ )
 {
-    references.get(reference).append(
+    references.get(reference)->append(
         boost::shared_polymorphic_downcast<MessageAppendBody>(
             context.methodBody));
     client.ok(context.getRequestId());
 }
 
-
 void
-MessageHandlerImpl::cancel(const MethodContext& context,
-                           const string& destination )
+MessageHandlerImpl::close(const MethodContext& context,
+                          const string& reference)
 {
-    channel.cancel(destination);
+       Reference::shared_ptr ref = references.get(reference);
     client.ok(context.getRequestId());
+    
+    // Send any transfer messages to their correct exchanges and okay them
+    const Reference::Messages& msgs = ref->getMessages();
+    for (Reference::Messages::const_iterator m = msgs.begin(); m != 
msgs.end(); ++m) {
+        channel.handleInlineTransfer(*m);
+       client.ok((*m)->getRequestId());
+    }
+    ref->close();
 }
 
 void
-MessageHandlerImpl::checkpoint(const MethodContext&,
+MessageHandlerImpl::checkpoint(const MethodContext& context,
                                const string& /*reference*/,
                                const string& /*identifier*/ )
 {
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+    // Initial implementation (which is conforming) is to do nothing here
+    // and return offset zero for the resume
+    client.ok(context.getRequestId());
 }
 
 void
-MessageHandlerImpl::close(const MethodContext& context,
-                          const string& reference)
+MessageHandlerImpl::resume(const MethodContext& context,
+                           const string& reference,
+                           const string& /*identifier*/ )
 {
-    references.get(reference).close();
-    client.ok(context.getRequestId());
+    // Initial (null) implementation
+    // open reference and return 0 offset
+    references.open(reference);
+    client.offset(0, context.getRequestId());
+}
+
+void
+MessageHandlerImpl::offset(const MethodContext&,
+                           u_int64_t /*value*/ )
+{
+    // Shouldn't ever receive this as it is reponse to resume
+    // which is never sent
+    // TODO astitcher 2007-02-16 What is the correct exception to throw here?  
  
+    THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
 }
 
 void
@@ -98,14 +136,6 @@
 }
 
 void
-MessageHandlerImpl::empty( const MethodContext& )
-{
-    // Shouldn't ever receive this as it is a response to get
-    // TODO astitcher 2007-02-09 What is the correct exception to throw here?
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
-}
-
-void
 MessageHandlerImpl::get( const MethodContext& context,
                          u_int16_t /*ticket*/,
                          const string& queueName,
@@ -122,11 +152,12 @@
 }
 
 void
-MessageHandlerImpl::offset(const MethodContext&,
-                           u_int64_t /*value*/ )
+MessageHandlerImpl::empty( const MethodContext& )
 {
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+    // Shouldn't ever receive this as it is a response to get
+    // which is never sent
+    // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+    THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
 }
 
 void
@@ -136,14 +167,6 @@
 }
 
 void
-MessageHandlerImpl::open(const MethodContext& context,
-                         const string& reference)
-{
-    references.open(reference);
-    client.ok(context.getRequestId());
-}
-
-void
 MessageHandlerImpl::qos(const MethodContext& context,
                         u_int32_t prefetchSize,
                         u_int16_t prefetchCount,
@@ -173,15 +196,6 @@
 }
 
 void
-MessageHandlerImpl::resume(const MethodContext&,
-                           const string& /*reference*/,
-                           const string& /*identifier*/ )
-{
-    // FIXME astitcher 2007-01-11: 0-9 feature
-    THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
-}
-
-void
 MessageHandlerImpl::transfer(const MethodContext& context,
                              u_int16_t /*ticket*/,
                              const string& /* destination */,
@@ -210,14 +224,19 @@
     MessageTransferBody::shared_ptr transfer(
         boost::shared_polymorphic_downcast<MessageTransferBody>(
             context.methodBody));
-    MessageMessage::shared_ptr message(
-        new MessageMessage(&connection, transfer));
+    RequestId requestId = context.getRequestId();
     
-    if (body.isInline()) 
+    if (body.isInline()) {
+           MessageMessage::shared_ptr message(
+               new MessageMessage(&connection, requestId, transfer));
         channel.handleInlineTransfer(message);
-    else 
-        references.get(body.getValue()).addMessage(message);
-    client.ok(context.getRequestId());
+           client.ok(requestId);
+    } else { 
+        Reference::shared_ptr ref(references.get(body.getValue()));
+           MessageMessage::shared_ptr message(
+               new MessageMessage(&connection, requestId, transfer, ref));
+        ref->addMessage(message);
+    }
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.cpp Tue Feb 20 
16:23:25 2007
@@ -20,36 +20,35 @@
 #include "Reference.h"
 #include "BrokerMessageMessage.h"
 #include "QpidError.h"
+#include "MessageAppendBody.h"
 #include "CompletionHandler.h"
 
 namespace qpid {
 namespace broker {
 
-Reference&  ReferenceRegistry::open(const Reference::Id& id) {
+Reference::shared_ptr  ReferenceRegistry::open(const Reference::Id& id) {
     ReferenceMap::iterator i = references.find(id);
     // TODO aconway 2007-02-05: should we throw Channel or Connection
     // exceptions here?
     if (i != references.end())
         throw ConnectionException(503, "Attempt to re-open reference " +id);
-    return references[id] = Reference(id, this);
+    return references[id] = Reference::shared_ptr(new Reference(id, this));
 }
 
-Reference&  ReferenceRegistry::get(const Reference::Id& id) {
+Reference::shared_ptr  ReferenceRegistry::get(const Reference::Id& id) {
     ReferenceMap::iterator i = references.find(id);
     if (i == references.end()) 
         throw ConnectionException(503, "Attempt to use non-existent reference 
"+id);
     return i->second;
 }
 
-void  Reference::close() {
-    for_each(messages.begin(), messages.end(),
-             boost::bind(&Reference::complete, this, _1));
-    registry->references.erase(getId());
+void Reference::append(AppendPtr ptr) {
+        appends.push_back(ptr);
+        size += ptr->getBytes().length();
 }
 
-void Reference::complete(MessagePtr message) {
-    message->setAppends(appends);
-    registry->handler.complete(message);
+void Reference::close() {
+    registry->references.erase(getId());
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Reference.h Tue Feb 20 
16:23:25 2007
@@ -34,7 +34,6 @@
 namespace broker {
 
 class MessageMessage;
-class CompletionHandler;
 class ReferenceRegistry;
 
 /**
@@ -51,21 +50,23 @@
 {
   public:
     typedef std::string Id;
+    typedef boost::shared_ptr<Reference> shared_ptr;
     typedef boost::shared_ptr<MessageMessage> MessagePtr;
     typedef std::vector<MessagePtr> Messages;
     typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
     typedef std::vector<AppendPtr> Appends;
 
     Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
-        : id(id_), registry(reg) {}
+        : id(id_), size(0), registry(reg) {}
     
     const std::string& getId() const { return id; }
+    u_int64_t getSize() const { return size; }
 
     /** Add a message to be completed with this reference */
     void addMessage(MessagePtr message) { messages.push_back(message); }
 
     /** Append more data to the reference */
-    void append(AppendPtr ptr) { appends.push_back(ptr); }
+    void append(AppendPtr ptr);
 
     /** Close the reference, complete each associated message */
     void close();
@@ -74,9 +75,8 @@
     const Messages& getMessages() const { return messages; }
     
   private:
-    void complete(MessagePtr message);
-    
     Id id;
+    u_int64_t size;
     ReferenceRegistry* registry;
     Messages messages;
     Appends appends;
@@ -91,17 +91,16 @@
  */
 class ReferenceRegistry {
   public:
-    ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {};
-    Reference& open(const Reference::Id& id);
-    Reference& get(const Reference::Id& id);
+    ReferenceRegistry() {};
+    Reference::shared_ptr open(const Reference::Id& id);
+    Reference::shared_ptr get(const Reference::Id& id);
 
   private:
-    typedef std::map<Reference::Id, Reference> ReferenceMap;
-    CompletionHandler& handler;
+    typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
     ReferenceMap references;
 
-    // Reference calls references.erase() and uses handler.
-  friend class Reference;
+    // Reference calls references.erase().
+    friend class Reference;
 };
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ReferenceTest.cpp Tue Feb 20 
16:23:25 2007
@@ -39,39 +39,28 @@
     CPPUNIT_TEST(testReference);
     CPPUNIT_TEST_SUITE_END();
 
-
-    struct MockCompletionHandler : public CompletionHandler {
-        std::vector<MessageMessage::shared_ptr> messages;
-        void complete(Message::shared_ptr m) {
-            MessageMessage::shared_ptr mm =
-                dynamic_pointer_cast<MessageMessage>(m);
-            CPPUNIT_ASSERT(mm);
-            messages.push_back(mm);
-        }
-    };
-
-    MockCompletionHandler handler;
     ProtocolVersion v;
     ReferenceRegistry registry;
+    Reference::shared_ptr r1;
     MessageTransferBody::shared_ptr t1, t2;
     MessageMessage::shared_ptr m1, m2;
     MessageAppendBody::shared_ptr a1, a2;
   public:
 
     ReferenceTest() :
-        registry(handler),
+        r1(registry.open("bar")),
         t1(new MessageTransferBody(v)),
         t2(new MessageTransferBody(v)),
-        m1(new MessageMessage(0, t1)),
-        m2(new MessageMessage(0, t2)),
+        m1(new MessageMessage(0, 1, t1, r1)),
+        m2(new MessageMessage(0, 2, t2, r1)),
         a1(new MessageAppendBody(v)),
         a2(new MessageAppendBody(v))
     {}
 
     void testRegistry() {
-        Reference& ref = registry.open("foo");
-        CPPUNIT_ASSERT_EQUAL(string("foo"), ref.getId());
-        CPPUNIT_ASSERT(&ref == &registry.get("foo"));
+        Reference::shared_ptr ref = registry.open("foo");
+        CPPUNIT_ASSERT_EQUAL(string("foo"), ref->getId());
+        CPPUNIT_ASSERT(ref == registry.get("foo"));
         try {
             registry.get("none");
             CPPUNIT_FAIL("Expected exception");
@@ -83,29 +72,26 @@
     }
 
     void testReference() {
-        Reference& ref = registry.open("foo");
-        ref.addMessage(m1);
-        ref.addMessage(m2);
-        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getMessages().size());
-        ref.append(a1);
-        ref.append(a2);
-        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size());
-        ref.close();
+        r1->addMessage(m1);
+        r1->addMessage(m2);
+        CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getMessages().size());
+        r1->append(a1);
+        r1->append(a2);
+        CPPUNIT_ASSERT_EQUAL(size_t(2), r1->getAppends().size());
+        const vector<MessageMessage::shared_ptr> messages = r1->getMessages();
+        r1->close();
         try {
-            registry.open("foo");
+            registry.open("bar");
             CPPUNIT_FAIL("Expected exception");
         } catch(...) {}
 
-        vector<MessageMessage::shared_ptr>& messages = handler.messages;
-        CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size());
-
         CPPUNIT_ASSERT_EQUAL(messages[0], m1);
-        CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(messages[0]->getAppends()[1], a2);
+        CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(messages[0]->getReference()->getAppends()[1], a2);
 
         CPPUNIT_ASSERT_EQUAL(messages[1], m2);
-        CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[0], a1);
-        CPPUNIT_ASSERT_EQUAL(messages[1]->getAppends()[1], a2);
+        CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[0], a1);
+        CPPUNIT_ASSERT_EQUAL(messages[1]->getReference()->getAppends()[1], a2);
     }
                              
     

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=509834&r1=509833&r2=509834
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Tue Feb 20 
16:23:25 2007
@@ -631,7 +631,7 @@
         channel.channel_open()
         channel.message_consume(queue = "q", destination = "consumer")
         offset = channel.message_resume(reference="my-ref", 
identifier="my-checkpoint").value
-        self.assertEquals(offset, 16)
+        self.assertTrue(offset<=16)
         channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
         channel.synchronous = False
         channel.message_transfer(routing_key="q-one", message_id="abcd", 
body=ReferenceId("my-ref"))


Reply via email to