Author: cctrieloff
Date: Mon Oct 13 11:07:07 2008
New Revision: 704192

URL: http://svn.apache.org/viewvc?rev=704192&view=rev
Log:
QPID-1351

-Support for sequencing messages through an exchange
-Related changes
    - Bug fix for pointer safety in the Headers & FanOut exchanges
    - Added support for int64 and uint64 in fieldvalue / fieldtable
    - Added tests for fieldtable
    - Added tests for the message sequencing feature.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Oct 13 
11:07:07 2008
@@ -70,6 +70,7 @@
 }
 
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
+    preRoute(msg);
     Queues::ConstPtr p;
     {
         Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 13 
11:07:07 2008
@@ -22,8 +22,11 @@
 #include "Exchange.h"
 #include "ExchangeRegistry.h"
 #include "qpid/agent/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/MessageProperties.h"
 
 using namespace qpid::broker;
+using namespace qpid::framing;
 using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
 using qpid::management::ManagementAgent;
@@ -32,8 +35,15 @@
 using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+namespace 
+{
+const std::string qpidMsgSequence("qpid.msg_sequence");
+}
+
+
 Exchange::Exchange (const string& _name, Manageable* parent) :
-    name(_name), durable(false), persistenceId(0), mgmtExchange(0)
+    name(_name), durable(false), persistenceId(0), sequence(false), 
+       sequenceNo(0), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -48,7 +58,8 @@
 
 Exchange::Exchange(const string& _name, bool _durable, const 
qpid::framing::FieldTable& _args,
                    Manageable* parent)
-    : name(_name), durable(_durable), args(_args), alternateUsers(0), 
persistenceId(0), mgmtExchange(0)
+    : name(_name), durable(_durable), args(_args), alternateUsers(0), 
persistenceId(0), 
+       sequence(false), sequenceNo(0), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -66,6 +77,10 @@
             }
         }
     }
+       
+    sequence = _args.get(qpidMsgSequence);
+    if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg 
sequencing");
+
 }
 
 Exchange::~Exchange ()
@@ -74,6 +89,13 @@
         mgmtExchange->resourceDestroy ();
 }
 
+void Exchange::preRoute(Deliverable& msg){
+       if (sequence){
+        sys::Mutex::ScopedLock lock(sequenceLock);
+               
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
 
+       }
+}
+
 void Exchange::setPersistenceId(uint64_t id) const
 {
     if (mgmtExchange != 0 && persistenceId == 0)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Oct 13 
11:07:07 2008
@@ -28,6 +28,7 @@
 #include "MessageStore.h"
 #include "PersistableExchange.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Exchange.h"
 #include "qmf/org/apache/qpid/broker/Binding.h"
@@ -45,8 +46,14 @@
             boost::shared_ptr<Exchange> alternate;
             uint32_t alternateUsers;
             mutable uint64_t persistenceId;
+            bool sequence;
+            mutable qpid::sys::Mutex sequenceLock;
+            uint64_t sequenceNo;
 
         protected:
+               
+            void preRoute(Deliverable& msg);
+               
             struct Binding : public management::Manageable {
                 typedef boost::shared_ptr<Binding>       shared_ptr;
                 typedef std::vector<Binding::shared_ptr> vector;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Oct 13 
11:07:07 2008
@@ -69,15 +69,18 @@
 }
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* /*args*/){
+    preRoute(msg);
     uint32_t count(0);
 
     BindingsArray::ConstPtr p = bindings.snapshot();
-    for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != 
p->end(); ++i, count++){
-        msg.deliverTo((*i)->queue);
-        if ((*i)->mgmtBinding != 0)
-            (*i)->mgmtBinding->inc_msgMatched ();
+    if (p.get()){
+        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i 
!= p->end(); ++i, count++){
+            msg.deliverTo((*i)->queue);
+            if ((*i)->mgmtBinding != 0)
+                (*i)->mgmtBinding->inc_msgMatched ();
+        }
     }
-
+    
     if (mgmtExchange != 0)
     {
         mgmtExchange->inc_msgReceives  ();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Oct 
13 11:07:07 2008
@@ -105,14 +105,17 @@
 
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* args){
     if (!args) return;//can't match if there were no headers passed in
+    preRoute(msg);
 
     uint32_t count(0);
 
     Bindings::ConstPtr p = bindings.snapshot();
-    for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != 
p->end(); ++i, count++) {
-        if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
-        if ((*i)->mgmtBinding != 0)
-            (*i)->mgmtBinding->inc_msgMatched ();
+    if (p.get()){
+        for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); 
i != p->end(); ++i, count++) {
+            if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+            if ((*i)->mgmtBinding != 0)
+                (*i)->mgmtBinding->inc_msgMatched ();
+        }
     }
 
     if (mgmtExchange != 0)
