Author: gsim
Date: Fri Oct 10 09:54:54 2008
New Revision: 703521

URL: http://svn.apache.org/viewvc?rev=703521&view=rev
Log:
Handle ttl in message transfers received by the broker & added test for it
Moved Timer instance from DtxManager to Broker so it can be shared


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 10 
09:54:54 2008
@@ -136,6 +136,7 @@
     dataDir(conf.noDataDir ? std::string () : conf.dataDir),
     links(this),
     factory(new ConnectionFactory(*this)),
+    dtxManager(timer),
     sessionManager(
         qpid::SessionState::Configuration(
             conf.replayFlushLimit*1024, // convert kb to bytes.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 10 09:54:54 
2008
@@ -33,6 +33,7 @@
 #include "SessionManager.h"
 #include "Vhost.h"
 #include "System.h"
+#include "Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementBroker.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -97,13 +98,14 @@
     management::ManagementAgent::Singleton managementAgentSingleton;
     std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories;
     MessageStore* store;
-       AclModule* acl;
+    AclModule* acl;
     DataDir dataDir;
 
     QueueRegistry queues;
     ExchangeRegistry exchanges;
     LinkRegistry links;
     boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
+    Timer timer;
     DtxManager dtxManager;
     SessionManager sessionManager;
     management::ManagementAgent* managementAgent;
@@ -195,6 +197,8 @@
     boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { 
return factory; }
     void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> 
f) { factory = f; }
 
+    Timer& getTimer() { return timer; }
+
     boost::function<std::vector<Url> ()> getKnownBrokers;
     
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Oct 10 
09:54:54 2008
@@ -33,7 +33,7 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager() : store(0) {}
+DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
 
 DtxManager::~DtxManager() {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri Oct 10 
09:54:54 2008
@@ -47,14 +47,14 @@
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
-    Timer timer;
+    Timer& timer;
 
     void remove(const std::string& xid);
     DtxWorkRecord* getWork(const std::string& xid);
     DtxWorkRecord* createWork(std::string xid);
 
 public:
-    DtxManager();
+    DtxManager(Timer&);
     ~DtxManager();
     void start(const std::string& xid, DtxBuffer::shared_ptr work);
     void join(const std::string& xid, DtxBuffer::shared_ptr work);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Oct 10 
09:54:54 2008
@@ -30,15 +30,22 @@
 #include "qpid/framing/TypeFilter.h"
 #include "qpid/log/Statement.h"
 
+#include <time.h>
+
 using boost::intrusive_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
 using std::string;
 
 TransferAdapter Message::TRANSFER;
 
 Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), 
redelivered(false), loaded(false),
-staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {}
+                                             staged(false), 
forcePersistentPolicy(false), publisher(0), adapter(0), 
+                                             expiration(FAR_FUTURE) {}
 
 Message::~Message()
 {
@@ -297,3 +304,20 @@
         }        
     }
 }
