Author: gsim
Date: Tue Nov  7 03:26:50 2006
New Revision: 472067

URL: http://svn.apache.org/viewvc?view=rev&rev=472067
Log:
Made passing of transaction context in message store explicit (to avoid thread 
local storage in case this doesn't fit with new io design).


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Tue Nov  7 
03:26:50 2006
@@ -198,7 +198,7 @@
             throw InvalidAckException();
         }else if(multiple){     
             ack_iterator end = ++i;
-            for_each(unacked.begin(), end, 
mem_fun_ref(&DeliveryRecord::discard));
+            for_each(unacked.begin(), end, 
bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
             unacked.erase(unacked.begin(), end);
 
             //recalculate the prefetch:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp Tue Nov  7 
03:26:50 2006
@@ -21,8 +21,8 @@
 
 DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
 
-bool DeletingTxOp::prepare() throw(){
-    return delegate && delegate->prepare();
+bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
+    return delegate && delegate->prepare(ctxt);
 }
 
 void DeletingTxOp::commit() throw(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h Tue Nov  7 
03:26:50 2006
@@ -30,7 +30,7 @@
             TxOp* delegate;
         public:
             DeletingTxOp(TxOp* const delegate);
-            virtual bool prepare() throw();
+            virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit()  throw();
             virtual void rollback()  throw();
             virtual ~DeletingTxOp(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Nov  7 
03:26:50 2006
@@ -38,8 +38,8 @@
                                                                pull(true){}
 
 
-void DeliveryRecord::discard() const{
-    queue->dequeue(msg, 0);
+void DeliveryRecord::discard(TransactionContext* ctxt) const{
+    queue->dequeue(ctxt, msg, 0);
 }
 
 bool DeliveryRecord::matches(u_int64_t tag) const{
@@ -48,10 +48,6 @@
 
 bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
     return range->covers(deliveryTag);
-}
-
-void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) 
const{
-    if(coveredBy(range)) discard();
 }
 
 void DeliveryRecord::redeliver(Channel* const channel) const{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Tue Nov  7 
03:26:50 2006
@@ -43,10 +43,9 @@
             DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, 
const string consumerTag, const u_int64_t deliveryTag);
             DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, 
const u_int64_t deliveryTag);
             
-            void discard() const;
+            void discard(TransactionContext* ctxt = 0) const;
             bool matches(u_int64_t tag) const;
             bool coveredBy(const AccumulatedAck* const range) const;
-            void discardIfCoveredBy(const AccumulatedAck* const range) const;
             void requeue() const;
             void redeliver(Channel* const) const;
             void addTo(Prefetch* const prefetch) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Nov  7 
03:26:50 2006
@@ -57,7 +57,7 @@
              * distributed transaction in which the operation takes
              * place or null for 'local' transactions
              */
-            virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, 
const string * const xid) = 0;
+            virtual void enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
             /**
              * Dequeues a message, recording that the given message is
              * no longer on the given queue and deleting the message
@@ -69,7 +69,7 @@
              * distributed transaction in which the operation takes
              * place or null for 'local' transactions
              */
-            virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, 
const string * const xid) = 0;
+            virtual void dequeue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid) = 0;
             /**
              * Treat all enqueue/dequeues where this xid was specified as 
being committed.
              */

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=auto&rev=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Nov  
7 03:26:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+void NullMessageStore::create(const Queue& queue){
+    std::cout << "WARNING: Can't create durable queue '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::destroy(const Queue& queue){
+    std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() 
<< "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::recover(QueueRegistry&){
+    std::cout << "WARNING: Persistence not enabled, no recovery of queues or 
messages." << std::endl;
+}
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const){
+    std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const){
+    std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::committed(const string * const){
+    std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+void NullMessageStore::aborted(const string * const){
+    std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+TransactionContext* NullMessageStore::begin(){
+    return 0;
+}
+void NullMessageStore::commit(TransactionContext*){
+}
+void NullMessageStore::abort(TransactionContext*){
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=auto&rev=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Nov  7 
03:26:50 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class Queue;
+        class QueueRegistry;
+
+        /**
+         * A null implementation of the MessageStore interface
+         */
+        class NullMessageStore : public MessageStore{
+        public:
+            void create(const Queue& queue);
+            void destroy(const Queue& queue);
+            void recover(QueueRegistry& queues);
+            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
+            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
+            void committed(const string * const xid);
+            void aborted(const string * const xid);
+            TransactionContext* begin();
+            void commit(TransactionContext* ctxt);
+            void abort(TransactionContext* ctxt);
+            ~NullMessageStore(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Nov  7 03:26:50 
2006
@@ -35,7 +35,8 @@
     dispatching(false),
     next(0),
     lastUsed(0),
-    exclusive(0)
+    exclusive(0),
+    persistenceId(0)
 {
     if(autodelete) lastUsed = Time::now().msecs();
 }
@@ -52,7 +53,7 @@
 }
 
 void Queue::deliver(Message::shared_ptr& msg){
-    enqueue(msg, 0);
+    enqueue(0, msg, 0);
     process(msg);
 }
 
@@ -163,15 +164,17 @@
     return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
 }
 
-void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
-    if(store){
-        store->enqueue(msg, *this, xid);
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const 
string * const xid)
+{
+    if(msg->isPersistent() && store){
+        store->enqueue(ctxt, msg, *this, xid);
     }
 }
 
-void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
-    if(store){
-        store->dequeue(msg, *this, xid);
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const 
string * const xid)
+{
+    if(msg->isPersistent() && store){
+        store->dequeue(ctxt, msg, *this, xid);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Nov  7 03:26:50 
2006
@@ -58,6 +58,7 @@
             mutable qpid::sys::Monitor lock;
             int64_t lastUsed;
             Consumer* exclusive;
+            u_int64_t persistenceId;
 
             bool startDispatching();
             bool dispatch(Message::shared_ptr& msg);
@@ -106,10 +107,13 @@
             inline const string& getName() const { return name; }
             inline const bool isExclusiveOwner(const ConnectionToken* const o) 
const { return o == owner; }
             inline bool hasExclusiveConsumer() const { return exclusive; }
+            inline u_int64_t getPersistenceId() const { return persistenceId; }
+            inline void setPersistenceId(u_int64_t _persistenceId) { 
persistenceId = _persistenceId; }
+
             bool canAutoDelete() const;
 
-            void enqueue(Message::shared_ptr& msg, const string * const xid);
-            void dequeue(Message::shared_ptr& msg, const string * const xid);
+            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const string * const xid);
+            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const string * const xid);
         };
     }
 }

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
Tue Nov  7 03:26:50 2006
@@ -16,10 +16,12 @@
  *
  */
 #include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/broker/SessionHandlerImpl.h"
+
+#include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/SessionHandlerImpl.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -34,7 +36,7 @@
 }
 
 SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : 
