Author: gsim
Date: Fri May 25 04:24:54 2007
New Revision: 541619

URL: http://svn.apache.org/viewvc?view=rev&rev=541619
Log:
Added support for recovering prepared transactions.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/CommonOptions.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri May 25 04:24:54 2007
@@ -181,6 +181,8 @@
   qpid/broker/QueuePolicy.cpp \
   qpid/broker/QueueRegistry.cpp \
   qpid/broker/RecoveryManagerImpl.cpp \
+  qpid/broker/RecoveredEnqueue.cpp \
+  qpid/broker/RecoveredDequeue.cpp \
   qpid/broker/Reference.cpp \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TxAck.cpp \
@@ -234,7 +236,10 @@
   qpid/broker/RecoverableExchange.h \
   qpid/broker/RecoverableMessage.h \
   qpid/broker/RecoverableQueue.h \
+  qpid/broker/RecoverableTransaction.h \
   qpid/broker/RecoveryManager.h \
+  qpid/broker/RecoveredEnqueue.h \
+  qpid/broker/RecoveredDequeue.h \
   qpid/broker/Reference.h \
   qpid/broker/TxBuffer.h \
   qpid/broker/TxOp.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/CommonOptions.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/CommonOptions.cpp?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/CommonOptions.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/CommonOptions.cpp Fri May 25 
04:24:54 2007
@@ -19,6 +19,7 @@
 #include "CommonOptions.h"
 #include <fstream>
 #include <algorithm>
