Author: gsim
Date: Thu Mar 27 11:04:42 2008
New Revision: 641929

URL: http://svn.apache.org/viewvc?rev=641929&view=rev
Log:
Send accept in response to message publications if required.
Hold up completion (and accept) until message from transfer is fully enqueued.
 

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/xml/extra.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Mar 27 11:04:42 2008
@@ -201,6 +201,7 @@
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/IncomingExecutionContext.cpp \
+  qpid/broker/IncompleteMessageList.cpp \
   qpid/broker/Message.cpp \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
@@ -321,6 +322,7 @@
   qpid/broker/HandlerImpl.h \
   qpid/broker/HeadersExchange.h \
   qpid/broker/IncomingExecutionContext.h \
+  qpid/broker/IncompleteMessageList.h \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=641929&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Thu 
Mar 27 11:04:42 2008
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IncompleteMessageList.h"
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
+{
+    incomplete.push_back(msg);
+}
+
+void IncompleteMessageList::process(CompletionListener l, bool sync)
+{
+    while (!incomplete.empty()) {
+        boost::intrusive_ptr<Message>& msg = incomplete.front();
+        if (!msg->isEnqueueComplete()) {
+            if (sync){
+                msg->flush();
+                msg->waitForEnqueueComplete();
+            } else {
+                //leave the message as incomplete for now
+                return;
+            }            
+        }
+        l(msg);
+        incomplete.pop_front();
+    }
+}
+
+}}

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h?rev=641929&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h Thu 
Mar 27 11:04:42 2008
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _IncompleteMessageList_
+#define _IncompleteMessageList_
+
+#include <list>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+class IncompleteMessageList
+{
+    typedef std::list< boost::intrusive_ptr<Message> > Messages;
+    Messages incomplete;
+
+public:
+    typedef boost::function<void(boost::intrusive_ptr<Message>)> 
CompletionListener;    
+
+    void add(boost::intrusive_ptr<Message> msg);
+    void process(CompletionListener l, bool sync);
+};
+
+
+}}
+
+#endif

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Mar 27 
11:04:42 2008
@@ -84,6 +84,11 @@
     return getAdapter().isPersistent(frames);
 }
 
+bool Message::requiresAccept()
+{
+    return getAdapter().requiresAccept(frames);
+}
+
 uint32_t Message::getRequiredCredit() const
 {
     //add up payload for all header and content frames in the frameset

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Mar 27 11:04:42 
2008
@@ -69,6 +69,7 @@
     bool isImmediate() const;
     const framing::FieldTable* getApplicationHeaders() const;
     bool isPersistent();
+    bool requiresAccept();
 
     framing::FrameSet& getFrames() { return frames; } 
     const framing::FrameSet& getFrames() const { return frames; } 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Thu Mar 27 
11:04:42 2008
@@ -53,10 +53,16 @@
 
     bool TransferAdapter::isPersistent(const framing::FrameSet& f)
     {
-        const framing::DeliveryProperties* p = 
f.getHeaders()->get<framing::DeliveryProperties>();
+        const framing::DeliveryProperties010* p = 
f.getHeaders()->get<framing::DeliveryProperties010>();
         return p && p->getDeliveryMode() == 2;
     }
 
+    bool TransferAdapter::requiresAccept(const framing::FrameSet& f)
+    {
+        const framing::Message010TransferBody* b = 
f.as<framing::Message010TransferBody>();
+        return b && b->getAcceptMode();
+    }
+
     std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
     {
         return f.as<framing::MessageTransferBody>()->getDestination();
@@ -72,6 +78,12 @@
     {
         const framing::MessageProperties* p = 
f.getHeaders()->get<framing::MessageProperties>();
         return p ? &(p->getApplicationHeaders()) : 0;
+    }
+
+    bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
+    {
+        const framing::DeliveryProperties* p = 
f.getHeaders()->get<framing::DeliveryProperties>();
+        return p && p->getDeliveryMode() == 2;
     }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Thu Mar 27 
11:04:42 2008
@@ -44,15 +44,17 @@
     virtual bool isImmediate(const framing::FrameSet& f) = 0;
     virtual const framing::FieldTable* getApplicationHeaders(const 
framing::FrameSet& f) = 0;
     virtual bool isPersistent(const framing::FrameSet& f) = 0;
+    virtual bool requiresAccept(const framing::FrameSet& f) = 0;    
 };
 
 struct TransferAdapter : MessageAdapter
 {
     virtual std::string getRoutingKey(const framing::FrameSet& f);
     virtual std::string getExchange(const framing::FrameSet& f);
-    bool isImmediate(const framing::FrameSet&);
     virtual const framing::FieldTable* getApplicationHeaders(const 
framing::FrameSet& f);
-    bool isPersistent(const framing::FrameSet& f);
+    virtual bool isPersistent(const framing::FrameSet& f);
+    bool isImmediate(const framing::FrameSet&);
+    bool requiresAccept(const framing::FrameSet& f);    
 };
 
 struct PreviewAdapter : TransferAdapter
@@ -60,6 +62,7 @@
     std::string getExchange(const framing::FrameSet& f);
     std::string getRoutingKey(const framing::FrameSet& f);
     const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& 
f);
+    bool isPersistent(const framing::FrameSet& f);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Mar 27 
11:04:42 2008
@@ -50,7 +50,8 @@
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
-      ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
+      ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
+      enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
 {
     getConnection().outputTasks.addOutputTask(&semanticState);
 
@@ -182,6 +183,7 @@
         getProxy().getExecution010().result(id, invocation.getResult());
     }
     if (method->isSync()) { 
+        incomplete.process(enqueuedOp, true);
         sendCompletion(); 
     }
     //TODO: if window gets too large send unsolicited completion
