Author: gsim
Date: Fri Jun 8 08:24:12 2007
New Revision: 545531
URL: http://svn.apache.org/viewvc?view=rev&rev=545531
Log:
Timeout handling for dtx, plus tests.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun 8 08:24:12 2007
@@ -189,6 +189,7 @@
qpid/broker/DtxBuffer.cpp \
qpid/broker/DtxHandlerImpl.cpp \
qpid/broker/DtxManager.cpp \
+ qpid/broker/DtxTimeout.cpp \
qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
@@ -206,6 +207,7 @@
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
+ qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
@@ -243,6 +245,7 @@
qpid/broker/DtxBuffer.h \
qpid/broker/DtxHandlerImpl.h \
qpid/broker/DtxManager.h \
+ qpid/broker/DtxTimeout.h \
qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
@@ -285,6 +288,7 @@
qpid/broker/PersistableQueue.h \
qpid/broker/QueuePolicy.h \
qpid/broker/RecoveryManagerImpl.h \
+ qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
qpid/broker/TxAck.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Jun 8 08:24:12 2007
@@ -38,6 +38,7 @@
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
+#include "DtxTimeout.h"
#include "MessageStore.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -154,18 +155,15 @@
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
if (fail) {
- accumulatedAck.clear();
dtxBuffer->fail();
} else {
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
dtxBuffer->markEnded();
- }
-
+ }
dtxBuffer.reset();
- txBuffer.reset();
}
void Channel::suspendDtx(const std::string& xid){
@@ -173,8 +171,10 @@
throw ConnectionException(503, boost::format("xid specified on start
was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
dtxBuffer->setSuspended(true);
- txBuffer.reset();
}
void Channel::resumeDtx(const std::string& xid){
@@ -185,10 +185,20 @@
if (!dtxBuffer->isSuspended()) {
throw ConnectionException(503, boost::format("xid %1% not suspended")%
xid);
}
- dtxBuffer->setSuspended(true);
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(false);
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
+void Channel::checkDtxTimeout()
+{
+ if (dtxBuffer->isExpired()) {
+ dtxBuffer.reset();
+ throw DtxTimeoutException();
+ }
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
@@ -302,9 +312,14 @@
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
-
//TODO: I think the outstanding prefetch size & count should be
updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
+ if (dtxBuffer.get()) {
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+
} else {
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with
possible concurrent delivery
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Jun 8 08:24:12 2007
@@ -106,6 +106,8 @@
void deliver(Message::shared_ptr& msg, const string& tag,
Queue::shared_ptr& queue, bool ackExpected);
bool checkPrefetch(Message::shared_ptr& msg);
+
+ void checkDtxTimeout();
public:
Channel(Connection& parent,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Fri Jun 8 08:24:12 2007
@@ -23,7 +23,8 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false),
suspended(false), failed(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid)
+ : xid(_xid), ended(false), suspended(false), failed(false), expired(false)
{}
DtxBuffer::~DtxBuffer() {}
@@ -68,3 +69,15 @@
return xid;
}
+void DtxBuffer::timedout()
+{
+ Mutex::ScopedLock locker(lock);
+ expired = true;
+ fail();
+}
+
+bool DtxBuffer::isExpired()
+{
+ Mutex::ScopedLock locker(lock);
+ return expired;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Fri Jun 8 08:24:12 2007
@@ -32,6 +32,7 @@
bool ended;
bool suspended;
bool failed;
+ bool expired;
public:
typedef boost::shared_ptr<DtxBuffer> shared_ptr;
@@ -44,6 +45,8 @@
bool isSuspended();
void fail();
bool isRollbackOnly();
+ void timedout();
+ bool isExpired();
const std::string& getXid();
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Jun 8 08:24:12 2007
@@ -61,21 +61,24 @@
bool fail,
bool suspend)
{
-
- if (fail) {
- channel.endDtx(xid, true);
- if (suspend) {
- throw ConnectionException(503, "End and suspend cannot both be
set.");
- } else {
- dClient.endOk(XA_RBROLLBACK, context.getRequestId());
- }
- } else {
- if (suspend) {
- channel.suspendDtx(xid);
+ try {
+ if (fail) {
+ channel.endDtx(xid, true);
+ if (suspend) {
+ throw ConnectionException(503, "End and suspend cannot both be
set.");
+ } else {
+ dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+ }
} else {
- channel.endDtx(xid, false);
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid, false);
+ }
+ dClient.endOk(XA_OK, context.getRequestId());
}
- dClient.endOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ dClient.endOk(XA_RBTIMEOUT, context.getRequestId());
}
}
@@ -88,12 +91,16 @@
if (join && resume) {
throw ConnectionException(503, "Join and resume cannot both be set.");
}
- if (resume) {
- channel.resumeDtx(xid);
- } else {
- channel.startDtx(xid, broker.getDtxManager(), join);
+ try {
+ if (resume) {
+ channel.resumeDtx(xid);
+ } else {
+ channel.startDtx(xid, broker.getDtxManager(), join);
+ }
+ dClient.startOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ dClient.startOk(XA_RBTIMEOUT, context.getRequestId());
}
- dClient.startOk(XA_OK, context.getRequestId());
}
// DtxCoordinationHandler:
@@ -102,8 +109,12 @@
u_int16_t /*ticket*/,
const string& xid)
{
- bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ try {
+ bool ok = broker.getDtxManager().prepare(xid);
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
void DtxHandlerImpl::commit(const MethodContext& context,
@@ -111,8 +122,12 @@
const string& xid,
bool onePhase)
{
- bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ try {
+ bool ok = broker.getDtxManager().commit(xid, onePhase);
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.commitOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
@@ -120,8 +135,12 @@
u_int16_t /*ticket*/,
const string& xid )
{
- broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK, context.getRequestId());
+ try {
+ broker.getDtxManager().rollback(xid);
+ cClient.rollbackOk(XA_OK, context.getRequestId());
+ } catch (DtxTimeoutException e) {
+ cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId());
+ }
}
void DtxHandlerImpl::recover(const MethodContext& context,
@@ -129,8 +148,6 @@
bool /*startscan*/,
u_int32_t /*endscan*/ )
{
- //TODO
-
//TODO: what do startscan and endscan actually mean?
// response should hold on key value pair with key = 'xids' and
@@ -171,19 +188,21 @@
throw ConnectionException(503, boost::format("Forget is invalid. Branch
with xid %1% not heuristically completed!") % xid);
}
-void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
- const string& /*xid*/ )
+void DtxHandlerImpl::getTimeout(const MethodContext& context,
+ const string& xid)
{
- //TODO
+ uint32_t timeout = broker.getDtxManager().getTimeout(xid);
+ cClient.getTimeoutOk(timeout, context.getRequestId());
}
-void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/,
+void DtxHandlerImpl::setTimeout(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& /*xid*/,
- u_int32_t /*timeout*/ )
+ const string& xid,
+ u_int32_t timeout)
{
- //TODO
+ broker.getDtxManager().setTimeout(xid, timeout);
+ cClient.setTimeoutOk(context.getRequestId());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Jun 8 08:24:12 2007
@@ -19,6 +19,8 @@
*
*/
#include "DtxManager.h"
+#include "DtxTimeout.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <iostream>
using qpid::sys::Mutex;
@@ -29,37 +31,52 @@
DtxManager::~DtxManager() {}
-void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
{
createWork(xid)->add(ops);
}
-void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops)
{
getWork(xid)->add(ops);
}
-void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext>
txn, DtxBuffer::shared_ptr ops)
+void DtxManager::recover(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
{
createWork(xid)->recover(txn, ops);
}
bool DtxManager::prepare(const std::string& xid)
{
- return getWork(xid)->prepare();
+ try {
+ return getWork(xid)->prepare();
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
bool DtxManager::commit(const std::string& xid, bool onePhase)
{
- bool result = getWork(xid)->commit(onePhase);
- remove(xid);
- return result;
+ try {
+ bool result = getWork(xid)->commit(onePhase);
+ remove(xid);
+ return result;
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
void DtxManager::rollback(const std::string& xid)
{
- getWork(xid)->rollback();
- remove(xid);
+ try {
+ getWork(xid)->rollback();
+ remove(xid);
+ } catch (DtxTimeoutException& e) {
+ remove(xid);
+ throw e;
+ }
}
DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
@@ -83,7 +100,7 @@
}
}
-DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
+DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
{
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
@@ -91,5 +108,50 @@
throw ConnectionException(503, boost::format("Xid %1% is already known
(use 'join' to add work to an existing xid)!") % xid);
} else {
return work.insert(xid, new DtxWorkRecord(xid, store)).first;
+ }
+}
+
+void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
+{
+ WorkMap::iterator record = getWork(xid);
+ DtxTimeout::shared_ptr timeout = record->getTimeout();
+ if (timeout.get()) {
+ if (timeout->timeout == secs) return;//no need to do anything further
if timeout hasn't changed
+ timeout->cancelled = true;
+ }
+ timeout = DtxTimeout::shared_ptr(new DtxTimeout(secs, *this, xid));
+ record->setTimeout(timeout);
+ timer.add(boost::static_pointer_cast<TimerTask>(timeout));
+
+}
+
+uint32_t DtxManager::getTimeout(const std::string& xid)
+{
+ DtxTimeout::shared_ptr timeout = getWork(xid)->getTimeout();
+ return !timeout ? 0 : timeout->timeout;
+}
+
+void DtxManager::timedout(const std::string& xid)
+{
+ Mutex::ScopedLock locker(lock);
+ WorkMap::iterator i = work.find(xid);
+ if (i == work.end()) {
+ QPID_LOG(warning, "Transaction timeout failed: no record for xid");
+ } else {
+ i->timedout();
+ //TODO: do we want to have a timed task to cleanup, or can we rely on
an explicit completion?
+ //timer.add(TimerTask::shared_ptr(new DtxCleanup(60*30/*30 mins*/,
*this, xid)));
+ }
+}
+
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const
std::string& _xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)),
mgr(_mgr), xid(_xid) {}
+
+void DtxManager::DtxCleanup::fire()
+{
+ try {
+ mgr.remove(xid);
+ } catch (ConnectionException& e) {
+ //assume it was explicitly cleaned up after a call to prepare, commit
or rollback
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri Jun 8 08:24:12 2007
@@ -24,6 +24,7 @@
#include <boost/ptr_container/ptr_map.hpp>
#include "DtxBuffer.h"
#include "DtxWorkRecord.h"
+#include "Timer.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
@@ -34,23 +35,37 @@
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
+
+ struct DtxCleanup : public TimerTask
+ {
+ DtxManager& mgr;
+ const std::string& xid;
+
+ DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ void fire();
+ };
+
WorkMap work;
TransactionalStore* const store;
qpid::sys::Mutex lock;
+ Timer timer;
void remove(const std::string& xid);
WorkMap::iterator getWork(const std::string& xid);
- WorkMap::iterator createWork(std::string& xid);
+ WorkMap::iterator createWork(std::string xid);
public:
DtxManager(TransactionalStore* const store);
~DtxManager();
- void start(std::string xid, DtxBuffer::shared_ptr work);
- void join(std::string xid, DtxBuffer::shared_ptr work);
- void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn,
DtxBuffer::shared_ptr work);
+ void start(const std::string& xid, DtxBuffer::shared_ptr work);
+ void join(const std::string& xid, DtxBuffer::shared_ptr work);
+ void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext>
txn, DtxBuffer::shared_ptr work);
bool prepare(const std::string& xid);
bool commit(const std::string& xid, bool onePhase);
void rollback(const std::string& xid);
+ void setTimeout(const std::string& xid, uint32_t secs);
+ uint32_t getTimeout(const std::string& xid);
+ void timedout(const std::string& xid);
};
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp Fri Jun 8 08:24:12 2007
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxTimeout.h"
+#include "DtxManager.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid::broker;
+
+DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string&
_xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)),
timeout(_timeout), mgr(_mgr), xid(_xid)
+{
+}
+
+void DtxTimeout::fire()
+{
+ mgr.timedout(xid);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h Fri Jun 8 08:24:12 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxTimeout_
+#define _DtxTimeout_
+
+#include "qpid/Exception.h"
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxManager;
+
+
+struct DtxTimeoutException : public Exception
+{
+ DtxTimeoutException() {}
+};
+
+
+struct DtxTimeout : public TimerTask
+{
+ typedef boost::shared_ptr<DtxTimeout> shared_ptr;
+ const uint32_t timeout;
+ DtxManager& mgr;
+ const std::string xid;
+
+ DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ void fire();
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Jun 8 08:24:12 2007
@@ -27,9 +27,14 @@
using namespace qpid::broker;
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore*
const _store) :
- xid(_xid), store(_store), completed(false), rolledback(false),
prepared(false) {}
+ xid(_xid), store(_store), completed(false), rolledback(false),
prepared(false), expired(false) {}
-DtxWorkRecord::~DtxWorkRecord() {}
+DtxWorkRecord::~DtxWorkRecord()
+{
+ if (timeout.get()) {
+ timeout->cancelled = true;
+ }
+}
bool DtxWorkRecord::prepare()
{
@@ -110,6 +115,9 @@
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
Mutex::ScopedLock locker(lock);
+ if (expired) {
+ throw DtxTimeoutException();
+ }
if (completed) {
throw ConnectionException(503, boost::format("Branch with xid %1% has
been completed!") % xid);
}
@@ -118,6 +126,9 @@
bool DtxWorkRecord::check()
{
+ if (expired) {
+ throw DtxTimeoutException();
+ }
if (!completed) {
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
@@ -148,4 +159,19 @@
ops->markEnded();
completed = true;
prepared = true;
+}
+
+void DtxWorkRecord::timedout()
+{
+ Mutex::ScopedLock locker(lock);
+ expired = true;
+ rolledback = true;
+ if (!completed) {
+ for (Work::iterator i = work.begin(); i != work.end(); i++) {
+ if (!(*i)->isEnded()) {
+ (*i)->timedout();
+ }
+ }
+ }
+ abort();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Jun 8 08:24:12 2007
@@ -25,6 +25,7 @@
#include <functional>
#include <vector>
#include "DtxBuffer.h"
+#include "DtxTimeout.h"
#include "TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Mutex.h"
@@ -46,6 +47,8 @@
bool completed;
bool rolledback;
bool prepared;
+ bool expired;
+ DtxTimeout::shared_ptr timeout;
Work work;
std::auto_ptr<TPCTransactionContext> txn;
qpid::sys::Mutex lock;
@@ -61,6 +64,9 @@
void rollback();
void add(DtxBuffer::shared_ptr ops);
void recover(std::auto_ptr<TPCTransactionContext> txn,
DtxBuffer::shared_ptr ops);
+ void timedout();
+ void setTimeout(DtxTimeout::shared_ptr t) { timeout = t; }
+ DtxTimeout::shared_ptr getTimeout() { return timeout; }
};
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp Fri Jun 8 08:24:12 2007
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Timer.h"
+#include <iostream>
+
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::Monitor;
+using qpid::sys::Thread;
+using namespace qpid::broker;
+
+TimerTask::TimerTask(Duration timeout) : time(AbsTime::now(), timeout),
cancelled(false) {}
+TimerTask::TimerTask(AbsTime _time) : time(_time), cancelled(false) {}
+TimerTask::~TimerTask(){}
+
+Timer::Timer() : active(false)
+{
+ start();
+}
+
+Timer::~Timer()
+{
+ stop();
+}
+
+void Timer::run()
+{
+ Monitor::ScopedLock l(monitor);
+ while(active){
+ if (tasks.empty()) {
+ monitor.wait();
+ } else {
+ TimerTask::shared_ptr t = tasks.top();
+ if (t->cancelled) {
+ tasks.pop();
+ } else if(t->time < AbsTime::now()) {
+ tasks.pop();
+ t->fire();
+ } else {
+ monitor.wait(t->time);
+ }
+ }
+ }
+}
+
+void Timer::add(TimerTask::shared_ptr task)
+{
+ Monitor::ScopedLock l(monitor);
+ tasks.push(task);
+ monitor.notify();
+}
+
+void Timer::start()
+{
+ Monitor::ScopedLock l(monitor);
+ if (!active) {
+ active = true;
+ runner = std::auto_ptr<Thread>(new Thread(this));
+ }
+}
+
+void Timer::stop()
+{
+ signalStop();
+ if (runner.get()) {
+ runner->join();
+ runner.reset();
+ }
+}
+void Timer::signalStop()
+{
+ Monitor::ScopedLock l(monitor);
+ if (active) {
+ active = false;
+ monitor.notifyAll();
+ }
+}
+
+bool Later::operator()(const TimerTask::shared_ptr& a, const
TimerTask::shared_ptr& b) const
+{
+ return a.get() && b.get() && a->time > b->time;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h Fri Jun 8 08:24:12 2007
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Timer_
+#define _Timer_
+
+#include <memory>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+
+namespace qpid {
+namespace broker {
+
+struct TimerTask
+{
+ typedef boost::shared_ptr<TimerTask> shared_ptr;
+
+ const qpid::sys::AbsTime time;
+ volatile bool cancelled;
+
+ TimerTask(qpid::sys::Duration timeout);
+ TimerTask(qpid::sys::AbsTime time);
+ virtual ~TimerTask();
+ virtual void fire() = 0;
+};
+
+ struct Later
+ {
+ bool operator()(const TimerTask::shared_ptr& a, const
TimerTask::shared_ptr& b) const;
+ };
+
+class Timer : private qpid::sys::Runnable
+{
+ qpid::sys::Monitor monitor;
+ std::priority_queue<TimerTask::shared_ptr,
std::vector<TimerTask::shared_ptr>, Later> tasks;
+ std::auto_ptr<qpid::sys::Thread> runner;
+ bool active;
+
+ void run();
+ void signalStop();
+
+public:
+ Timer();
+ ~Timer();
+
+ void add(TimerTask::shared_ptr task);
+ void start();
+ void stop();
+
+};
+
+}
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Fri Jun 8 08:24:12 2007
@@ -44,6 +44,9 @@
static AbsTime now();
inline static AbsTime FarFuture();
+
+ friend bool operator<(const AbsTime& a, const AbsTime& b);
+ friend bool operator>(const AbsTime& a, const AbsTime& b);
};
class Duration {
@@ -66,6 +69,9 @@
AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns =
std::numeric_limits<int64_t>::max(); return ff;}
inline AbsTime now() { return AbsTime::now(); }
+
+inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns <
b.time_ns; }
+inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns >
b.time_ns; }
Duration::Duration(int64_t time0) :
nanosecs(time0)
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jun 8 08:24:12 2007
@@ -32,6 +32,7 @@
QueueRegistryTest \
QueueTest \
QueuePolicyTest \
+ TimerTest \
TopicExchangeTest \
TxAckTest \
TxBufferTest \
Added: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp Fri Jun 8 08:24:12
2007
@@ -0,0 +1,128 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Timer.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid_test_plugin.h"
+#include <math.h>
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using boost::dynamic_pointer_cast;
+
+/**
+ * CppUnit test for Timer: four tasks with different timeouts are added out
+ * of order; each must fire in order of increasing timeout and within a
+ * tolerance of its requested delay.
+ */
+class TimerTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(TimerTest);
+    CPPUNIT_TEST(testGeneral);
+    CPPUNIT_TEST_SUITE_END();
+
+    /** Hands out consecutive numbers (1, 2, 3, ...) under a mutex. */
+    class Counter
+    {
+        Mutex lock;
+        uint counter;
+      public:
+        Counter() : counter(0) {}
+
+        /** @return the next number in the sequence; the first call returns 1. */
+        uint next()
+        {
+            Mutex::ScopedLock l(lock);
+            return ++counter;
+        }
+    };
+
+    /**
+     * Timer task that records whether it fired, when it fired, and its
+     * position in the overall firing order (taken from the shared Counter).
+     */
+    class TestTask : public TimerTask
+    {
+        const AbsTime start;
+        const Duration expected;
+        AbsTime end;
+        bool fired;
+        uint position;
+        Monitor monitor;
+        Counter& counter;
+
+      public:
+        /**
+         * @param timeout  delay passed to TimerTask and later compared
+         *                 against the measured delay
+         * @param _counter shared counter used to record firing order
+         */
+        TestTask(Duration timeout, Counter& _counter)
+            : TimerTask(timeout), start(now()), expected(timeout), end(start),
+              fired(false), position(0), counter(_counter) {}
+        // position is initialised to 0 so that a task which never fired does
+        // not expose an indeterminate value to check().
+
+        /** Records the firing order and time, then wakes any waiter. */
+        void fire()
+        {
+            Monitor::ScopedLock l(monitor);
+            fired = true;
+            position = counter.next();
+            end = now();
+            monitor.notify();
+        }
+
+        /**
+         * Asserts that the task fired, that it fired in the given position,
+         * and that the measured delay is within tolerance of the requested one.
+         *
+         * @param expected_position 1-based position in the firing order
+         * @param tolerance         maximum allowed |expected - actual|
+         */
+        void check(uint expected_position, uint64_t tolerance = 500 * TIME_MSEC)
+        {
+            Monitor::ScopedLock l(monitor);
+            CPPUNIT_ASSERT(fired);
+            CPPUNIT_ASSERT_EQUAL(expected_position, position);
+            Duration actual(start, end);
+            // Take the absolute value in 64 bits explicitly: abs() may
+            // resolve to the int overload and truncate a large difference.
+            const int64_t delta = expected - actual;
+            const uint64_t difference = static_cast<uint64_t>(delta < 0 ? -delta : delta);
+            // boost::str comes with boost/format.hpp, so no extra header is needed.
+            std::string msg(boost::str(boost::format("tolerance = %1%, difference = %2%") % tolerance % difference));
+            CPPUNIT_ASSERT_MESSAGE(msg, difference < tolerance);
+        }
+
+        /**
+         * Blocks until the task has fired or the duration d has elapsed,
+         * whichever comes first.
+         */
+        void wait(Duration d)
+        {
+            Monitor::ScopedLock l(monitor);
+            const AbsTime deadline(now(), d);
+            // Re-check the predicate: the task may already have fired before
+            // we got here, and a wake-up alone does not prove that it fired.
+            while (!fired && now() < deadline) {
+                monitor.wait(deadline);
+            }
+        }
+    };
+
+    /** Runnable with an empty run(); not referenced by the test below. */
+    class DummyRunner : public Runnable
+    {
+      public:
+        void run() {}
+    };
+
+public:
+
+    /**
+     * Adds tasks with 3s, 1s, 4s and 2s timeouts, waits (up to 6s) for the
+     * longest one, then verifies the firing order 1s, 2s, 3s, 4s.
+     */
+    void testGeneral()
+    {
+        Counter counter;
+        Timer timer;
+        TestTask::shared_ptr task1(new TestTask(Duration(3 * TIME_SEC), counter));
+        TestTask::shared_ptr task2(new TestTask(Duration(1 * TIME_SEC), counter));
+        TestTask::shared_ptr task3(new TestTask(Duration(4 * TIME_SEC), counter));
+        TestTask::shared_ptr task4(new TestTask(Duration(2 * TIME_SEC), counter));
+
+        timer.add(task1);
+        timer.add(task2);
+        timer.add(task3);
+        timer.add(task4);
+
+        // task3 has the longest timeout, so once it has fired all others
+        // should have fired too.
+        dynamic_pointer_cast<TestTask>(task3)->wait(Duration(6 * TIME_SEC));
+
+        dynamic_pointer_cast<TestTask>(task1)->check(3);
+        dynamic_pointer_cast<TestTask>(task2)->check(1);
+        dynamic_pointer_cast<TestTask>(task3)->check(4);
+        dynamic_pointer_cast<TestTask>(task4)->check(2);
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TimerTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py Fri Jun 8 08:24:12 2007
@@ -21,6 +21,7 @@
from qpid.content import Content
from qpid.testlib import testrunner, TestBase
from struct import pack, unpack
+from time import sleep
class DtxTests(TestBase):
"""
@@ -37,6 +38,7 @@
"""
XA_RBROLLBACK = 1
+ XA_RBTIMEOUT = 2
XA_OK = 8
def test_simple_commit(self):
@@ -450,6 +452,51 @@
self.assertEqual(self.XA_RBROLLBACK,
channel1.dtx_coordination_prepare(xid=tx).flags)
channel1.dtx_coordination_rollback(xid=tx)
+
+ def test_get_timeout(self):
+ """
+ Check that get-timeout returns 0 until a timeout is set and then the
+ value that was set (and that such a transaction can complete normally)
+ """
+ channel = self.channel
+ tx = self.xid("dummy")
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.assertEqual(0,
channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+ self.assertEqual(60,
channel.dtx_coordination_get_timeout(xid=tx).timeout)
+ self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags)
+ self.assertEqual(self.XA_OK,
channel.dtx_coordination_rollback(xid=tx).flags)
+
+ def test_set_timeout(self):
+ """
+ Test that a transaction which exceeds its timeout is rolled back and
+ that later attempts to end or roll it back report XA_RBTIMEOUT
+ """
+ #open new channel to allow self.channel to be used in checking the queue
+ channel = self.client.channel(2)
+ channel.channel_open()
+ #setup:
+ tx = self.xid("dummy")
+ channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.message_transfer(routing_key="queue-a", message_id="timeout",
body="DtxMessage")
+
+ channel.dtx_demarcation_select()
+ channel.dtx_demarcation_start(xid=tx)
+ self.swap(channel, "queue-a", "queue-b")
+ channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+ sleep(3)
+ #check that the work has been rolled back already
+ self.assertMessageCount(1, "queue-a")
+ self.assertMessageCount(0, "queue-b")
+ self.assertMessageId("timeout", "queue-a")
+ #check the correct codes are returned when we try to complete the txn
+ self.assertEqual(self.XA_RBTIMEOUT,
channel.dtx_demarcation_end(xid=tx).flags)
+ self.assertEqual(self.XA_RBTIMEOUT,
channel.dtx_coordination_rollback(xid=tx).flags)
+
+
def test_recover(self):
"""