@@ -136,9 +139,11 @@
 bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, 
const FieldTable* const args)
 {
     Bindings::ConstPtr p = bindings.snapshot();
-    for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != 
p->end(); ++i) {
-        if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == 
queue)) {
-            return true;
+    if (p.get()){
+        for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); 
i != p->end(); ++i) {
+            if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue 
== queue)) {
+                return true;
+            }
         }
     }
     return false;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct 13 
11:07:07 2008
@@ -180,6 +180,7 @@
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
     RWlock::ScopedRlock l(lock);
+    preRoute(msg);
     uint32_t count(0);
     Tokens   tokens(routingKey);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp Mon Oct 13 
11:07:07 2008
@@ -74,10 +74,18 @@
     values[name] = ValuePtr(new IntegerValue(value));
 }
 
+void FieldTable::setInt64(const std::string& name, int64_t value){
+    values[name] = ValuePtr(new Integer64Value(value));
+}
+
 void FieldTable::setTimestamp(const std::string& name, uint64_t value){
     values[name] = ValuePtr(new TimeValue(value));
 }
 
+void FieldTable::setUInt64(const std::string& name, uint64_t value){
+    values[name] = ValuePtr(new Unsigned64Value(value));
+}
+
 void FieldTable::setTable(const std::string& name, const FieldTable& value)
 {
     values[name] = ValuePtr(new FieldTableValue(value));
@@ -131,6 +139,14 @@
 //    return getValue<uint64_t>(name);
 //}
 
+uint64_t FieldTable::getAsUInt64(const std::string& name) const {
+    return static_cast<uint64_t>( getValue<int64_t>(get(name)));
+}
+
+int64_t FieldTable::getAsInt64(const std::string& name) const {
+    return getValue<int64_t>(get(name));
+}
+
 bool FieldTable::getTable(const std::string& name, FieldTable& value) const {
     return getEncodedValue<FieldTable>(get(name), value);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h Mon Oct 13 
11:07:07 2008
@@ -63,7 +63,9 @@
 
     void setString(const std::string& name, const std::string& value);
     void setInt(const std::string& name, int value);
+    void setInt64(const std::string& name, int64_t value);
     void setTimestamp(const std::string& name, uint64_t value);
+    void setUInt64(const std::string& name, uint64_t value);
     void setTable(const std::string& name, const FieldTable& value);
     void setArray(const std::string& name, const Array& value);
     void setFloat(const std::string& name, float value);
@@ -73,11 +75,13 @@
     std::string getString(const std::string& name) const;
     int getInt(const std::string& name) const;
 //    uint64_t getTimestamp(const std::string& name) const;
+    uint64_t getAsUInt64(const std::string& name) const;
+    int64_t getAsInt64(const std::string& name) const;
     bool getTable(const std::string& name, FieldTable& value) const;
     bool getArray(const std::string& name, Array& value) const;
     bool getFloat(const std::string& name, float& value) const;
     bool getDouble(const std::string& name, double& value) const;
-//    //void getDecimal(string& name, xxx& value);
+//    void getDecimal(string& name, xxx& value);
     void erase(const std::string& name);
     
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp Mon Oct 13 
11:07:07 2008
@@ -135,8 +135,7 @@
 
 IntegerValue::IntegerValue(int v) :
     FieldValue(0x21, new FixedWidthValue<4>(v))
-{
-}
+{}
 
 FloatValue::FloatValue(float v) :
     FieldValue(0x23, new FixedWidthValue<4>(reinterpret_cast<uint8_t*>(&v)))
@@ -146,8 +145,17 @@
     FieldValue(0x33, new FixedWidthValue<8>(reinterpret_cast<uint8_t*>(&v)))
 {}
 
-TimeValue::TimeValue(uint64_t v) :
+Integer64Value::Integer64Value(int64_t v) :
+    FieldValue(0x31, new FixedWidthValue<8>(v))
+{}
+
+Unsigned64Value::Unsigned64Value(uint64_t v) :
     FieldValue(0x32, new FixedWidthValue<8>(v))
+{}
+
+
+TimeValue::TimeValue(uint64_t v) :
+    FieldValue(0x38, new FixedWidthValue<8>(v))
 {
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h Mon Oct 13 
11:07:07 2008
@@ -106,12 +106,18 @@
 inline bool FieldValue::convertsTo<int>() const { return 
data->convertsToInt(); }
 
 template <>
+inline bool FieldValue::convertsTo<int64_t>() const { return 
data->convertsToInt(); }
+
+template <>
 inline bool FieldValue::convertsTo<std::string>() const { return 
data->convertsToString(); }
 
 template <>
 inline int FieldValue::get<int>() const { return data->getInt(); }
 
 template <>
+inline int64_t FieldValue::get<int64_t>() const { return data->getInt(); }
+
+template <>
 inline std::string FieldValue::get<std::string>() const { return 
data->getString(); }
 
 inline std::ostream& operator<<(std::ostream& out, const FieldValue& v) {
@@ -278,6 +284,16 @@
     TimeValue(uint64_t v);
 };
 
+class Integer64Value : public FieldValue {
+  public:
+    Integer64Value(int64_t v);
+};
+
+class Unsigned64Value : public FieldValue {
+  public:
+    Unsigned64Value(uint64_t v);
+};
+
 class FieldTableValue : public FieldValue {
   public:
     FieldTableValue(const FieldTable&);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Oct 13 
11:07:07 2008
@@ -165,4 +165,62 @@
     BOOST_CHECK_EQUAL(string("direct"), response.first->getType());  
 }
 
+intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
+    intrusive_ptr<Message> msg(new Message());
+    AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 
0, 0));
+    AMQFrame header(in_place<AMQHeaderBody>());
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+    
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    return msg;
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) 
+{
+    FieldTable args;
+    args.setInt("qpid.msg_sequence",1);
+    
+    DirectExchange direct("direct1", false, args);
+    
+    intrusive_ptr<Message> msg1 = cmessage("e", "A");
+    intrusive_ptr<Message> msg2 = cmessage("e", "B");
+    intrusive_ptr<Message> msg3 = cmessage("e", "C");
+
+    DeliverableMessage dmsg1(msg1);
+    DeliverableMessage dmsg2(msg2);
+    DeliverableMessage dmsg3(msg3);
+
+    direct.route(dmsg1, "abc", 0);
+    direct.route(dmsg2, "abc", 0);
+    direct.route(dmsg3, "abc", 0);
+
+    BOOST_CHECK_EQUAL(1, 
msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+    BOOST_CHECK_EQUAL(2, 
msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+    BOOST_CHECK_EQUAL(3, 
msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+    
+    FanOutExchange fanout("fanout1", false, args);
+    HeadersExchange header("headers1", false, args);
+    TopicExchange topic ("topic1", false, args);
+    
+    // check other exchanges, that they preroute
+    intrusive_ptr<Message> msg4 = cmessage("e", "A");
+    intrusive_ptr<Message> msg5 = cmessage("e", "B");
+    intrusive_ptr<Message> msg6 = cmessage("e", "C");
+
+    DeliverableMessage dmsg4(msg4);
+    DeliverableMessage dmsg5(msg5);
+    DeliverableMessage dmsg6(msg6);
+    
+    fanout.route(dmsg4, "abc", 0);
+    BOOST_CHECK_EQUAL(1, 
msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+    
+    FieldTable headers;
+    header.route(dmsg5, "abc", &headers);
+    BOOST_CHECK_EQUAL(1, 
msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+    topic.route(dmsg6, "abc", 0);
+    BOOST_CHECK_EQUAL(1, 
msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+}
+
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp?rev=704192&r1=704191&r2=704192&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldTable.cpp Mon Oct 13 11:07:07 
2008
@@ -157,4 +157,17 @@
     }
 }
 
+QPID_AUTO_TEST_CASE(test64GetAndSetConverts)
+{
+    FieldTable args;
+    args.setInt64("a",100);
+    
+    args.setUInt64("b",1u);
+    BOOST_CHECK_EQUAL(1u, args.getAsUInt64("b"));
+    BOOST_CHECK_EQUAL(100u, args.getAsUInt64("a"));
+    BOOST_CHECK_EQUAL(1, args.getAsInt64("b"));
+    BOOST_CHECK_EQUAL(100, args.getAsInt64("a"));
+
+}
+ 
 QPID_AUTO_TEST_SUITE_END()


Reply via email to