@@ -202,11 +204,28 @@
         msg->setPublisher(&getConnection());
         semanticState.handle(msg);        
         msgBuilder.end();
-        //TODO: may want to hold up execution until async enqueue is complete  
      
-        completed.add(msg->getCommandId());
+
+        if (msg->isEnqueueComplete()) {
+            enqueued(msg);
+        } else {
+            incomplete.add(msg);
+        }
+
+        //hold up execution until async enqueue is complete        
         if (msg->getFrames().getMethod()->isSync()) { 
+            incomplete.process(enqueuedOp, true);
             sendCompletion(); 
+        } else {
+            incomplete.process(enqueuedOp, false);
         }
+    }
+}
+
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+{
+    completed.add(msg->getCommandId());
+    if (msg->requiresAccept()) {        
+        getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));   
     
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Thu Mar 27 
11:04:42 2008
@@ -33,10 +33,10 @@
 #include "qpid/management/Session.h"
 #include "SessionAdapter.h"
 #include "DeliveryAdapter.h"
+#include "IncompleteMessageList.h"
 #include "MessageBuilder.h"
 #include "SessionContext.h"
 #include "SemanticState.h"
-#include "IncomingExecutionContext.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -53,10 +53,11 @@
 
 namespace broker {
 
-class SessionHandler;
-class SessionManager;
 class Broker;
 class ConnectionState;
+class Message;
+class SessionHandler;
+class SessionManager;
 
 /**
  * Broker-side session state includes sessions handler chains, which may
@@ -132,12 +133,15 @@
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;
+    IncompleteMessageList incomplete;
 
     RangedOperation ackOp;
+    IncompleteMessageList::CompletionListener enqueuedOp;
 
     management::Session::shared_ptr mgmtObject;
     void handleCommand(framing::AMQMethodBody* method, 
framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
+    void enqueued(boost::intrusive_ptr<Message> msg);
 
     friend class SessionManager;
 };

Added: incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp?rev=641929&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp Thu Mar 
27 11:04:42 2008
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <sstream>
+#include "qpid/broker/Message.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/IncompleteMessageList.h"
+
+#include "unit_test.h"
+
+QPID_AUTO_TEST_SUITE(IncompleteMessageListTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct Checker
+{
+    std::list<SequenceNumber> ids;
+
+    Checker() { }
+
+    Checker(uint start, uint end) {
+        for (uint i = start; i <= end; i++) {
+            ids.push_back(i);
+        }        
+    }
+
+    Checker& expect(const SequenceNumber& id) {
+        ids.push_back(id);
+        return *this;
+    }
+
+    void operator()(boost::intrusive_ptr<Message> msg) { 
+        BOOST_CHECK(!ids.empty());
+        BOOST_CHECK_EQUAL(msg->getCommandId(), ids.front());
+        ids.pop_front();
+    } 
+};
+
+BOOST_AUTO_TEST_CASE(testProcessSimple)
+{
+    IncompleteMessageList list;
+    SequenceNumber counter(1);
+    //fill up list with messages
+    for (int i = 0; i < 5; i++) {
+        boost::intrusive_ptr<Message> msg(new Message(counter++));
+        list.add(msg);
+    }
+    //process and ensure they are all passed to completion listener
+    list.process(Checker(1, 5), false);
+    //process again and ensure none are resent to listener
+    list.process(Checker(), false);
+}
+
+BOOST_AUTO_TEST_CASE(testProcessWithIncomplete)
+{
+    IncompleteMessageList list;
+    SequenceNumber counter(1);
+    boost::intrusive_ptr<Message> middle;
+    //fill up list with messages
+    for (int i = 0; i < 5; i++) {
+        boost::intrusive_ptr<Message> msg(new Message(counter++));
+        list.add(msg);
+        if (i == 2) {
+            //mark a message in the middle as incomplete
+            msg->enqueueAsync();
+            middle = msg;
+        }
+    }
+    //process and ensure only message upto incomplete message are passed to 
listener
+    list.process(Checker(1, 2), false);
+    //mark message complete and re-process to get remaining messages sent to 
listener
+    middle->enqueueComplete();
+    list.process(Checker(3, 5), false);    
+}
+
+
+struct MockStore : public NullMessageStore
+{
+    Queue::shared_ptr queue;
+    boost::intrusive_ptr<Message> msg;
+
+    void flush(const qpid::broker::PersistableQueue& q) {
+        BOOST_CHECK_EQUAL(queue.get(), &q);
+        msg->enqueueComplete();
+    }
+};
+
+BOOST_AUTO_TEST_CASE(testSyncProcessWithIncomplete)
+{
+    IncompleteMessageList list;
+    SequenceNumber counter(1);
+    MockStore store;
+    store.queue = Queue::shared_ptr(new Queue("mock-queue"));
+    //fill up list with messages
+    for (int i = 0; i < 5; i++) {
+        boost::intrusive_ptr<Message> msg(new Message(counter++));
+        list.add(msg);
+        if (i == 2) {
+            //mark a message in the middle as incomplete
+            msg->enqueueAsync(store.queue, &store);
+            store.msg = msg;
+        }
+    }
+    //process with sync bit specified and ensure that all messages are passed 
to listener
+    list.process(Checker(1, 5), true);
+}
+
+QPID_AUTO_TEST_SUITE_END()

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Mar 27 11:04:42 2008
@@ -40,7 +40,8 @@
        ClientSessionTest.cpp \
        SequenceSet.cpp \
        serialize.cpp \
-       ProxyTemplate.cpp apply.cpp BoundedIterator.cpp
+       ProxyTemplate.cpp apply.cpp BoundedIterator.cpp \
+       IncompleteMessageList.cpp
 
 check_LTLIBRARIES += libshlibtest.la
 libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)

Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=641929&r1=641928&r2=641929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Thu Mar 27 11:04:42 2008
@@ -656,6 +656,7 @@
     <method name = "transfer" index="1">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
+        <chassis name="client" implement="MUST" />
         <field name="destination" domain="shortstr"/>
         <field name="accept-mode" domain="octet"/>
         <field name="acquire-mode" domain="octet"/>
@@ -663,11 +664,13 @@
     <method name = "accept" index="2">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
+        <chassis name="client" implement="MUST" />
         <field name="commands" domain="sequence-set"/>
     </method>
     <method name = "reject" index="3">
         <doc>blah, blah</doc>
         <chassis name="server" implement="MUST" />
+        <chassis name="client" implement="MUST" />
         <field name="commands" domain="sequence-set"/>
         <field name="code" domain="short"/>
         <field name="text" domain="shortstr"/>


Reply via email to