-    queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+    store(new NullMessageStore()), queues(store.get()), timeout(_timeout), 
cleaner(&queues, timeout/10)
 {
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h Tue Nov  
7 03:26:50 2006
@@ -20,11 +20,16 @@
 
 namespace qpid {
     namespace broker {
+        class TransactionContext{
+        public:
+            virtual ~TransactionContext(){}
+        };
+
         class TransactionalStore{
         public:
-            virtual void begin() = 0;
-            virtual void commit() = 0;
-            virtual void abort() = 0;
+            virtual TransactionContext* begin() = 0;
+            virtual void commit(TransactionContext*) = 0;
+            virtual void abort(TransactionContext*) = 0;
 
             virtual ~TransactionalStore(){}
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Tue Nov  7 03:26:50 
2006
@@ -26,10 +26,15 @@
 
 }
 
-bool TxAck::prepare() throw(){
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
     try{
         //dequeue all acked messages from their queues
-        for_each(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+        for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+            if (i->coveredBy(&acked)) {
+                i->discard(ctxt);
+            }
+        }
+        //for_each(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
         return true;
     }catch(...){
         std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Tue Nov  7 03:26:50 
2006
@@ -41,7 +41,7 @@
              * @param unacked the record of delivered messages
              */
             TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
-            virtual bool prepare() throw();
+            virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~TxAck(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Tue Nov  7 
03:26:50 2006
@@ -17,30 +17,28 @@
  */
 #include "qpid/broker/TxBuffer.h"
 
+using std::mem_fun;
 using namespace qpid::broker;
 
 bool TxBuffer::prepare(TransactionalStore* const store){
-    if(store) store->begin();
+    TransactionContext* ctxt(0);
+    if(store) ctxt = store->begin();
     for(op_iterator i = ops.begin(); i < ops.end(); i++){
-        if(!(*i)->prepare()){
-            if(store) store->abort();
+        if(!(*i)->prepare(ctxt)){
+            if(store) store->abort(ctxt);
             return false;
         }
     }
-    if(store) store->commit();
+    if(store) store->commit(ctxt);
     return true;
 }
 
 void TxBuffer::commit(){
-    for(op_iterator i = ops.begin(); i < ops.end(); i++){
-        (*i)->commit();
-    }
+    for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
 }
 
 void TxBuffer::rollback(){
-    for(op_iterator i = ops.begin(); i < ops.end(); i++){
-        (*i)->rollback();
-    }
+    for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
 }
 
 void TxBuffer::enlist(TxOp* const op){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Tue Nov  7 
03:26:50 2006
@@ -18,6 +18,8 @@
 #ifndef _TxBuffer_
 #define _TxBuffer_
 
+#include <algorithm>
+#include <functional>
 #include <vector>
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/broker/TxOp.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Tue Nov  7 03:26:50 
2006
@@ -18,11 +18,13 @@
 #ifndef _TxOp_
 #define _TxOp_
 
+#include "qpid/broker/TransactionalStore.h"
+
 namespace qpid {
     namespace broker {
         class TxOp{
         public:
-            virtual bool prepare() throw() = 0;
+            virtual bool prepare(TransactionContext*) throw() = 0;
             virtual void commit()  throw() = 0;
             virtual void rollback()  throw() = 0;
             virtual ~TxOp(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Tue Nov  7 
03:26:50 2006
@@ -21,9 +21,9 @@
 
 TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
 
-bool TxPublish::prepare() throw(){
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
     try{
-        for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+        for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, 0));
         return true;
     }catch(...){
         std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
@@ -42,10 +42,11 @@
     queues.push_back(queue);
 }
 
-TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const 
_xid) : msg(_msg), xid(_xid){}
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& 
_msg, const string* const _xid) 
+    : ctxt(_ctxt), msg(_msg), xid(_xid){}
 
 void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
-    queue->enqueue(msg, xid);
+    queue->enqueue(ctxt, msg, xid);
 }
 
 TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Nov  7 
03:26:50 2006
@@ -41,10 +41,11 @@
          */
         class TxPublish : public TxOp, public Deliverable{
             class Prepare{
+                TransactionContext* ctxt;
                 Message::shared_ptr& msg;
                 const string* const xid;
             public:
-                Prepare(Message::shared_ptr& msg, const string* const xid);
+                Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, 
const string* const xid);
                 void operator()(Queue::shared_ptr& queue);            
             };
 
@@ -60,7 +61,7 @@
 
         public:
             TxPublish(Message::shared_ptr msg);
-            virtual bool prepare() throw();
+            virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
 

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp Tue Nov  
7 03:26:50 2006
@@ -26,6 +26,7 @@
 using std::list;
 using std::vector;
 using namespace qpid::broker;
+using namespace qpid::framing;
 
 class TxAckTest : public CppUnit::TestCase  
 {
@@ -35,7 +36,7 @@
     public:
         vector<Message::shared_ptr> dequeued;
 
-        void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const 
string * const /*xid*/)
+        void dequeue(TransactionContext*, Message::shared_ptr& msg, const 
Queue& /*queue*/, const string * const /*xid*/)
         {
             dequeued.push_back(msg);
         }
@@ -44,12 +45,12 @@
         void create(const Queue&){}
         void destroy(const Queue&){}        
         void recover(QueueRegistry&){}
-        void enqueue(Message::shared_ptr&, const Queue&, const string * 
const){}
+        void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, 
const string * const){}
         void committed(const string * const){}
         void aborted(const string * const){}
-        void begin(){}
-        void commit(){}
-        void abort(){}        
+        TransactionContext* begin(){ return 0; }
+        void commit(TransactionContext*){}
+        void abort(TransactionContext*){}        
         ~TestMessageStore(){}
     };
 
@@ -73,6 +74,8 @@
     {
         for(int i = 0; i < 10; i++){
             Message::shared_ptr msg(new Message(0, "exchange", "routing_key", 
false, false));
+            msg->setHeader(AMQHeaderBody::shared_ptr(new 
AMQHeaderBody(BASIC)));
+            msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
             messages.push_back(msg);
             deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
         }
@@ -86,7 +89,7 @@
     void testPrepare()
     {
         //ensure acked messages are discarded, i.e. dequeued from store
-        op.prepare();
+        op.prepare(0);
         CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
         CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
         CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp Tue 
Nov  7 03:26:50 2006
@@ -34,6 +34,35 @@
 
 class TxBufferTest : public CppUnit::TestCase  
 {
+    class TestTransactionContext : public TransactionContext{
+        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
+        int state;
+    public:
+        TestTransactionContext() : state(OPEN) {}
+        void commit(){
+            if(state != OPEN) throw "txn already completed";
+            state = COMMITTED;
+        }
+
+        void abort(){
+            if(state != OPEN) throw "txn already completed";
+            state = ABORTED;
+        }
+
+        bool isCommitted(){
+            return state == COMMITTED;
+        }
+
+        bool isAborted(){
+            return state == ABORTED;
+        }
+
+        bool isOpen(){
+            return state == OPEN;
+        }
+        ~TestTransactionContext(){}
+    };
+
     class MockTxOp : public TxOp{
         enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
         std::vector<int> expected;
@@ -43,7 +72,7 @@
         MockTxOp() : failOnPrepare(false) {}
         MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
 
-        bool prepare() throw(){
+        bool prepare(TransactionContext*) throw(){
             actual.push_back(PREPARE);
             return !failOnPrepare;
         }
@@ -75,15 +104,25 @@
         enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
         std::vector<int> expected;
         std::vector<int> actual;
+
     public:
-        void begin(){
+        TestTransactionContext txn;
+
+        TransactionContext* begin(){
             actual.push_back(BEGIN);
+            return &txn;
         }
-        void commit(){
+        void commit(TransactionContext* ctxt){
             actual.push_back(COMMIT);
+            TestTransactionContext* 
_txn(dynamic_cast<TestTransactionContext*>(ctxt));
+            CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+            _txn->commit();
         }
-        void abort(){
+        void abort(TransactionContext* ctxt){
             actual.push_back(ABORT);
+            TestTransactionContext* 
_txn(dynamic_cast<TestTransactionContext*>(ctxt));
+            CPPUNIT_ASSERT_EQUAL(_txn, &txn);
+            _txn->abort();
         }        
         MockTransactionalStore& expectBegin(){
             expected.push_back(BEGIN);
@@ -131,6 +170,7 @@
         CPPUNIT_ASSERT(buffer.prepare(&store));
         buffer.commit();
         store.check();
+        CPPUNIT_ASSERT(store.txn.isCommitted());
         opA.check();
         opB.check();
         opC.check();
@@ -153,6 +193,7 @@
 
         CPPUNIT_ASSERT(!buffer.prepare(&store));
         store.check();
+        CPPUNIT_ASSERT(store.txn.isAborted());
         opA.check();
         opB.check();
         opC.check();
@@ -181,3 +222,4 @@
 // Make this test suite a plugin.
 CPPUNIT_PLUGIN_IMPLEMENT();
 CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);
+

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=diff&rev=472067&r1=472066&r2=472067
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Tue 
Nov  7 03:26:50 2006
@@ -27,6 +27,7 @@
 using std::pair;
 using std::vector;
 using namespace qpid::broker;
+using namespace qpid::framing;
 
 class TxPublishTest : public CppUnit::TestCase  
 {
@@ -36,7 +37,7 @@
     public:
         vector< pair<string, Message::shared_ptr> > enqueued;
         
-        void enqueue(Message::shared_ptr& msg, const Queue& queue, const 
string * const /*xid*/)
+        void enqueue(TransactionContext*, Message::shared_ptr& msg, const 
Queue& queue, const string * const /*xid*/)
         {
             enqueued.push_back(pair<string, 
Message::shared_ptr>(queue.getName(),msg));
         }
@@ -45,12 +46,12 @@
         void create(const Queue&){}
         void destroy(const Queue&){}
         void recover(QueueRegistry&){}
-        void dequeue(Message::shared_ptr&, const Queue&, const string * 
const){}
+        void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, 
const string * const){}
         void committed(const string * const){}
         void aborted(const string * const){}
-        void begin(){}
-        void commit(){}
-        void abort(){}        
+        TransactionContext* begin(){ return 0; }
+        void commit(TransactionContext*){}
+        void abort(TransactionContext*){}        
         ~TestMessageStore(){}
     };
     
@@ -74,6 +75,8 @@
                       msg(new Message(0, "exchange", "routing_key", false, 
false)),
                       op(msg)
     {
+        msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
         op.deliverTo(queue1);
         op.deliverTo(queue2);
     }      
@@ -81,7 +84,7 @@
     void testPrepare()
     {
         //ensure messages are enqueued in store
-        op.prepare();
+        op.prepare(0);
         CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
         CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
         CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);


Reply via email to