Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Mon Oct 30 
11:27:54 2006
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <vector>
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ * 
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ * 
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ * 
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ * 
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+    namespace broker {
+        class TxBuffer{
+            typedef std::vector<TxOp*>::iterator op_iterator;
+            std::vector<TxOp*> ops;
+        public:
+            /**
+             * Requests that all ops are prepared. This should
+             * primarily involve making sure that a persistent record
+             * of the operations is stored where necessary.
+             * 
+             * All ops will be prepared under a transaction on the
+             * specified store. If any operation fails on prepare,
+             * this transaction will be rolled back. 
+             * 
+             * Once prepared, a transaction can be committed (or in
+             * the 2pc case, rolled back).
+             * 
+             * @returns true if all the operations prepared
+             * successfully, false if not.
+             */
+            bool prepare(TransactionalStore* const store);
+            /**
+             * Signals that the ops all prepared all completed
+             * successfully and can now commit, i.e. the operation can
+             * now be fully carried out.
+             * 
+             * Should only be called after a call to prepare() returns
+             * true.
+             */
+            void commit();
+            /**
+             * Rolls back all the operations.
+             * 
+             * Should only be called either after a call to prepare()
+             * returns true (2pc) or instead of a prepare call
+             * ('server-local')
+             */
+            void rollback();
+            /**
+             * Adds an operation to the transaction.
+             */
+            void enlist(TxOp* const op);
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Mon Oct 30 11:27:54 
2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+namespace qpid {
+    namespace broker {
+        class TxOp{
+        public:
+            virtual bool prepare() throw() = 0;
+            virtual void commit()  throw() = 0;
+            virtual void rollback()  throw() = 0;
+            virtual ~TxOp(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Mon Oct 30 
11:27:54 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxPublish.h"
+
+using namespace qpid::broker;
+
+TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
+
+bool TxPublish::prepare() throw(){
+    try{
+        for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+        return true;
+    }catch(...){
+        std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+        return false;
+    }
+}
+
+void TxPublish::commit() throw(){
+    for_each(queues.begin(), queues.end(), Commit(msg));
+}
+
+void TxPublish::rollback() throw(){
+}
+
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+    queues.push_back(queue);
+}
+
+TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const 
_xid) : msg(_msg), xid(_xid){}
+
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+    queue->enqueue(msg, xid);
+}
+
+TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+    queue->process(msg);
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Mon Oct 30 
11:27:54 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        class TxPublish : public TxOp, public Deliverable{
+            class Prepare{
+                Message::shared_ptr& msg;
+                const string* const xid;
+            public:
+                Prepare(Message::shared_ptr& msg, const string* const xid);
+                void operator()(Queue::shared_ptr& queue);            
+            };
+
+            class Commit{
+                Message::shared_ptr& msg;
+            public:
+                Commit(Message::shared_ptr& msg);
+                void operator()(Queue::shared_ptr& queue);            
+            };
+
+            Message::shared_ptr msg;
+            std::list<Queue::shared_ptr> queues;
+
+        public:
+            TxPublish(Message::shared_ptr msg);
+            virtual bool prepare() throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+
+            virtual void deliverTo(Queue::shared_ptr& queue);
+
+            virtual ~TxPublish(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp Mon Oct 
30 11:27:54 2006
@@ -26,14 +26,6 @@
 using namespace qpid::framing;
 using namespace qpid::concurrent;
 
-struct DummyRouter{
-    Message::shared_ptr last;
-
-    void operator()(Message::shared_ptr& msg){
-       last = msg;
-    }
-};
-
 struct DummyHandler : OutputHandler{
     std::vector<AMQFrame*> frames; 
 
@@ -46,30 +38,11 @@
 class ChannelTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(ChannelTest);
-    CPPUNIT_TEST(testIncoming);
     CPPUNIT_TEST(testConsumerMgmt);
     CPPUNIT_TEST(testDeliveryNoAck);
     CPPUNIT_TEST_SUITE_END();
 
   public:
-
-    void testIncoming(){
-        Channel channel(0, 0, 10000);
-        string routingKey("my_routing_key");
-        channel.handlePublish(new Message(0, "test", routingKey, false, 
false));
-        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-        header->setContentSize(14);
-        string data1("abcdefg");
-        string data2("hijklmn");
-        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
-        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
-
-        CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
-        CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
-        DummyRouter router = channel.handleContent(part2, DummyRouter());
-        CPPUNIT_ASSERT(router.last);
-        CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
-    }
 
     void testConsumerMgmt(){
         Queue::shared_ptr queue(new Queue("my_queue"));

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp Mon 
Oct 30 11:27:54 2006
@@ -16,6 +16,7 @@
  *
  */
 
+#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Queue.h"
@@ -50,7 +51,8 @@
         queue.reset();
         queue2.reset();
 
-        Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", 
true, true));
+        Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true));
+        DeliverableMessage msg(msgPtr);
         topic.route(msg, "abc", 0);
         direct.route(msg, "abc", 0);
 

Added: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp 
Mon Oct 30 11:27:54 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageBuilder.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <memory>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+class MessageBuilderTest : public CppUnit::TestCase  
+{
+    struct DummyHandler : MessageBuilder::CompletionHandler{
+        Message::shared_ptr msg;
+        
+        virtual void complete(Message::shared_ptr& _msg){
+            msg = _msg;
+        }
+    };
+
+
+    CPPUNIT_TEST_SUITE(MessageBuilderTest);
+    CPPUNIT_TEST(testHeaderOnly);
+    CPPUNIT_TEST(test1ContentFrame);
+    CPPUNIT_TEST(test2ContentFrames);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void testHeaderOnly(){
+        DummyHandler handler;
+        MessageBuilder builder(&handler);
+
+        Message::shared_ptr message(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(0);
+        
+        builder.initialise(message);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.setHeader(header);
+        CPPUNIT_ASSERT(handler.msg);
+        CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+    }
+
+    void test1ContentFrame(){
+        DummyHandler handler;
+        MessageBuilder builder(&handler);
+
+        string data1("abcdefg");
+
+        Message::shared_ptr message(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(7);
+        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+        
+        builder.initialise(message);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.setHeader(header);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.addContent(part1);
+        CPPUNIT_ASSERT(handler.msg);
+        CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+    }
+
+    void test2ContentFrames(){
+        DummyHandler handler;
+        MessageBuilder builder(&handler);
+
+        string data1("abcdefg");
+        string data2("hijklmn");
+
+        Message::shared_ptr message(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);
+        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
+        
+        builder.initialise(message);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.setHeader(header);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.addContent(part1);
+        CPPUNIT_ASSERT(!handler.msg);
+        builder.addContent(part2);
+        CPPUNIT_ASSERT(handler.msg);
+        CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest);

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp Mon 
Oct 30 11:27:54 2006
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+class APRBaseTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(APRBaseTest);
+    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void testMe() 
+    {
+        APRBase::increment();
+        APRBase::increment();
+        APRBase::decrement();
+        APRBase::decrement();
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest);
+

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to