+
+void Message::setTimestamp()
+{
+    time_t now = ::time(0);
+    DeliveryProperties* props = getProperties<DeliveryProperties>();    
+    props->setTimestamp(now);
+    if (props->getTtl()) {
+        //set expiration (nb: ttl is in millisecs, time_t is in secs)
+        props->setExpiration(now + (props->getTtl()/1000));
+        expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * 
TIME_MSEC));
+    }
+}
+
+bool Message::hasExpired() const
+{
+    return expiration < AbsTime::now();
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Oct 10 09:54:54 
2008
@@ -30,6 +30,7 @@
 #include "MessageAdapter.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
 
 namespace qpid {
        
@@ -70,6 +71,8 @@
     const framing::FieldTable* getApplicationHeaders() const;
     bool isPersistent();
     bool requiresAccept();
+    void setTimestamp();
+    bool hasExpired() const;
 
     framing::FrameSet& getFrames() { return frames; } 
     const framing::FrameSet& getFrames() const { return frames; } 
@@ -147,6 +150,7 @@
        bool forcePersistentPolicy; // used to force message as durable, via a 
broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
+    qpid::sys::AbsTime expiration;
 
     static TransferAdapter TRANSFER;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 10 09:54:54 
2008
@@ -264,6 +264,12 @@
             return false;
         } else {
             QueuedMessage msg = messages.front();
+            if (msg.payload->hasExpired()) {
+                QPID_LOG(debug, "Message expired from queue '" << name << "'");
+                popAndDequeue();
+                continue;
+            }
+
             if (!optimisticConsume && store && 
!msg.payload->isEnqueueComplete()) { 
                 QPID_LOG(debug, "Messages not ready to dispatch on queue '" << 
name << "'");
                 addListener(c);
@@ -294,7 +300,7 @@
 {
     QueuedMessage msg(this);
     while (seek(msg, c)) {
-        if (c->filter(msg.payload)) {
+        if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
             if (c->accept(msg.payload)) {
                 //consumer wants the message
                 c->position = msg.position;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 10 
09:54:54 2008
@@ -341,6 +341,11 @@
     }
 }
 
+namespace
+{
+const std::string nullstring;
+}
+
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
     std::string exchangeName = msg->getExchangeName();
     //TODO: the following should be hidden behind message (using 
MessageAdapter or similar)
@@ -352,6 +357,7 @@
         if (!msg->hasProperties<DeliveryProperties>() ||
             msg->getProperties<DeliveryProperties>()->getExchange().empty())
             
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+        msg->setTimestamp();
     }
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -359,18 +365,19 @@
 
     /* verify the userid if specified: */
     std::string id =
-       msg->hasProperties<MessageProperties>()? 
msg->getProperties<MessageProperties>()->getUserId():"";
+       msg->hasProperties<MessageProperties>() ? 
msg->getProperties<MessageProperties>()->getUserId() : nullstring;
 
     if (authMsg &&  !id.empty() && id != userID )
     {
-        QPID_LOG(debug, "user id : " << userID << " msgProps.getUserID() " << 
msg->getProperties<MessageProperties>()->getUserId());
-        throw UnauthorizedAccessException("user id in the message is not the 
same id used to authenticate the connection");
+        QPID_LOG(debug, "authorised user id : " << userID << " but user id in 
message declared as " << id);
+        throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << 
userID << " but user id in message declared as " << id));
     }
 
     if (acl && acl->doTransferAcl())
     {
         if 
(!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName,
 msg->getRoutingKey() ))
-            throw NotAllowedException("ACL denied exhange publish request");
+            throw 
NotAllowedException(QPID_MSG(getSession().getConnection().getUserId() << " 
cannot publish to " <<
+                                               exchangeName << " with 
routing-key " << msg->getRoutingKey()));
     }
 
     cacheExchange->route(strategy, msg->getRoutingKey(), 
msg->getApplicationHeaders());

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Oct 10 09:54:54 
2008
@@ -23,7 +23,7 @@
 from qpid.session import SessionException
 
 from qpid.content import Content
-
+from time import sleep
 
 class MessageTests(TestBase010):
     """Tests for 'methods' on the amqp message 'class'"""
@@ -369,7 +369,7 @@
             
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
 "abcdefgh"))
 
         #each message is currently interpreted as requiring msg_size bytes of 
credit
-        msg_size = 21
+        msg_size = 27
 
         #set byte credit to finite amount (less than enough for all messages)
         session.message_flow(unit = session.credit_unit.byte, value = 
msg_size*5, destination = "c")
@@ -438,7 +438,7 @@
             
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
 "abcdefgh"))
 
         #each message is currently interpreted as requiring msg_size bytes of 
credit
-        msg_size = 19
+        msg_size = 27
 
         #set byte credit to finite amount (less than enough for all messages)
         session.message_flow(unit = session.credit_unit.byte, value = 
msg_size*5, destination = "c")
@@ -810,6 +810,28 @@
         msg = messages.get()
         assert msg.body == "test"
 
+    def test_ttl(self):
+        q = "test_ttl"
+        session = self.session
+
+        session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+
+        dp = session.delivery_properties(routing_key=q, ttl=500)#expire in 
half a second
+        session.message_transfer(message=Message(dp, "first"))
+
+        dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in 
five minutes
+        session.message_transfer(message=Message(dp, "second"))
+
+        d = "msgs"
+        session.message_subscribe(queue=q, destination=d)
+        messages = session.incoming(d)
+        sleep(1)
+        session.message_flow(unit = session.credit_unit.message, value=2, 
destination=d)
+        session.message_flow(unit = session.credit_unit.byte, 
value=0xFFFFFFFF, destination=d)
+        assert messages.get(timeout=1).body == "second"
+        self.assertEmpty(messages)
+
+
     def assertDataEquals(self, session, msg, expected):
         self.assertEquals(expected, msg.body)
 


Reply via email to