Author: gsim
Date: Mon Oct 1 03:24:25 2007
New Revision: 580915
URL: http://svn.apache.org/viewvc?rev=580915&view=rev
Log:
Make ExecutionHandler thread-safe for calls that can be made by application
threads.
Add a generic listener for completion changes.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/CompletionTracker.h Mon Oct 1 03:24:25 2007
@@ -34,8 +34,7 @@
class CompletionTracker
{
public:
- //typedef boost::function<void()> CompletionListener;
- typedef boost::function0<void> CompletionListener;
+ typedef boost::function<void()> CompletionListener;
typedef boost::function<void(const std::string&)> ResultListener;
CompletionTracker();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Mon Oct 1 03:24:25 2007
@@ -37,6 +37,7 @@
virtual Demux& getDemux() = 0;
virtual bool isComplete(const framing::SequenceNumber& id) = 0;
virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
+ virtual void setCompletionListener(boost::function<void()>) = 0;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Oct 1 03:24:25 2007
@@ -29,6 +29,7 @@
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
+using qpid::sys::Mutex;
bool isMessageMethod(AMQMethodBody* method)
{
@@ -81,9 +82,12 @@
throw ConnectionException(530, "Received odd number of elements in
ranged mark");
} else {
SequenceNumber mark(cumulative);
- outgoingCompletionStatus.update(mark, range);
+ {
+ Mutex::ScopedLock l(lock);
+ outgoingCompletionStatus.update(mark, range);
+ }
+ if (completionListener) completionListener();
completion.completed(outgoingCompletionStatus.mark);
-
//TODO: signal listeners of early notification?
}
}
@@ -111,6 +115,7 @@
void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendFlushRequest();
}
@@ -118,12 +123,14 @@
void ExecutionHandler::sendFlushRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
+ Mutex::ScopedLock l(lock);
if (point > outgoingCompletionStatus.mark) {
sendSyncRequest();
}
@@ -132,17 +139,21 @@
void ExecutionHandler::sendSyncRequest()
{
+ Mutex::ScopedLock l(lock);
AMQFrame frame(0, ExecutionSyncBody());
out(frame);
}
void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative,
bool send)
{
- if (id > incomingCompletionStatus.mark) {
- if (cumulative) {
- incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
- } else {
- incomingCompletionStatus.update(id, id);
+ {
+ Mutex::ScopedLock l(lock);
+ if (id > incomingCompletionStatus.mark) {
+ if (cumulative) {
+ incomingCompletionStatus.update(incomingCompletionStatus.mark,
id);
+ } else {
+ incomingCompletionStatus.update(id, id);
+ }
}
}
if (send) {
@@ -153,15 +164,17 @@
void ExecutionHandler::sendCompletion()
{
+ Mutex::ScopedLock l(lock);
SequenceNumberSet range;
incomingCompletionStatus.collectRanges(range);
AMQFrame frame(0, ExecutionCompleteBody(version,
incomingCompletionStatus.mark.getValue(), range));
out(frame);
}
-SequenceNumber ExecutionHandler::send(const AMQBody& command,
CompletionTracker::ResultListener l)
+SequenceNumber ExecutionHandler::send(const AMQBody& command,
CompletionTracker::ResultListener listener)
{
- return send(command, l, false);
+ Mutex::ScopedLock l(lock);
+ return send(command, listener, false);
}
SequenceNumber ExecutionHandler::send(const AMQBody& command,
CompletionTracker::ResultListener l, bool hasContent)
@@ -179,9 +192,10 @@
}
SequenceNumber ExecutionHandler::send(const AMQBody& command, const
MethodContent& content,
- CompletionTracker::ResultListener l)
+ CompletionTracker::ResultListener
listener)
{
- SequenceNumber id = send(command, l, true);
+ Mutex::ScopedLock l(lock);
+ SequenceNumber id = send(command, listener, true);
sendContent(content);
return id;
}
@@ -227,10 +241,17 @@
bool ExecutionHandler::isComplete(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.covers(id);
}
bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
{
+ Mutex::ScopedLock l(lock);
return outgoingCompletionStatus.mark >= id;
+}
+
+void ExecutionHandler::setCompletionListener(boost::function<void()> l)
+{
+ completionListener = l;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Oct 1 03:24:25 2007
@@ -27,6 +27,7 @@
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Mutex.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
@@ -49,8 +50,10 @@
Correlator correlation;
CompletionTracker completion;
Demux demux;
+ sys::Mutex lock;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
+ boost::function<void()> completionListener;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -90,6 +93,8 @@
Correlator& getCorrelator() { return correlation; }
CompletionTracker& getCompletionTracker() { return completion; }
Demux& getDemux() { return demux; }
+
+ void setCompletionListener(boost::function<void()>);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=580915&r1=580914&r2=580915&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Mon Oct 1 03:24:25 2007
@@ -56,7 +56,7 @@
SessionHandler l2;
ExecutionHandler l3;
framing::Uuid uuid;
- bool sync;
+ volatile bool sync;
Reason reason;
protected: