Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Mon Oct 30
11:27:54 2006
@@ -0,0 +1,102 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <vector>
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ *
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ *
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ *
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ *
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+ namespace broker {
+ class TxBuffer{
+ typedef std::vector<TxOp*>::iterator op_iterator;
+ std::vector<TxOp*> ops;
+ public:
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * All ops will be prepared under a transaction on the
+ * specified store. If any operation fails on prepare,
+ * this transaction will be rolled back.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ bool prepare(TransactionalStore* const store);
+ /**
+ * Signals that the ops all prepared all completed
+ * successfully and can now commit, i.e. the operation can
+ * now be fully carried out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ void commit();
+ /**
+ * Rolls back all the operations.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ void rollback();
+ /**
+ * Adds an operation to the transaction.
+ */
+ void enlist(TxOp* const op);
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Mon Oct 30 11:27:54
2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+namespace qpid {
+ namespace broker {
+ class TxOp{
+ public:
+ virtual bool prepare() throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+ virtual ~TxOp(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Mon Oct 30
11:27:54 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxPublish.h"
+
+using namespace qpid::broker;
+
+TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
+
+bool TxPublish::prepare() throw(){
+ try{
+ for_each(queues.begin(), queues.end(), Prepare(msg, 0));
+ return true;
+ }catch(...){
+ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxPublish::commit() throw(){
+ for_each(queues.begin(), queues.end(), Commit(msg));
+}
+
+void TxPublish::rollback() throw(){
+}
+
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+ queues.push_back(queue);
+}
+
+TxPublish::Prepare::Prepare(Message::shared_ptr& _msg, const string* const
_xid) : msg(_msg), xid(_xid){}
+
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+ queue->enqueue(msg, xid);
+}
+
+TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+ queue->process(msg);
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Mon Oct 30
11:27:54 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class TxPublish : public TxOp, public Deliverable{
+ class Prepare{
+ Message::shared_ptr& msg;
+ const string* const xid;
+ public:
+ Prepare(Message::shared_ptr& msg, const string* const xid);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ class Commit{
+ Message::shared_ptr& msg;
+ public:
+ Commit(Message::shared_ptr& msg);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ Message::shared_ptr msg;
+ std::list<Queue::shared_ptr> queues;
+
+ public:
+ TxPublish(Message::shared_ptr msg);
+ virtual bool prepare() throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+
+ virtual void deliverTo(Queue::shared_ptr& queue);
+
+ virtual ~TxPublish(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp Mon Oct
30 11:27:54 2006
@@ -26,14 +26,6 @@
using namespace qpid::framing;
using namespace qpid::concurrent;
-struct DummyRouter{
- Message::shared_ptr last;
-
- void operator()(Message::shared_ptr& msg){
- last = msg;
- }
-};
-
struct DummyHandler : OutputHandler{
std::vector<AMQFrame*> frames;
@@ -46,30 +38,11 @@
class ChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ChannelTest);
- CPPUNIT_TEST(testIncoming);
CPPUNIT_TEST(testConsumerMgmt);
CPPUNIT_TEST(testDeliveryNoAck);
CPPUNIT_TEST_SUITE_END();
public:
-
- void testIncoming(){
- Channel channel(0, 0, 10000);
- string routingKey("my_routing_key");
- channel.handlePublish(new Message(0, "test", routingKey, false,
false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- string data1("abcdefg");
- string data2("hijklmn");
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
-
- CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
- CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
- DummyRouter router = channel.handleContent(part2, DummyRouter());
- CPPUNIT_ASSERT(router.last);
- CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
- }
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp Mon
Oct 30 11:27:54 2006
@@ -16,6 +16,7 @@
*
*/
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
@@ -50,7 +51,8 @@
queue.reset();
queue2.reset();
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A",
true, true));
+ Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true));
+ DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
Added:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
(added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
Mon Oct 30 11:27:54 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Message.h"
+#include "qpid/broker/MessageBuilder.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <memory>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+class MessageBuilderTest : public CppUnit::TestCase
+{
+ struct DummyHandler : MessageBuilder::CompletionHandler{
+ Message::shared_ptr msg;
+
+ virtual void complete(Message::shared_ptr& _msg){
+ msg = _msg;
+ }
+ };
+
+
+ CPPUNIT_TEST_SUITE(MessageBuilderTest);
+ CPPUNIT_TEST(testHeaderOnly);
+ CPPUNIT_TEST(test1ContentFrame);
+ CPPUNIT_TEST(test2ContentFrames);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testHeaderOnly(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key",
false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(0);
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+
+ void test1ContentFrame(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ string data1("abcdefg");
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key",
false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(7);
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part1);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+
+ void test2ContentFrames(){
+ DummyHandler handler;
+ MessageBuilder builder(&handler);
+
+ string data1("abcdefg");
+ string data2("hijklmn");
+
+ Message::shared_ptr message(new Message(0, "test", "my_routing_key",
false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+
+ builder.initialise(message);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.setHeader(header);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part1);
+ CPPUNIT_ASSERT(!handler.msg);
+ builder.addContent(part2);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest);
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
(added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp Mon
Oct 30 11:27:54 2006
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::concurrent;
+
+class APRBaseTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(APRBaseTest);
+ CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testMe()
+ {
+ APRBase::increment();
+ APRBase::increment();
+ APRBase::decrement();
+ APRBase::decrement();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest);
+
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native