+#include <iostream>
 
 namespace qpid {
 namespace program_options {
@@ -28,10 +29,9 @@
 }
     
 const std::string envPrefix("QPID_");
-const std::string ignore("QPID_DIR");//temporary hack - this env var is used 
in other ways; not an option
 
 std::string env2option(const std::string& env) {
-    if (env != ignore /*temp hack, see above*/ && env.find(envPrefix) == 0) {
+    if (env.find(envPrefix) == 0) {
         std::string opt = env.substr(envPrefix.size());
         std::transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
         return opt;
@@ -62,6 +62,9 @@
     try { 
         po::store(po::parse_environment(desc, po::env2option), vm);
     }
+    catch (const po::unknown_option& e) {
+        std::cerr << e.what() << std::endl;
+    } 
     catch (const po::error& e) {
         throw po::error(std::string("parsing environment variables: ")
                           + e.what());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri May 25 
04:24:54 2007
@@ -92,8 +92,8 @@
     exchanges.declare(amq_match, HeadersExchange::typeName);
 
     if(store.get()) {
-        RecoveryManagerImpl recoverer(
-            queues, exchanges, conf.stagingThreshold);
+        RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
+                                      conf.stagingThreshold);
         store->recover(recoverer);
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri May 25 
04:24:54 2007
@@ -29,11 +29,20 @@
 
 void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
 {
+    /*
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
     }
     i->add(ops);
+    */
+
+    getOrCreateWork(xid)->add(ops);
+}
+
+void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> 
txn, DtxBuffer::shared_ptr ops)
+{
+    getOrCreateWork(xid)->recover(txn, ops);
 }
 
 void DtxManager::prepare(const std::string& xid) 
@@ -54,6 +63,17 @@
 DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
 {
     WorkMap::iterator i = work.find(xid);
-    if (i == work.end()) throw ConnectionException(503, 
boost::format("Unrecognised xid %1%!") % xid);
+    if (i == work.end()) {
+        throw ConnectionException(503, boost::format("Unrecognised xid %1%!") 
% xid);
+    }
+    return i;
+}
+
+DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid)
+{
+    WorkMap::iterator i = work.find(xid);
+    if (i == work.end()) {
+        i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+    }
     return i;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri May 25 
04:24:54 2007
@@ -37,11 +37,13 @@
     TransactionalStore* const store;
 
     WorkMap::iterator getWork(const std::string& xid);
+    WorkMap::iterator getOrCreateWork(std::string& xid);
 
 public:
     DtxManager(TransactionalStore* const store);
     ~DtxManager();
     void start(std::string xid, DtxBuffer::shared_ptr work);
+    void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, 
DtxBuffer::shared_ptr work);
     void prepare(const std::string& xid);
     void commit(const std::string& xid);
     void rollback(const std::string& xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri May 25 
04:24:54 2007
@@ -105,3 +105,9 @@
     }
     for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
 }
+
+void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, 
DtxBuffer::shared_ptr ops)
+{
+    add(ops);
+    txn = _txn;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri May 25 
04:24:54 2007
@@ -56,6 +56,7 @@
     void commit();
     void rollback();
     void add(DtxBuffer::shared_ptr ops);
+    void recover(std::auto_ptr<TPCTransactionContext> txn, 
DtxBuffer::shared_ptr ops);
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableExchange.h Fri May 
25 04:24:54 2007
@@ -23,6 +23,7 @@
  */
 
 #include <boost/shared_ptr.hpp>
+#include "qpid/framing/FieldTable.h"
 
 namespace qpid {
 namespace broker {

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h?view=auto&rev=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h Fri 
May 25 04:24:54 2007
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableTransaction_h
+#define _broker_RecoverableTransaction_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "RecoverableMessage.h"
+#include "RecoverableQueue.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface through which prepared 2pc transactions are
+ * recovered.
+ */
+class RecoverableTransaction
+{
+public:
+    typedef boost::shared_ptr<RecoverableTransaction> shared_ptr;
+    virtual void enqueue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message) = 0;
+    virtual void dequeue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message) = 0;
+    virtual ~RecoverableTransaction() {};
+};
+
+}}
+
+
+#endif

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoverableTransaction.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?view=auto&rev=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Fri May 
25 04:24:54 2007
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoveredDequeue.h"
+
+using namespace qpid::broker;
+
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, 
Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+
+bool RecoveredDequeue::prepare(TransactionContext*) throw(){
+    //should never be called; transaction has already prepared if an enqueue 
is recovered
+    return false;
+}
+
+void RecoveredDequeue::commit() throw(){
+}
+
+void RecoveredDequeue::rollback() throw(){
+    queue->process(msg);
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?view=auto&rev=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Fri May 25 
04:24:54 2007
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveredDequeue_
+#define _RecoveredDequeue_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "Deliverable.h"
+#include "BrokerMessage.h"
+#include "MessageStore.h"
+#include "BrokerQueue.h"
+#include "TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        class RecoveredDequeue : public TxOp{
+            Queue::shared_ptr queue;
+            Message::shared_ptr msg;
+
+        public:
+            RecoveredDequeue(Queue::shared_ptr queue, Message::shared_ptr msg);
+            virtual bool prepare(TransactionContext* ctxt) throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+            virtual ~RecoveredDequeue(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?view=auto&rev=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Fri May 
25 04:24:54 2007
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoveredEnqueue.h"
+
+using namespace qpid::broker;
+
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, 
Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+
+bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
+    //should never be called; transaction has already prepared if an enqueue 
is recovered
+    return false;
+}
+
+void RecoveredEnqueue::commit() throw(){
+    queue->process(msg);
+}
+
+void RecoveredEnqueue::rollback() throw(){
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?view=auto&rev=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Fri May 25 
04:24:54 2007
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveredEnqueue_
+#define _RecoveredEnqueue_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "Deliverable.h"
+#include "BrokerMessage.h"
+#include "MessageStore.h"
+#include "BrokerQueue.h"
+#include "TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        class RecoveredEnqueue : public TxOp{
+            Queue::shared_ptr queue;
+            Message::shared_ptr msg;
+
+        public:
+            RecoveredEnqueue(Queue::shared_ptr queue, Message::shared_ptr msg);
+            virtual bool prepare(TransactionContext* ctxt) throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+            virtual ~RecoveredEnqueue(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManager.h Fri May 25 
04:24:54 2007
@@ -24,6 +24,8 @@
 #include "RecoverableExchange.h"
 #include "RecoverableQueue.h"
 #include "RecoverableMessage.h"
+#include "RecoverableTransaction.h"
+#include "TransactionalStore.h"
 #include "qpid/framing/Buffer.h"
 
 namespace qpid {
@@ -35,6 +37,8 @@
         virtual RecoverableExchange::shared_ptr 
recoverExchange(framing::Buffer& buffer) = 0;
         virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& 
buffer) = 0;
         virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& 
buffer) = 0;
+        virtual RecoverableTransaction::shared_ptr recoverTransaction(const 
std::string& xid, 
+                                                                      
std::auto_ptr<TPCTransactionContext> txn) = 0;
         virtual void recoveryComplete() = 0;
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri 
May 25 04:24:54 2007
@@ -23,6 +23,8 @@
 #include "BrokerMessage.h"
 #include "BrokerMessageMessage.h"
 #include "BrokerQueue.h"
+#include "RecoveredEnqueue.h"
+#include "RecoveredDequeue.h"
 
 using namespace qpid;
 using namespace qpid::broker;
@@ -32,8 +34,9 @@
 static const uint8_t BASIC = 1;
 static const uint8_t MESSAGE = 2;
 
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, 
ExchangeRegistry& _exchanges, uint64_t _stagingThreshold) 
-    : queues(_queues), exchanges(_exchanges), 
stagingThreshold(_stagingThreshold) {}
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, 
ExchangeRegistry& _exchanges, 
+                                         DtxManager& _dtxMgr, uint64_t 
_stagingThreshold) 
+    : queues(_queues), exchanges(_exchanges), dtxMgr(_dtxMgr), 
stagingThreshold(_stagingThreshold) {}
 
 RecoveryManagerImpl::~RecoveryManagerImpl() {}
 
