Author: gsim
Date: Fri Oct 10 09:54:54 2008
New Revision: 703521
URL: http://svn.apache.org/viewvc?rev=703521&view=rev
Log:
Handle ttl in message transfers received by the broker & added test for it
Moved Timer instance from DtxManager to Broker so it can be shared
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 10
09:54:54 2008
@@ -136,6 +136,7 @@
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
factory(new ConnectionFactory(*this)),
+ dtxManager(timer),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 10 09:54:54
2008
@@ -33,6 +33,7 @@
#include "SessionManager.h"
#include "Vhost.h"
#include "System.h"
+#include "Timer.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementBroker.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -97,13 +98,14 @@
management::ManagementAgent::Singleton managementAgentSingleton;
std::vector< boost::shared_ptr<sys::ProtocolFactory> > protocolFactories;
MessageStore* store;
- AclModule* acl;
+ AclModule* acl;
DataDir dataDir;
QueueRegistry queues;
ExchangeRegistry exchanges;
LinkRegistry links;
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
+ Timer timer;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -195,6 +197,8 @@
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() {
return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory>
f) { factory = f; }
+ Timer& getTimer() { return timer; }
+
boost::function<std::vector<Url> ()> getKnownBrokers;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Oct 10
09:54:54 2008
@@ -33,7 +33,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager() : store(0) {}
+DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
DtxManager::~DtxManager() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri Oct 10
09:54:54 2008
@@ -47,14 +47,14 @@
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- Timer timer;
+ Timer& timer;
void remove(const std::string& xid);
DtxWorkRecord* getWork(const std::string& xid);
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager();
+ DtxManager(Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Oct 10
09:54:54 2008
@@ -30,15 +30,22 @@
#include "qpid/framing/TypeFilter.h"
#include "qpid/log/Statement.h"
+#include <time.h>
+
using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
using std::string;
TransferAdapter Message::TRANSFER;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0),
redelivered(false), loaded(false),
-staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {}
+ staged(false),
forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(FAR_FUTURE) {}
Message::~Message()
{
@@ -297,3 +304,20 @@
}
}
}
+
+void Message::setTimestamp()
+{
+ time_t now = ::time(0);
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
+ props->setTimestamp(now);
+ if (props->getTtl()) {
+ //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ props->setExpiration(now + (props->getTtl()/1000));
+ expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() *
TIME_MSEC));
+ }
+}
+
+bool Message::hasExpired() const
+{
+ return expiration < AbsTime::now();
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Oct 10 09:54:54
2008
@@ -30,6 +30,7 @@
#include "MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
namespace qpid {
@@ -70,6 +71,8 @@
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
bool requiresAccept();
+ void setTimestamp();
+ bool hasExpired() const;
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
@@ -147,6 +150,7 @@
bool forcePersistentPolicy; // used to force message as durable, via a
broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
+ qpid::sys::AbsTime expiration;
static TransferAdapter TRANSFER;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 10 09:54:54
2008
@@ -264,6 +264,12 @@
return false;
} else {
QueuedMessage msg = messages.front();
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ popAndDequeue();
+ continue;
+ }
+
if (!optimisticConsume && store &&
!msg.payload->isEnqueueComplete()) {
QPID_LOG(debug, "Messages not ready to dispatch on queue '" <<
name << "'");
addListener(c);
@@ -294,7 +300,7 @@
{
QueuedMessage msg(this);
while (seek(msg, c)) {
- if (c->filter(msg.payload)) {
+ if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
if (c->accept(msg.payload)) {
//consumer wants the message
c->position = msg.position;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 10
09:54:54 2008
@@ -341,6 +341,11 @@
}
}
+namespace
+{
+const std::string nullstring;
+}
+
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using
MessageAdapter or similar)
@@ -352,6 +357,7 @@
if (!msg->hasProperties<DeliveryProperties>() ||
msg->getProperties<DeliveryProperties>()->getExchange().empty())
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
+ msg->setTimestamp();
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -359,18 +365,19 @@
/* verify the userid if specified: */
std::string id =
- msg->hasProperties<MessageProperties>()?
msg->getProperties<MessageProperties>()->getUserId():"";
+ msg->hasProperties<MessageProperties>() ?
msg->getProperties<MessageProperties>()->getUserId() : nullstring;
if (authMsg && !id.empty() && id != userID )
{
- QPID_LOG(debug, "user id : " << userID << " msgProps.getUserID() " <<
msg->getProperties<MessageProperties>()->getUserId());
- throw UnauthorizedAccessException("user id in the message is not the
same id used to authenticate the connection");
+ QPID_LOG(debug, "authorised user id : " << userID << " but user id in
message declared as " << id);
+ throw UnauthorizedAccessException(QPID_MSG("authorised user id : " <<
userID << " but user id in message declared as " << id));
}
if (acl && acl->doTransferAcl())
{
if
(!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName,
msg->getRoutingKey() ))
- throw NotAllowedException("ACL denied exhange publish request");
+ throw
NotAllowedException(QPID_MSG(getSession().getConnection().getUserId() << "
cannot publish to " <<
+ exchangeName << " with
routing-key " << msg->getRoutingKey()));
}
cacheExchange->route(strategy, msg->getRoutingKey(),
msg->getApplicationHeaders());
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=703521&r1=703520&r2=703521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Oct 10 09:54:54
2008
@@ -23,7 +23,7 @@
from qpid.session import SessionException
from qpid.content import Content
-
+from time import sleep
class MessageTests(TestBase010):
"""Tests for 'methods' on the amqp message 'class'"""
@@ -369,7 +369,7 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of
credit
- msg_size = 21
+ msg_size = 27
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.byte, value =
msg_size*5, destination = "c")
@@ -438,7 +438,7 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of
credit
- msg_size = 19
+ msg_size = 27
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.byte, value =
msg_size*5, destination = "c")
@@ -810,6 +810,28 @@
msg = messages.get()
assert msg.body == "test"
+ def test_ttl(self):
+ q = "test_ttl"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+
+ dp = session.delivery_properties(routing_key=q, ttl=500)#expire in
half a second
+ session.message_transfer(message=Message(dp, "first"))
+
+ dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in
fives minutes
+ session.message_transfer(message=Message(dp, "second"))
+
+ d = "msgs"
+ session.message_subscribe(queue=q, destination=d)
+ messages = session.incoming(d)
+ sleep(1)
+ session.message_flow(unit = session.credit_unit.message, value=2,
destination=d)
+ session.message_flow(unit = session.credit_unit.byte,
value=0xFFFFFFFF, destination=d)
+ assert messages.get(timeout=1).body == "second"
+ self.assertEmpty(messages)
+
+
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)