Author: aconway
Date: Fri Sep 5 17:34:58 2008
New Revision: 692595
URL: http://svn.apache.org/viewvc?rev=692595&view=rev
Log:
Queue cpg deliveries for execution in separate thread.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
incubator/qpid/trunk/qpid/cpp/src/tests/benchmark
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 5
17:34:58 2008
@@ -65,12 +65,14 @@
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
- )
+ ),
+ deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this,
_1)))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
cpg.join(name);
- // Start dispatching from the poller.
+
+ deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
@@ -173,27 +175,7 @@
{
try {
MemberId from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- Connection* connection;
- uint8_t type = buf.getOctet();
- decodePtr(buf, connection);
- if (connection == 0) { // Cluster controls
- AMQFrame frame;
- while (frame.decode(buf))
- if (!ClusterOperations(*this, from).invoke(frame))
- throw Exception("Invalid cluster control");
- }
- else { // Connection data or control
- boost::intrusive_ptr<Connection> c =
- getConnection(ConnectionId(from, connection));
- if (type == DATA)
- c->deliverBuffer(buf);
- else {
- AMQFrame frame;
- while (frame.decode(buf))
- c->deliver(frame);
- }
- }
+ deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,6 +185,26 @@
}
}
+void Cluster::deliverEvent(const Event& e) {
+ Buffer buf(e);
+ if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this,
e.getConnection().getMember()).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Connection data or control
+ boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
+ if (e.getType() == DATA)
+ c->deliverBuffer(buf);
+ else { // control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ c->deliver(frame);
+ }
+ }
+}
+
struct AddrList {
const cpg_address* addrs;
int count;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep 5
17:34:58 2008
@@ -20,6 +20,7 @@
*/
#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Event.h"
#include "qpid/cluster/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
@@ -85,7 +86,7 @@
/** Message sent over the cluster. */
typedef std::pair<framing::AMQFrame, ConnectionId> Message;
- typedef PollableQueue<Message> MessageQueue;
+ typedef PollableQueue<Event> EventQueue;
boost::function<void()> shutdownNext;
@@ -93,6 +94,8 @@
void deliverFrame(framing::AMQFrame&, const ConnectionId&);
void deliverBuffer(const char*, size_t, const ConnectionId&);
+
+ void deliverEvent(const Event&);
/** CPG deliver callback. */
void deliver(
@@ -132,7 +135,8 @@
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
-
+ PollableQueue<Event> deliverQueue;
+
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
friend std::ostream& operator <<(std::ostream&, const UrlMap&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Sep 5
17:34:58 2008
@@ -25,6 +25,7 @@
namespace qpid {
namespace cluster {
+
using framing::Buffer;
const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
@@ -32,10 +33,14 @@
Event::Event(EventType t, const ConnectionId c, const size_t s)
: type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
-Event::Event(const MemberId& m, const char* d, size_t s)
- : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size))
-{
- memcpy(data->get(), d, s);
+Event Event::delivered(const MemberId& m, void* d, size_t s) {
+ Buffer buf(static_cast<char*>(d), s);
+ EventType type((EventType)buf.getOctet());
+ ConnectionId connection(m,
reinterpret_cast<Connection*>(buf.getLongLong()));
+ assert(buf.getPosition() == OVERHEAD);
+ Event e(type, connection, s-OVERHEAD);
+ memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
+ return e;
}
void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Sep 5 17:34:58
2008
@@ -25,6 +25,7 @@
#include "types.h"
#include "Cpg.h"
#include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/Buffer.h"
namespace qpid {
namespace cluster {
@@ -39,11 +40,11 @@
*/
struct Event {
public:
- /** Create an event with for mcasting, with size bytes of space. */
- Event(EventType t, const ConnectionId c, size_t size);
+ /** Create an event to mcast with a buffer of size bytes. */
+ Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t
size=0);
- /** Create an event from delivered data. */
- Event(const MemberId& m, const char* data, size_t size);
+ /** Create an event copied from delivered data. */
+ static Event delivered(const MemberId& m, void* data, size_t size);
void mcast(const Cpg::Name& name, Cpg& cpg);
@@ -51,6 +52,9 @@
ConnectionId getConnection() const { return connection; }
size_t getSize() const { return size; }
char* getData() { return data->get(); }
+ const char* getData() const { return data->get(); }
+
+ operator framing::Buffer() const { return
framing::Buffer(const_cast<char*>(getData()), getSize()); }
private:
static const size_t OVERHEAD;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Sep 5
17:34:58 2008
@@ -27,6 +27,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
+#include <algorithm>
#include <deque>
namespace qpid {
@@ -53,6 +54,15 @@
/** Callback to process a range of items. */
typedef boost::function<void (const iterator&, const iterator&)> Callback;
+ /** Functor tempalate to create a Callback from a functor that handles a
single item. */
+ template <class F> struct ForEach {
+ F handleOne;
+ ForEach(const F& f) : handleOne(f) {}
+ void operator()(const iterator& i, const iterator& j) const {
std::for_each(i, j, handleOne); }
+ };
+ /** Function to create ForEach instances */
+ template <class F> static ForEach<F> forEach(const F& f) { return
ForEach<F>(f); }
+
/** When the queue is selected by the poller, values are passed to
callback cb. */
explicit PollableQueue(const Callback& cb);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Sep 5 17:34:58
2008
@@ -34,7 +34,7 @@
class Connection;
-/** Types of cluster messages. */
+/** Types of cluster event. */
enum EventType { DATA, CONTROL };
/** first=node-id, second=pid */
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/benchmark
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/benchmark?rev=692595&r1=692594&r2=692595&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/benchmark (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/benchmark Fri Sep 5 17:34:58 2008
@@ -78,7 +78,6 @@
} | tee $FILE
}
-echo "benchmark $*" | tee benchmark.tab
HEADING="pub sub total Mb"
dosamples $SCRIPTDIR/perfdist --size $SIZE --count $COUNT --nsubs $NSUBS
--npubs $NPUBS -s -- ${CLIENTS[*]} --- ${BROKERS[*]}
HEADING="pub"
@@ -89,6 +88,8 @@
dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b
${BROKERS[0]}
echo
-echo "Tab separated spreadsheet (also stored in benchmark.tab):"
+echo "Tab separated spreadsheet (also saved as benchmark.tab):"
echo
+
+echo "benchmark -- ${CLIENTS[*]} --- ${BROKERS[*]} " | tee benchmark.tab
paste $FILES | tee -a benchmark.tab