Author: aconway
Date: Fri Sep  5 17:34:58 2008
New Revision: 692595

URL: http://svn.apache.org/viewvc?rev=692595&view=rev
Log:
Queue cpg deliveries for execution in separate thread.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    incubator/qpid/trunk/qpid/cpp/src/tests/benchmark

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep  5 17:34:58 2008
@@ -65,12 +65,14 @@
                       boost::bind(&Cluster::dispatch, this, _1), // read
                       0,                                         // write
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
-    )
+    ),
+    deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
     QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
     cpg.join(name);
-    // Start dispatching from the poller.
+
+    deliverQueue.start(poller);
     cpgDispatchHandle.startWatch(poller);
 }
 
@@ -173,27 +175,7 @@
 {
     try {
         MemberId from(nodeid, pid);
-        Buffer buf(static_cast<char*>(msg), msg_len);
-        Connection* connection;
-        uint8_t type = buf.getOctet();
-        decodePtr(buf, connection);
-        if (connection == 0)  { // Cluster controls
-            AMQFrame frame;
-            while (frame.decode(buf)) 
-                if (!ClusterOperations(*this, from).invoke(frame))
-                    throw Exception("Invalid cluster control");
-        }
-        else {                  // Connection data or control
-            boost::intrusive_ptr<Connection> c =
-                getConnection(ConnectionId(from, connection));
-            if (type == DATA)
-                c->deliverBuffer(buf);
-            else {
-                AMQFrame frame;
-                while (frame.decode(buf))
-                    c->deliver(frame);
-            }
-        }
+        deliverQueue.push(Event::delivered(from, msg, msg_len));
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
@@ -203,6 +185,26 @@
     }
 }
 
+void Cluster::deliverEvent(const Event& e) {
+    Buffer buf(e);
+    if (e.getConnection().getConnectionPtr() == 0)  { // Cluster control
+        AMQFrame frame;
+        while (frame.decode(buf)) 
+            if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
+                throw Exception("Invalid cluster control");
+    }
+    else {                  // Connection data or control
+        boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
+        if (e.getType() == DATA)
+            c->deliverBuffer(buf);
+        else {              // control
+            AMQFrame frame;
+            while (frame.decode(buf))
+                c->deliver(frame);
+        }
+    }
+}
+
 struct AddrList {
     const cpg_address* addrs;
     int count;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep  5 17:34:58 2008
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Event.h"
 #include "qpid/cluster/PollableQueue.h"
 #include "qpid/cluster/NoOpConnectionOutputHandler.h"
 
@@ -85,7 +86,7 @@
 
     /** Message sent over the cluster. */
     typedef std::pair<framing::AMQFrame, ConnectionId> Message;
-    typedef PollableQueue<Message> MessageQueue;
+    typedef PollableQueue<Event> EventQueue;
 
     boost::function<void()> shutdownNext;
 
@@ -93,6 +94,8 @@
     void deliverFrame(framing::AMQFrame&, const ConnectionId&);
 
     void deliverBuffer(const char*, size_t, const ConnectionId&);
+
+    void deliverEvent(const Event&);
     
     /** CPG deliver callback. */
     void deliver(
@@ -132,7 +135,8 @@
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-
+    PollableQueue<Event> deliverQueue;
+    
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
   friend std::ostream& operator <<(std::ostream&, const UrlMap&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Sep  5 17:34:58 2008
@@ -25,6 +25,7 @@
 
 namespace qpid {
 namespace cluster {
+
 using framing::Buffer;
 
 const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
@@ -32,10 +33,14 @@
 Event::Event(EventType t, const ConnectionId c, const size_t s)
     : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
 
-Event::Event(const MemberId& m, const char* d, size_t s)
-    : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size))
-{
-    memcpy(data->get(), d, s);
+Event Event::delivered(const MemberId& m, void* d, size_t s) {
+    Buffer buf(static_cast<char*>(d), s);
+    EventType type((EventType)buf.getOctet()); 
+    ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+    assert(buf.getPosition() == OVERHEAD);
+    Event e(type, connection, s-OVERHEAD);
+    memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
+    return e;
 }
     
 void Event::mcast(const Cpg::Name& name, Cpg& cpg) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Sep  5 17:34:58 2008
@@ -25,6 +25,7 @@
 #include "types.h"
 #include "Cpg.h"
 #include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/Buffer.h"
 
 namespace qpid {
 namespace cluster {
@@ -39,11 +40,11 @@
  */
 struct Event {
   public:
-    /** Create an event with for mcasting, with size bytes of space. */
-    Event(EventType t, const ConnectionId c, size_t size);
+    /** Create an event to mcast with a buffer of size bytes. */
+    Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0);
 
-    /** Create an event from delivered data. */
-    Event(const MemberId& m, const char* data, size_t size);
+    /** Create an event copied from delivered data. */
+    static Event delivered(const MemberId& m, void* data, size_t size);
     
     void mcast(const Cpg::Name& name, Cpg& cpg);
     
@@ -51,6 +52,9 @@
     ConnectionId getConnection() const { return connection; }
     size_t getSize() const { return size; }
     char* getData() { return data->get(); }
+    const char* getData() const { return data->get(); }
+
+    operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); }
 
   private:
     static const size_t OVERHEAD;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Sep  5 17:34:58 2008
@@ -27,6 +27,7 @@
 #include "qpid/sys/Mutex.h"
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
+#include <algorithm>
 #include <deque>
 
 namespace qpid {
@@ -53,6 +54,15 @@
     /** Callback to process a range of items. */
     typedef boost::function<void (const iterator&, const iterator&)> Callback;
 
+    /** Functor template to create a Callback from a functor that handles a single item. */
+    template <class F> struct ForEach {
+        F handleOne;
+        ForEach(const F& f) : handleOne(f) {}
+        void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
+    };
+    /** Function to create ForEach instances */
+    template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
+    
     /** When the queue is selected by the poller, values are passed to callback cb. */
     explicit PollableQueue(const Callback& cb);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Sep  5 17:34:58 2008
@@ -34,7 +34,7 @@
 
 class Connection;
 
-/** Types of cluster messages. */
+/** Types of cluster event. */
 enum EventType { DATA, CONTROL };
 
 /** first=node-id, second=pid */

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/benchmark
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/benchmark?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/benchmark (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/benchmark Fri Sep  5 17:34:58 2008
@@ -78,7 +78,6 @@
     } | tee $FILE
 }
 
-echo "benchmark $*" | tee benchmark.tab
 HEADING="pub   sub     total   Mb"
 dosamples $SCRIPTDIR/perfdist --size $SIZE --count $COUNT --nsubs $NSUBS --npubs $NPUBS -s -- ${CLIENTS[*]} --- ${BROKERS[*]}
 HEADING="pub"
@@ -89,6 +88,8 @@
 dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b ${BROKERS[0]} 
 
 echo
-echo "Tab separated spreadsheet (also stored in benchmark.tab):"
+echo "Tab separated spreadsheet (also saved as benchmark.tab):"
 echo
+
+echo "benchmark -- ${CLIENTS[*]} --- ${BROKERS[*]} " | tee benchmark.tab
 paste $FILES | tee -a benchmark.tab


Reply via email to