Author: gsim
Date: Fri Nov  3 05:44:21 2006
New Revision: 470810

URL: http://svn.apache.org/viewvc?view=rev&rev=470810
Log:
Added some methods to MessageStore interface and hooked these in where 
appropriate.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/QueueTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Nov  3 
05:44:21 2006
@@ -32,7 +32,8 @@
                                                      mandatory(_mandatory),
                                                      immediate(_immediate),
                                                      redelivered(false),
-                                                     size(0) {}
+                                                     size(0),
+                                                     persistenceId(0) {}
 
 Message::~Message(){}
 
@@ -92,3 +93,9 @@
     return publisher;
 }
 
+bool Message::isPersistent()
+{
+    if(!header) return false;
+    BasicHeaderProperties* props = getHeaderProperties();
+    return props && props->getDeliveryMode() == PERSISTENT;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Nov  3 05:44:21 
2006
@@ -48,7 +48,7 @@
             qpid::framing::AMQHeaderBody::shared_ptr header;
             content_list content;
             u_int64_t size;
-            TxBuffer* tx;
+            u_int64_t persistenceId;
 
             void sendContent(qpid::framing::OutputHandler* out, 
                              int channel, u_int32_t framesize);
@@ -78,11 +78,12 @@
             void redeliver();
 
             qpid::framing::BasicHeaderProperties* getHeaderProperties();
+            bool isPersistent();
             const string& getRoutingKey() const { return routingKey; }
             const string& getExchange() const { return exchange; }
             u_int64_t contentSize() const { return size; }
-            TxBuffer* getTx() const { return tx; }
-            void setTx(TxBuffer* _tx) { tx = _tx; }
+            u_int64_t getPersistenceId() const { return persistenceId; }
+            void setPersistenceId(u_int64_t _persistenceId) { persistenceId = 
_persistenceId; }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri Nov  3 
05:44:21 2006
@@ -24,12 +24,29 @@
 
 namespace qpid {
     namespace broker {
+        class Queue;
+        class QueueRegistry;
+
         /**
          * An abstraction of the persistent storage for messages.
          */
         class MessageStore : public TransactionalStore{
         public:
             /**
+             * Record the existence of a durable queue
+             */
+            virtual void create(const Queue& queue) = 0;
+            /**
+             * Destroy a durable queue
+             */
+            virtual void destroy(const Queue& queue) = 0;
+
+            /**
+             * Request recovery of queue and message state from store
+             */
+            virtual void recover(QueueRegistry& queues) = 0;
+
+            /**
              * Enqueues a message, storing the message if it has not
              * been previously stored and recording that the given
              * message is on the given queue.
@@ -40,7 +57,7 @@
              * distributed transaction in which the operation takes
              * place or null for 'local' transactions
              */
-            virtual void enqueue(Message::shared_ptr& msg, const string& 
queue, const string * const xid) = 0;
+            virtual void enqueue(Message::shared_ptr& msg, const Queue& queue, 
const string * const xid) = 0;
             /**
              * Dequeues a message, recording that the given message is
              * no longer on the given queue and deleting the message
@@ -52,7 +69,7 @@
              * distributed transaction in which the operation takes
              * place or null for 'local' transactions
              */
-            virtual void dequeue(Message::shared_ptr& msg, const string& 
queue, const string * const xid) = 0;
+            virtual void dequeue(Message::shared_ptr& msg, const Queue& queue, 
const string * const xid) = 0;
             /**
              * Treat all enqueue/dequeues where this xid was specified as 
being committed.
              */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Nov  3 05:44:21 
2006
@@ -23,13 +23,12 @@
 using namespace qpid::broker;
 using namespace qpid::concurrent;
 
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, 
+Queue::Queue(const string& _name, u_int32_t _autodelete, 
              MessageStore* const _store,
              const ConnectionToken* const _owner) :
 
     name(_name), 
     autodelete(_autodelete),
-    durable(_durable), 
     store(_store),
     owner(_owner), 
     queueing(false),
@@ -166,12 +165,26 @@
 
 void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
     if(store){
-        store->enqueue(msg, name, xid);
+        store->enqueue(msg, *this, xid);
     }
 }
 
 void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
     if(store){
-        store->dequeue(msg, name, xid);
+        store->dequeue(msg, *this, xid);
+    }
+}
+
+void Queue::create()
+{
+    if(store){
+        store->create(*this);
+    }
+}
+
+void Queue::destroy()
+{
+    if(store){
+        store->destroy(*this);
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Nov  3 05:44:21 
2006
@@ -47,7 +47,6 @@
         class Queue{
             const string name;
             const u_int32_t autodelete;
-            const bool durable;
             MessageStore* const store;
             const ConnectionToken* const owner;
             std::vector<Consumer*> consumers;
@@ -69,10 +68,13 @@
 
             typedef std::vector<shared_ptr> vector;
            
-            Queue(const string& name, bool durable = false, u_int32_t 
autodelete = 0, 
+            Queue(const string& name, u_int32_t autodelete = 0, 
                   MessageStore* const store = 0, 
                   const ConnectionToken* const owner = 0);
             ~Queue();
+
+            void create();
+            void destroy();
             /**
              * Informs the queue of a binding that should be cancelled on
              * destruction of the queue.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Fri Nov  3 
05:44:21 2006
@@ -24,20 +24,20 @@
 using namespace qpid::broker;
 using namespace qpid::concurrent;
 
-QueueRegistry::QueueRegistry() : counter(1){}
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), 
store(_store){}
 
 QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t 
autoDelete, 
-                       MessageStore* const store, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable, 
+                       u_int32_t autoDelete, const ConnectionToken* owner)
 {
     Locker locker(lock);
     string name = declareName.empty() ? generateName() : declareName;
     assert(!name.empty());
     QueueMap::iterator i =  queues.find(name);
     if (i == queues.end()) {
-       Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, 
owner));
+       Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 
0, owner));
        queues[name] = queue;
        return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Fri Nov  3 
05:44:21 2006
@@ -37,7 +37,7 @@
 class QueueRegistry{
  
   public:
-    QueueRegistry();
+    QueueRegistry(MessageStore* const store = 0);
     ~QueueRegistry();
 
     /**
@@ -47,7 +47,6 @@
      * was created by this declare call false if it already existed.
      */
     std::pair<Queue::shared_ptr, bool> declare(const string& name, bool 
durable = false, u_int32_t autodelete = 0, 
-                                               MessageStore* const _store = 0,
                                                const ConnectionToken* const 
owner = 0);
 
     /**
@@ -79,7 +78,7 @@
     QueueMap queues;
     qpid::concurrent::Monitor lock;
     int counter;
-
+    MessageStore* const store;
 };
 
     

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp 
Fri Nov  3 05:44:21 2006
@@ -33,7 +33,9 @@
 const std::string amq_match("amq.match");
 }
 
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : 
timeout(_timeout), cleaner(&queues, timeout/10){
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : 
+    queues(store.get()), timeout(_timeout), cleaner(&queues, timeout/10)
+{
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);
     exchanges.declare(amq_topic, TopicExchange::typeName);
@@ -42,10 +44,17 @@
     cleaner.start();
 }
 
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+void SessionHandlerFactoryImpl::recover()
+{
+    if(store.get()) store->recover(queues);
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+{
     return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, 
timeout);
 }
 
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+{
     cleaner.stop();
 }

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h 
Fri Nov  3 05:44:21 2006
@@ -18,27 +18,31 @@
 #ifndef _SessionHandlerFactoryImpl_
 #define _SessionHandlerFactoryImpl_
 
-#include "qpid/framing/AMQFrame.h"
 #include "qpid/broker/AutoDelete.h"
 #include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/broker/MessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
-#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/io/SessionContext.h"
 #include "qpid/io/SessionHandler.h"
+#include "qpid/io/SessionHandlerFactory.h"
 #include "qpid/io/TimeoutHandler.h"
+#include <memory>
 
 namespace qpid {
     namespace broker {
 
         class SessionHandlerFactoryImpl : public virtual 
qpid::io::SessionHandlerFactory
         {
+            std::auto_ptr<MessageStore> store;
             QueueRegistry queues;
             ExchangeRegistry exchanges;
             const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
             AutoDelete cleaner;
         public:
             SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+            void recover();
             virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* 
ctxt);
             virtual ~SessionHandlerFactoryImpl();
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Fri 
Nov  3 05:44:21 2006
@@ -250,24 +250,28 @@
        queue = parent->getQueue(name, channel);
     } else {
        std::pair<Queue::shared_ptr, bool> queue_created =  
-            parent->queues->declare(name, durable, autoDelete ? 
parent->timeout : 0, 0, exclusive ? parent : 0);
+            parent->queues->declare(name, durable, autoDelete ? 
parent->timeout : 0, exclusive ? parent : 0);
        queue = queue_created.first;
        assert(queue);
        if (queue_created.second) { // This is a new queue
            parent->getChannel(channel)->setDefaultQueue(queue);
+
+            //create persistent record if required
+            queue_created.first->create();
+
            //add default binding:
            parent->exchanges->getDefault()->bind(queue, name, 0);
-           if(exclusive){
+           if (exclusive) {
                parent->exclusiveQueues.push_back(queue);
            } else if(autoDelete){
                parent->cleaner->add(queue);
            }
        }
     }
-    if(exclusive && !queue->isExclusiveOwner(parent)){
+    if (exclusive && !queue->isExclusiveOwner(parent)) {
        throw ChannelException(405, "Cannot grant exclusive access to queue");
     }
-    if(!nowait){
+    if (!nowait) {
         name = queue->getName();
         parent->client.getQueue().declareOk(channel, name, 
queue->getMessageCount(), queue->getConsumerCount());
     }
@@ -311,6 +315,7 @@
             if(i < parent->exclusiveQueues.end()) 
parent->exclusiveQueues.erase(i);
         }
         count = q->getMessageCount();
+        q->destroy();
         parent->queues->destroy(queue);
     }
     if(!nowait) parent->client.getQueue().deleteOk(channel, count);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BasicHeaderProperties.h Fri 
Nov  3 05:44:21 2006
@@ -25,6 +25,7 @@
 
 namespace qpid {
 namespace framing {
+    enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2};
 
     //TODO: This could be easily generated from the spec
     class BasicHeaderProperties : public HeaderProperties

Modified: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ConfigurationTest.cpp 
Fri Nov  3 05:44:21 2006
@@ -60,8 +60,8 @@
     void testVarious() 
     {
         Configuration conf;
-        char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", 
"blocking"};
-        conf.parse(6, argv);
+        char* argv[] = {"ignore", "-t", "--worker-threads", "10"};
+        conf.parse(4, argv);
         CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default
         CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads());
         CPPUNIT_ASSERT(conf.isTrace());

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp Fri 
Nov  3 05:44:21 2006
@@ -37,8 +37,8 @@
 
     void testMe() 
     {
-        Queue::shared_ptr queue(new Queue("queue", true, true));
-        Queue::shared_ptr queue2(new Queue("queue2", true, true));
+        Queue::shared_ptr queue(new Queue("queue", true));
+        Queue::shared_ptr queue2(new Queue("queue2", true));
 
         TopicExchange topic("topic");
         topic.bind(queue, "abc", 0);

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/QueueTest.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/QueueTest.cpp Fri Nov  
3 05:44:21 2006
@@ -52,7 +52,7 @@
 
   public:
     void testConsumers(){
-        Queue::shared_ptr queue(new Queue("my_queue", true, true));
+        Queue::shared_ptr queue(new Queue("my_queue", true));
     
         //Test adding consumers:
         TestConsumer c1; 
@@ -84,7 +84,7 @@
     }
 
     void testBinding(){
-        Queue::shared_ptr queue(new Queue("my_queue", true, true));
+        Queue::shared_ptr queue(new Queue("my_queue", true));
         //Test bindings:
         TestBinding a;
         TestBinding b;
@@ -118,7 +118,7 @@
     }
 
     void testDequeue(){
-        Queue::shared_ptr queue(new Queue("my_queue", true, true));
+        Queue::shared_ptr queue(new Queue("my_queue", true));
 
         Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", 
"A", true, true));
         Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", 
"B", true, true));

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp Fri Nov  
3 05:44:21 2006
@@ -16,6 +16,7 @@
  *
  */
 #include "qpid/broker/MessageStore.h"
+#include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/TxAck.h"
 #include <qpid_test_plugin.h>
 #include <iostream>
@@ -29,81 +30,84 @@
 class TxAckTest : public CppUnit::TestCase  
 {
 
-        class TestMessageStore : public MessageStore
-        {
-            public:
-                vector<Message::shared_ptr> dequeued;
-
-                void dequeue(Message::shared_ptr& msg, const string& 
/*queue*/, const string * const /*xid*/)
-                {
-                    dequeued.push_back(msg);
-                }
-
-                //dont care about any of the other methods:
-                void enqueue(Message::shared_ptr&, const string&, const string 
* const){}
-                void committed(const string * const){}
-                void aborted(const string * const){}
-                void begin(){}
-                void commit(){}
-                void abort(){}        
-                ~TestMessageStore(){}
-        };
-
-        CPPUNIT_TEST_SUITE(TxAckTest);
-        CPPUNIT_TEST(testPrepare);
-        CPPUNIT_TEST(testCommit);
-        CPPUNIT_TEST_SUITE_END();
-
-
-        AccumulatedAck acked;
-        TestMessageStore store;
-        Queue::shared_ptr queue;
-        vector<Message::shared_ptr> messages;
-        list<DeliveryRecord> deliveries;
-        TxAck op;
-
-
+    class TestMessageStore : public MessageStore
+    {
     public:
+        vector<Message::shared_ptr> dequeued;
 
-        TxAckTest() : queue(new Queue("my_queue", true, false, &store, 0)), 
op(acked, deliveries)
+        void dequeue(Message::shared_ptr& msg, const Queue& /*queue*/, const 
string * const /*xid*/)
         {
-            for(int i = 0; i < 10; i++){
-                Message::shared_ptr msg(new Message(0, "exchange", 
"routing_key", false, false));
-                messages.push_back(msg);
-                deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
-            }
-
-            //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
-            acked.range = 5;
-            acked.individual.push_back(7);
-            acked.individual.push_back(9);
-        }      
-
-        void testPrepare()
-        {
-            //ensure acked messages are discarded, i.e. dequeued from store
-            op.prepare();
-            CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
-            CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
-            CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
-            CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
-            CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
-            CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
-            CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
-            CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
-            CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+            dequeued.push_back(msg);
         }
 
-        void testCommit()
-        {
-            //emsure acked messages are removed from list
-            op.commit();
-            CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size());
-            list<DeliveryRecord>::iterator i = deliveries.begin();
-            CPPUNIT_ASSERT(i->matches(6));//msg 6
-            CPPUNIT_ASSERT((++i)->matches(8));//msg 8
-            CPPUNIT_ASSERT((++i)->matches(10));//msg 10
+        //don't care about any of the other methods:
+        void create(const Queue&){}
+        void destroy(const Queue&){}        
+        void recover(QueueRegistry&){}
+        void enqueue(Message::shared_ptr&, const Queue&, const string * 
const){}
+        void committed(const string * const){}
+        void aborted(const string * const){}
+        void begin(){}
+        void commit(){}
+        void abort(){}        
+        ~TestMessageStore(){}
+    };
+
+    CPPUNIT_TEST_SUITE(TxAckTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+
+    AccumulatedAck acked;
+    TestMessageStore store;
+    Queue::shared_ptr queue;
+    vector<Message::shared_ptr> messages;
+    list<DeliveryRecord> deliveries;
+    TxAck op;
+
+
+public:
+
+    TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, 
deliveries)
+    {
+        for(int i = 0; i < 10; i++){
+            Message::shared_ptr msg(new Message(0, "exchange", "routing_key", 
false, false));
+            messages.push_back(msg);
+            deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
         }
+
+        //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
+        acked.range = 5;
+        acked.individual.push_back(7);
+        acked.individual.push_back(9);
+    }      
+
+    void testPrepare()
+    {
+        //ensure acked messages are discarded, i.e. dequeued from store
+        op.prepare();
+        CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
+        CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
+        CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
+        CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
+        CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
+        CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
+        CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
+        CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
+        CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+    }
+
+    void testCommit()
+    {
+        //ensure acked messages are removed from list
+        op.commit();
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size());
+        list<DeliveryRecord>::iterator i = deliveries.begin();
+        CPPUNIT_ASSERT(i->matches(6));//msg 6
+        CPPUNIT_ASSERT((++i)->matches(8));//msg 8
+        CPPUNIT_ASSERT((++i)->matches(10));//msg 10
+    }
 };
 
 // Make this test suite a plugin.

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=diff&rev=470810&r1=470809&r2=470810
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Fri 
Nov  3 05:44:21 2006
@@ -16,6 +16,7 @@
  *
  */
 #include "qpid/broker/MessageStore.h"
+#include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/TxPublish.h"
 #include <qpid_test_plugin.h>
 #include <iostream>
@@ -30,71 +31,74 @@
 class TxPublishTest : public CppUnit::TestCase  
 {
 
-        class TestMessageStore : public MessageStore
-        {
-            public:
-                vector< pair<string, Message::shared_ptr> > enqueued;
-
-                void enqueue(Message::shared_ptr& msg, const string& queue, 
const string * const /*xid*/)
-                {
-                    enqueued.push_back(pair<string, 
Message::shared_ptr>(queue,msg));
-                }
-
-                //dont care about any of the other methods:
-                void dequeue(Message::shared_ptr&, const string&, const string 
* const){}
-                void committed(const string * const){}
-                void aborted(const string * const){}
-                void begin(){}
-                void commit(){}
-                void abort(){}        
-                ~TestMessageStore(){}
-        };
-
-        CPPUNIT_TEST_SUITE(TxPublishTest);
-        CPPUNIT_TEST(testPrepare);
-        CPPUNIT_TEST(testCommit);
-        CPPUNIT_TEST_SUITE_END();
-
-
-        TestMessageStore store;
-        Queue::shared_ptr queue1;
-        Queue::shared_ptr queue2;
-        Message::shared_ptr msg;
-        TxPublish op;
-
-
+    class TestMessageStore : public MessageStore
+    {
     public:
-
-        TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)), 
-                          queue2(new Queue("queue2", true, false, &store, 0)), 
-                          msg(new Message(0, "exchange", "routing_key", false, 
false)),
-                          op(msg)
+        vector< pair<string, Message::shared_ptr> > enqueued;
+        
+        void enqueue(Message::shared_ptr& msg, const Queue& queue, const 
string * const /*xid*/)
         {
-            op.deliverTo(queue1);
-            op.deliverTo(queue2);
-        }      
-
-        void testPrepare()
-        {
-            //ensure messages are enqueued in store
-            op.prepare();
-            CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
-            CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
-            CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
-            CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
-            CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
-        }
-
-        void testCommit()
-        {
-            //ensure messages are delivered to queue
-            op.commit();
-            CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
-            CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
-
-            CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
-            CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());            
+            enqueued.push_back(pair<string, 
Message::shared_ptr>(queue.getName(),msg));
         }
+        
+        //don't care about any of the other methods:
+        void create(const Queue&){}
+        void destroy(const Queue&){}
+        void recover(QueueRegistry&){}
+        void dequeue(Message::shared_ptr&, const Queue&, const string * 
const){}
+        void committed(const string * const){}
+        void aborted(const string * const){}
+        void begin(){}
+        void commit(){}
+        void abort(){}        
+        ~TestMessageStore(){}
+    };
+    
+    CPPUNIT_TEST_SUITE(TxPublishTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+    
+    
+    TestMessageStore store;
+    Queue::shared_ptr queue1;
+    Queue::shared_ptr queue2;
+    Message::shared_ptr msg;
+    TxPublish op;
+    
+    
+public:
+    
+    TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)), 
+                      queue2(new Queue("queue2", false, &store, 0)), 
+                      msg(new Message(0, "exchange", "routing_key", false, 
false)),
+                      op(msg)
+    {
+        op.deliverTo(queue1);
+        op.deliverTo(queue2);
+    }      
+
+    void testPrepare()
+    {
+        //ensure messages are enqueued in store
+        op.prepare();
+        CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
+        CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
+        CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+        CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
+        CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+    }
+
+    void testCommit()
+    {
+        //ensure messages are delivered to queue
+        op.commit();
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());            
+    }
 };
 
 // Make this test suite a plugin.


Reply via email to