Author: gsim
Date: Tue Nov 7 03:26:50 2006
New Revision: 472067
URL: http://svn.apache.org/viewvc?view=rev&rev=472067
Log:
Made the passing of the transaction context in the message store explicit (to
avoid relying on thread-local storage, in case that doesn't fit with the new I/O design).
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (with
props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (with
props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Tue Nov 7
03:26:50 2006
@@ -198,7 +198,7 @@
throw InvalidAckException();
}else if(multiple){
ack_iterator end = ++i;
- for_each(unacked.begin(), end,
mem_fun_ref(&DeliveryRecord::discard));
+ for_each(unacked.begin(), end,
bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp Tue Nov 7
03:26:50 2006
@@ -21,8 +21,8 @@
DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
-bool DeletingTxOp::prepare() throw(){
- return delegate && delegate->prepare();
+bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
+ return delegate && delegate->prepare(ctxt);
}
void DeletingTxOp::commit() throw(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h Tue Nov 7
03:26:50 2006
@@ -30,7 +30,7 @@
TxOp* delegate;
public:
DeletingTxOp(TxOp* const delegate);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DeletingTxOp(){}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Nov 7
03:26:50 2006
@@ -38,8 +38,8 @@
pull(true){}
-void DeliveryRecord::discard() const{
- queue->dequeue(msg, 0);
+void DeliveryRecord::discard(TransactionContext* ctxt) const{
+ queue->dequeue(ctxt, msg, 0);
}
bool DeliveryRecord::matches(u_int64_t tag) const{
@@ -48,10 +48,6 @@
bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
return range->covers(deliveryTag);
-}
-
-void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range)
const{
- if(coveredBy(range)) discard();
}
void DeliveryRecord::redeliver(Channel* const channel) const{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Tue Nov 7
03:26:50 2006
@@ -43,10 +43,9 @@
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue,
const string consumerTag, const u_int64_t deliveryTag);
DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue,
const u_int64_t deliveryTag);
- void discard() const;
+ void discard(TransactionContext* ctxt = 0) const;
bool matches(u_int64_t tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
- void discardIfCoveredBy(const AccumulatedAck* const range) const;
void requeue() const;
void redeliver(Channel* const) const;
void addTo(Prefetch* const prefetch) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Nov 7
03:26:50 2006
@@ -57,7 +57,7 @@
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(Message::shared_ptr& msg, const Queue& queue,
const string * const xid) = 0;
+ virtual void enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -69,7 +69,7 @@
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(Message::shared_ptr& msg, const Queue& queue,
const string * const xid) = 0;
+ virtual void dequeue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as
being committed.
*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=auto&rev=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Nov
7 03:26:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+void NullMessageStore::create(const Queue& queue){
+ std::cout << "WARNING: Can't create durable queue '" << queue.getName() <<
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::destroy(const Queue& queue){
+ std::cout << "WARNING: Can't destroy durable queue '" << queue.getName()
<< "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::recover(QueueRegistry&){
+ std::cout << "WARNING: Persistence not enabled, no recovery of queues or
messages." << std::endl;
+}
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const){
+ std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() <<
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const){
+ std::cout << "WARNING: Can't dequeue message from '" << queue.getName() <<
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::committed(const string * const){
+ std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+void NullMessageStore::aborted(const string * const){
+ std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+TransactionContext* NullMessageStore::begin(){
+ return 0;
+}
+void NullMessageStore::commit(TransactionContext*){
+}
+void NullMessageStore::abort(TransactionContext*){
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=auto&rev=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Nov 7
03:26:50 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Queue;
+ class QueueRegistry;
+
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class NullMessageStore : public MessageStore{
+ public:
+ void create(const Queue& queue);
+ void destroy(const Queue& queue);
+ void recover(QueueRegistry& queues);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
+ void committed(const string * const xid);
+ void aborted(const string * const xid);
+ TransactionContext* begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~NullMessageStore(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Nov 7 03:26:50
2006
@@ -35,7 +35,8 @@
dispatching(false),
next(0),
lastUsed(0),
- exclusive(0)
+ exclusive(0),
+ persistenceId(0)
{
if(autodelete) lastUsed = Time::now().msecs();
}
@@ -52,7 +53,7 @@
}
void Queue::deliver(Message::shared_ptr& msg){
- enqueue(msg, 0);
+ enqueue(0, msg, 0);
process(msg);
}
@@ -163,15 +164,17 @@
return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
}
-void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
- if(store){
- store->enqueue(msg, *this, xid);
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const
string * const xid)
+{
+ if(msg->isPersistent() && store){
+ store->enqueue(ctxt, msg, *this, xid);
}
}
-void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
- if(store){
- store->dequeue(msg, *this, xid);
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const
string * const xid)
+{
+ if(msg->isPersistent() && store){
+ store->dequeue(ctxt, msg, *this, xid);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Nov 7 03:26:50
2006
@@ -58,6 +58,7 @@
mutable qpid::sys::Monitor lock;
int64_t lastUsed;
Consumer* exclusive;
+ u_int64_t persistenceId;
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
@@ -106,10 +107,13 @@
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o)
const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline u_int64_t getPersistenceId() const { return persistenceId; }
+ inline void setPersistenceId(u_int64_t _persistenceId) {
persistenceId = _persistenceId; }
+
bool canAutoDelete() const;
- void enqueue(Message::shared_ptr& msg, const string * const xid);
- void dequeue(Message::shared_ptr& msg, const string * const xid);
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg,
const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg,
const string * const xid);
};
}
}
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
Tue Nov 7 03:26:50 2006
@@ -16,10 +16,12 @@
*
*/
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/broker/SessionHandlerImpl.h"
+
+#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/SessionHandlerImpl.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -34,7 +36,7 @@
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) :
- queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+ store(new NullMessageStore()), queues(store.get()), timeout(_timeout),
cleaner(&queues, timeout/10)
{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h Tue Nov
7 03:26:50 2006
@@ -20,11 +20,16 @@
namespace qpid {
namespace broker {
+ class TransactionContext{
+ public:
+ virtual ~TransactionContext(){}
+ };
+
class TransactionalStore{
public:
- virtual void begin() = 0;
- virtual void commit() = 0;
- virtual void abort() = 0;
+ virtual TransactionContext* begin() = 0;
+ virtual void commit(TransactionContext*) = 0;
+ virtual void abort(TransactionContext*) = 0;
virtual ~TransactionalStore(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Tue Nov 7 03:26:50
2006
@@ -26,10 +26,15 @@
}
-bool TxAck::prepare() throw(){
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
try{
//dequeue all acked messages from their queues
- for_each(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->discard(ctxt);
+ }
+ }
+ //for_each(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
return true;
}catch(...){
std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Tue Nov 7 03:26:50
2006
@@ -41,7 +41,7 @@
* @param unacked the record of delivered messages
*/
TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAck(){}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Tue Nov 7
03:26:50 2006
@@ -17,30 +17,28 @@
*/
#include "qpid/broker/TxBuffer.h"
+using std::mem_fun;
using namespace qpid::broker;
bool TxBuffer::prepare(TransactionalStore* const store){
- if(store) store->begin();
+ TransactionContext* ctxt(0);
+ if(store) ctxt = store->begin();
for(op_iterator i = ops.begin(); i < ops.end(); i++){
- if(!(*i)->prepare()){
- if(store) store->abort();
+ if(!(*i)->prepare(ctxt)){
+ if(store) store->abort(ctxt);
return false;
}
}
- if(store) store->commit();
+ if(store) store->commit(ctxt);
return true;
}
void TxBuffer::commit(){
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
- (*i)->commit();
- }
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
}
void TxBuffer::rollback(){
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
- (*i)->rollback();
- }
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
}
void TxBuffer::enlist(TxOp* const op){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Tue Nov 7
03:26:50 2006
@@ -18,6 +18,8 @@
#ifndef _TxBuffer_
#define _TxBuffer_
+#include <algorithm>
+#include <functional>
#include <vector>
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Tue Nov 7 03:26:50
2006
@@ -18,11 +18,13 @@
#ifndef _TxOp_
#define _TxOp_
+#include "qpid/broker/TransactionalStore.h"
+
namespace qpid {
namespace broker {
class TxOp{
public:
- virtual bool prepare() throw() = 0;
+ virtual bool prepare(TransactionContext*) throw() = 0;
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Tue Nov 7
03:26:50 2006
@@ -21,9 +21,9 @@
TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
-bool TxPublish::prepare() throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
try{
- for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+ for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0));
return true;
}catch(...){
std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
@@ -42,10 +42,11 @@
queues.push_back(queue);
}
-TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const
_xid) : msg(_msg), xid(_xid){}
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr&
_msg, const string* const _xid)
+ : ctxt(_ctxt), msg(_msg), xid(_xid){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(msg, xid);
+ queue->enqueue(ctxt, msg, xid);
}
TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Nov 7
03:26:50 2006
@@ -41,10 +41,11 @@
*/
class TxPublish : public TxOp, public Deliverable{
class Prepare{
+ TransactionContext* ctxt;
Message::shared_ptr& msg;
const string* const xid;
public:
- Prepare(Message::shared_ptr& msg, const string* const xid);
+ Prepare(TransactionContext* ctxt, Message::shared_ptr& msg,
const string* const xid);
void operator()(Queue::shared_ptr& queue);
};
@@ -60,7 +61,7 @@
public:
TxPublish(Message::shared_ptr msg);
- virtual bool prepare() throw();
+ virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp Tue Nov
7 03:26:50 2006
@@ -26,6 +26,7 @@
using std::list;
using std::vector;
using namespace qpid::broker;
+using namespace qpid::framing;
class TxAckTest : public CppUnit::TestCase
{
@@ -35,7 +36,7 @@
public:
vector<Message::shared_ptr> dequeued;
- void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const
string * const /*xid*/)
+ void dequeue(TransactionContext*, Message::shared_ptr& msg, const
Queue& /*queue*/, const string * const /*xid*/)
{
dequeued.push_back(msg);
}
@@ -44,12 +45,12 @@
void create(const Queue&){}
void destroy(const Queue&){}
void recover(QueueRegistry&){}
- void enqueue(Message::shared_ptr&, const Queue&, const string *
const){}
+ void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&,
const string * const){}
void committed(const string * const){}
void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
+ TransactionContext* begin(){ return 0; }
+ void commit(TransactionContext*){}
+ void abort(TransactionContext*){}
~TestMessageStore(){}
};
@@ -73,6 +74,8 @@
{
for(int i = 0; i < 10; i++){
Message::shared_ptr msg(new Message(0, "exchange", "routing_key",
false, false));
+ msg->setHeader(AMQHeaderBody::shared_ptr(new
AMQHeaderBody(BASIC)));
+ msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
messages.push_back(msg);
deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
}
@@ -86,7 +89,7 @@
void testPrepare()
{
//ensure acked messages are discarded, i.e. dequeued from store
- op.prepare();
+ op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp Tue
Nov 7 03:26:50 2006
@@ -34,6 +34,35 @@
class TxBufferTest : public CppUnit::TestCase
{
+ class TestTransactionContext : public TransactionContext{
+ enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
+ int state;
+ public:
+ TestTransactionContext() : state(OPEN) {}
+ void commit(){
+ if(state != OPEN) throw "txn already completed";
+ state = COMMITTED;
+ }
+
+ void abort(){
+ if(state != OPEN) throw "txn already completed";
+ state = ABORTED;
+ }
+
+ bool isCommitted(){
+ return state == COMMITTED;
+ }
+
+ bool isAborted(){
+ return state == ABORTED;
+ }
+
+ bool isOpen(){
+ return state == OPEN;
+ }
+ ~TestTransactionContext(){}
+ };
+
class MockTxOp : public TxOp{
enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
std::vector<int> expected;
@@ -43,7 +72,7 @@
MockTxOp() : failOnPrepare(false) {}
MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
- bool prepare() throw(){
+ bool prepare(TransactionContext*) throw(){
actual.push_back(PREPARE);
return !failOnPrepare;
}
@@ -75,15 +104,25 @@
enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
std::vector<int> expected;
std::vector<int> actual;
+
public:
- void begin(){
+ TestTransactionContext txn;
+
+ TransactionContext* begin(){
actual.push_back(BEGIN);
+ return &txn;
}
- void commit(){
+ void commit(TransactionContext* ctxt){
actual.push_back(COMMIT);
+ TestTransactionContext*
_txn(dynamic_cast<TestTransactionContext*>(ctxt));
+ CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+ _txn->commit();
}
- void abort(){
+ void abort(TransactionContext* ctxt){
actual.push_back(ABORT);
+ TestTransactionContext*
_txn(dynamic_cast<TestTransactionContext*>(ctxt));
+ CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+ _txn->abort();
}
MockTransactionalStore& expectBegin(){
expected.push_back(BEGIN);
@@ -131,6 +170,7 @@
CPPUNIT_ASSERT(buffer.prepare(&store));
buffer.commit();
store.check();
+ CPPUNIT_ASSERT(store.txn.isCommitted());
opA.check();
opB.check();
opC.check();
@@ -153,6 +193,7 @@
CPPUNIT_ASSERT(!buffer.prepare(&store));
store.check();
+ CPPUNIT_ASSERT(store.txn.isAborted());
opA.check();
opB.check();
opC.check();
@@ -181,3 +222,4 @@
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);
+
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Tue
Nov 7 03:26:50 2006
@@ -27,6 +27,7 @@
using std::pair;
using std::vector;
using namespace qpid::broker;
+using namespace qpid::framing;
class TxPublishTest : public CppUnit::TestCase
{
@@ -36,7 +37,7 @@
public:
vector< pair<string, Message::shared_ptr> > enqueued;
- void enqueue(Message::shared_ptr& msg, const Queue& queue, const
string * const /*xid*/)
+ void enqueue(TransactionContext*, Message::shared_ptr& msg, const
Queue& queue, const string * const /*xid*/)
{
enqueued.push_back(pair<string,
Message::shared_ptr>(queue.getName(),msg));
}
@@ -45,12 +46,12 @@
void create(const Queue&){}
void destroy(const Queue&){}
void recover(QueueRegistry&){}
- void dequeue(Message::shared_ptr&, const Queue&, const string *
const){}
+ void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&,
const string * const){}
void committed(const string * const){}
void aborted(const string * const){}
- void begin(){}
- void commit(){}
- void abort(){}
+ TransactionContext* begin(){ return 0; }
+ void commit(TransactionContext*){}
+ void abort(TransactionContext*){}
~TestMessageStore(){}
};
@@ -74,6 +75,8 @@
msg(new Message(0, "exchange", "routing_key", false,
false)),
op(msg)
{
+ msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+ msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
op.deliverTo(queue1);
op.deliverTo(queue2);
}
@@ -81,7 +84,7 @@
void testPrepare()
{
//ensure messages are enqueued in store
- op.prepare();
+ op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);