Author: cctrieloff
Date: Mon Oct 13 11:07:07 2008
New Revision: 704192
URL: http://svn.apache.org/viewvc?rev=704192&view=rev
Log:
QPID-1351
-Support for sequencing messages through an exchange
-Related changes
- Bug fix for ptr saftey in Headers & FanOut exchange
- Added support for int64 and uint64 in fieldvalue / fieldtable
- Added tests for fieldtable
- Added tests for sequencing message feature.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Oct 13
11:07:07 2008
@@ -70,6 +70,7 @@
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
+ preRoute(msg);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 13
11:07:07 2008
@@ -22,8 +22,11 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/MessageProperties.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::management::ManagementAgent;
@@ -32,8 +35,15 @@
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidMsgSequence("qpid.msg_sequence");
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
- name(_name), durable(false), persistenceId(0), mgmtExchange(0)
+ name(_name), durable(false), persistenceId(0), sequence(false),
+ sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -48,7 +58,8 @@
Exchange::Exchange(const string& _name, bool _durable, const
qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0),
persistenceId(0), mgmtExchange(0)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0),
persistenceId(0),
+ sequence(false), sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -66,6 +77,10 @@
}
}
}
+
+ sequence = _args.get(qpidMsgSequence);
+ if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg
sequencing");
+
}
Exchange::~Exchange ()
@@ -74,6 +89,13 @@
mgmtExchange->resourceDestroy ();
}
+void Exchange::preRoute(Deliverable& msg){
+ if (sequence){
+ sys::Mutex::ScopedLock lock(sequenceLock);
+
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
+ }
+}
+
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Oct 13
11:07:07 2008
@@ -28,6 +28,7 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
@@ -45,8 +46,14 @@
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ bool sequence;
+ mutable qpid::sys::Mutex sequenceLock;
+ uint64_t sequenceNo;
protected:
+
+ void preRoute(Deliverable& msg);
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Oct 13
11:07:07 2008
@@ -69,15 +69,18 @@
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* /*args*/){
+ preRoute(msg);
uint32_t count(0);
BindingsArray::ConstPtr p = bindings.snapshot();
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i !=
p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i
!= p->end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
-
+
if (mgmtExchange != 0)
{
mgmtExchange->inc_msgReceives ();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Oct
13 11:07:07 2008
@@ -105,14 +105,17 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
+ preRoute(msg);
uint32_t count(0);
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i !=
p->end(); ++i, count++) {
- if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin();
i != p->end(); ++i, count++) {
+ if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
if (mgmtExchange != 0)
@@ -136,9 +139,11 @@
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const,
const FieldTable* const args)
{
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i !=
p->end(); ++i) {
- if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue ==
queue)) {
- return true;
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin();
i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue
== queue)) {
+ return true;
+ }
}
}
return false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct 13
11:07:07 2008
@@ -180,6 +180,7 @@
void TopicExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ preRoute(msg);
uint32_t count(0);
Tokens tokens(routingKey);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp Mon Oct 13
11:07:07 2008
@@ -74,10 +74,18 @@
values[name] = ValuePtr(new IntegerValue(value));
}
+void FieldTable::setInt64(const std::string& name, int64_t value){
+ values[name] = ValuePtr(new Integer64Value(value));
+}
+
void FieldTable::setTimestamp(const std::string& name, uint64_t value){
values[name] = ValuePtr(new TimeValue(value));
}
+void FieldTable::setUInt64(const std::string& name, uint64_t value){
+ values[name] = ValuePtr(new Unsigned64Value(value));
+}
+
void FieldTable::setTable(const std::string& name, const FieldTable& value)
{
values[name] = ValuePtr(new FieldTableValue(value));
@@ -131,6 +139,14 @@
// return getValue<uint64_t>(name);
//}
+uint64_t FieldTable::getAsUInt64(const std::string& name) const {
+ return static_cast<uint64_t>( getValue<int64_t>(get(name)));
+}
+
+int64_t FieldTable::getAsInt64(const std::string& name) const {
+ return getValue<int64_t>(get(name));
+}
+
bool FieldTable::getTable(const std::string& name, FieldTable& value) const {
return getEncodedValue<FieldTable>(get(name), value);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h Mon Oct 13
11:07:07 2008
@@ -63,7 +63,9 @@
void setString(const std::string& name, const std::string& value);
void setInt(const std::string& name, int value);
+ void setInt64(const std::string& name, int64_t value);
void setTimestamp(const std::string& name, uint64_t value);
+ void setUInt64(const std::string& name, uint64_t value);
void setTable(const std::string& name, const FieldTable& value);
void setArray(const std::string& name, const Array& value);
void setFloat(const std::string& name, float value);
@@ -73,11 +75,13 @@
std::string getString(const std::string& name) const;
int getInt(const std::string& name) const;
// uint64_t getTimestamp(const std::string& name) const;
+ uint64_t getAsUInt64(const std::string& name) const;
+ int64_t getAsInt64(const std::string& name) const;
bool getTable(const std::string& name, FieldTable& value) const;
bool getArray(const std::string& name, Array& value) const;
bool getFloat(const std::string& name, float& value) const;
bool getDouble(const std::string& name, double& value) const;
-// //void getDecimal(string& name, xxx& value);
+// void getDecimal(string& name, xxx& value);
void erase(const std::string& name);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp Mon Oct 13
11:07:07 2008
@@ -135,8 +135,7 @@
IntegerValue::IntegerValue(int v) :
FieldValue(0x21, new FixedWidthValue<4>(v))
-{
-}
+{}
FloatValue::FloatValue(float v) :
FieldValue(0x23, new FixedWidthValue<4>(reinterpret_cast<uint8_t*>(&v)))
@@ -146,8 +145,17 @@
FieldValue(0x33, new FixedWidthValue<8>(reinterpret_cast<uint8_t*>(&v)))
{}
-TimeValue::TimeValue(uint64_t v) :
+Integer64Value::Integer64Value(int64_t v) :
+ FieldValue(0x31, new FixedWidthValue<8>(v))
+{}
+
+Unsigned64Value::Unsigned64Value(uint64_t v) :
FieldValue(0x32, new FixedWidthValue<8>(v))
+{}
+
+
+TimeValue::TimeValue(uint64_t v) :
+ FieldValue(0x38, new FixedWidthValue<8>(v))
{
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h Mon Oct 13
11:07:07 2008
@@ -106,12 +106,18 @@
inline bool FieldValue::convertsTo<int>() const { return
data->convertsToInt(); }
template <>
+inline bool FieldValue::convertsTo<int64_t>() const { return
data->convertsToInt(); }
+
+template <>
inline bool FieldValue::convertsTo<std::string>() const { return
data->convertsToString(); }
template <>
inline int FieldValue::get<int>() const { return data->getInt(); }
template <>
+inline int64_t FieldValue::get<int64_t>() const { return data->getInt(); }
+
+template <>
inline std::string FieldValue::get<std::string>() const { return
data->getString(); }
inline std::ostream& operator<<(std::ostream& out, const FieldValue& v) {
@@ -278,6 +284,16 @@
TimeValue(uint64_t v);
};
+class Integer64Value : public FieldValue {
+ public:
+ Integer64Value(int64_t v);
+};
+
+class Unsigned64Value : public FieldValue {
+ public:
+ Unsigned64Value(uint64_t v);
+};
+
class FieldTableValue : public FieldValue {
public:
FieldTableValue(const FieldTable&);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Oct 13
11:07:07 2008
@@ -165,4 +165,62 @@
BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
}
+intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange,
0, 0));
+ AMQFrame header(in_place<AMQHeaderBody>());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions)
+{
+ FieldTable args;
+ args.setInt("qpid.msg_sequence",1);
+
+ DirectExchange direct("direct1", false, args);
+
+ intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ intrusive_ptr<Message> msg2 = cmessage("e", "B");
+ intrusive_ptr<Message> msg3 = cmessage("e", "C");
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+
+ direct.route(dmsg1, "abc", 0);
+ direct.route(dmsg2, "abc", 0);
+ direct.route(dmsg3, "abc", 0);
+
+ BOOST_CHECK_EQUAL(1,
msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(2,
msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(3,
msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FanOutExchange fanout("fanout1", false, args);
+ HeadersExchange header("headers1", false, args);
+ TopicExchange topic ("topic1", false, args);
+
+ // check other exchanges, that they preroute
+ intrusive_ptr<Message> msg4 = cmessage("e", "A");
+ intrusive_ptr<Message> msg5 = cmessage("e", "B");
+ intrusive_ptr<Message> msg6 = cmessage("e", "C");
+
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+
+ fanout.route(dmsg4, "abc", 0);
+ BOOST_CHECK_EQUAL(1,
msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FieldTable headers;
+ header.route(dmsg5, "abc", &headers);
+ BOOST_CHECK_EQUAL(1,
msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ topic.route(dmsg6, "abc", 0);
+ BOOST_CHECK_EQUAL(1,
msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+}
+
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp Mon Oct 13 11:07:07
2008
@@ -157,4 +157,17 @@
}
}
+QPID_AUTO_TEST_CASE(test64GetAndSetConverts)
+{
+ FieldTable args;
+ args.setInt64("a",100);
+
+ args.setUInt64("b",1u);
+ BOOST_CHECK_EQUAL(1u, args.getAsUInt64("b"));
+ BOOST_CHECK_EQUAL(100u, args.getAsUInt64("a"));
+ BOOST_CHECK_EQUAL(1, args.getAsInt64("b"));
+ BOOST_CHECK_EQUAL(100, args.getAsInt64("a"));
+
+}
+
QPID_AUTO_TEST_SUITE_END()