@@ -49,6 +52,8 @@
     bool loadContent(uint64_t available);
     void decodeContent(framing::Buffer& buffer);
     void recover(Queue::shared_ptr queue);
+    void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
+    void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
 };
 
 class RecoverableQueueImpl : public RecoverableQueue
@@ -59,6 +64,8 @@
     ~RecoverableQueueImpl() {};
     void setPersistenceId(uint64_t id);
     void recover(RecoverableMessage::shared_ptr msg);
+    void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr 
msg);
+    void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr 
msg);
 };
 
 class RecoverableExchangeImpl : public RecoverableExchange
@@ -71,6 +78,15 @@
     void bind(std::string& queue, std::string& routingKey, 
qpid::framing::FieldTable& args);
 };
 
+class RecoverableTransactionImpl : public RecoverableTransaction
+{
+    DtxBuffer::shared_ptr buffer;
+public:
+    RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : 
buffer(_buffer) {}
+    void enqueue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message);
+    void dequeue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message);
+};
+
 RecoverableExchange::shared_ptr 
RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
 {
     return RecoverableExchange::shared_ptr(new 
RecoverableExchangeImpl(Exchange::decode(exchanges, buffer), queues));
@@ -102,6 +118,14 @@
     return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, 
stagingThreshold));    
 }
 
+RecoverableTransaction::shared_ptr 
RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
+                                                                           
std::auto_ptr<TPCTransactionContext> txn)
+{
+    DtxBuffer::shared_ptr buffer(new DtxBuffer());
+    dtxMgr.recover(xid, txn, buffer);
+    return RecoverableTransaction::shared_ptr(new 
RecoverableTransactionImpl(buffer));
+}
+
 void RecoveryManagerImpl::recoveryComplete()
 {
     //TODO (finalise binding setup etc)
@@ -161,4 +185,34 @@
 {
     Queue::shared_ptr queue = queues.find(queueName);
     exchange->bind(queue, key, &args);
+}
+
+void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, 
Queue::shared_ptr queue)
+{
+    buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
+}
+
+void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, 
Queue::shared_ptr queue)
+{
+    buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
+}
+
+void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, 
RecoverableMessage::shared_ptr message)
+{
+    dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, 
queue);
+}
+
+void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, 
RecoverableMessage::shared_ptr message)
+{
+    dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, 
queue);
+}
+
+void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message)
+{
+    dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, 
message);
+}
+
+void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, 
RecoverableMessage::shared_ptr message)
+{
+    dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, 
message);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h?view=diff&rev=541619&r1=541618&r2=541619
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h Fri May 
25 04:24:54 2007
@@ -22,6 +22,7 @@
 #define _RecoveryManagerImpl_
 
 #include <list>
+#include "DtxManager.h"
 #include "ExchangeRegistry.h"
 #include "QueueRegistry.h"
 #include "RecoveryManager.h"
@@ -32,14 +33,17 @@
     class RecoveryManagerImpl : public RecoveryManager{
         QueueRegistry& queues;
         ExchangeRegistry& exchanges;
+        DtxManager& dtxMgr;
         const uint64_t stagingThreshold;
     public:
-        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& 
exchanges, uint64_t stagingThreshold);
+        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& 
exchanges, DtxManager& dtxMgr, uint64_t stagingThreshold);
         ~RecoveryManagerImpl();
 
         RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& 
buffer);
         RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
         RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
+        RecoverableTransaction::shared_ptr recoverTransaction(const 
std::string& xid, 
+                                                              
std::auto_ptr<TPCTransactionContext> txn);
         void recoveryComplete();
 
         static uint8_t decodeMessageType(framing::Buffer& buffer);


Reply via email to