Author: gsim
Date: Fri Nov 17 03:03:22 2006
New Revision: 476108

URL: http://svn.apache.org/viewvc?view=rev&rev=476108
Log:
Some fixes and tests for bugs uncovered during testing of persistence.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests/tx.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Fri Nov 17 03:03:22 2006
@@ -87,6 +87,8 @@
     for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
         cancel(i);
     }
+    //requeue:
+    recover(true);
 }
 
 void Channel::begin(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Nov 17 03:03:22 2006
@@ -64,7 +64,7 @@
 
 void DeliveryRecord::requeue() const{
     msg->redeliver();
-    queue->deliver(msg);
+    queue->process(msg);
 }
 
 void DeliveryRecord::addTo(Prefetch* const prefetch) const{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Fri Nov 17 03:03:22 2006
@@ -97,6 +97,9 @@
             client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
         }catch(ConnectionException& e){
             client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+        }catch(std::exception& e){
+            string error(e.what());
+            client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
         }
        break;
 
@@ -132,16 +135,20 @@
 }
 
 void SessionHandlerImpl::closed(){
-    for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
-        Channel* c = i->second;
-        channels.erase(i);
-        c->close();
-        delete c;
-    }
-    for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
-        string name = (*i)->getName();
-        queues->destroy(name);
-        exclusiveQueues.erase(i);
+    try {
+        for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+            Channel* c = i->second;
+            channels.erase(i);
+            c->close();
+            delete c;
+        }
+        for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+            string name = (*i)->getName();
+            queues->destroy(name);
+            exclusiveQueues.erase(i);
+        }
+    } catch(std::exception& e) {
+        std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Nov 17 03:03:22 2006
@@ -40,11 +40,13 @@
 void TxBuffer::commit()
 {
     for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+    ops.clear();
 }
 
 void TxBuffer::rollback()
 {
     for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+    ops.clear();
 }
 
 void TxBuffer::enlist(TxOp* const op)

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp Fri Nov 17 03:03:22 2006
@@ -153,6 +153,8 @@
     CPPUNIT_TEST(testPrepareAndCommit);
     CPPUNIT_TEST(testFailOnPrepare);
     CPPUNIT_TEST(testRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterCommit);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -223,6 +225,38 @@
         opA.check();
         opB.check();
         opC.check();
+    }
+
+    void testBufferIsClearedAfterRollback(){
+        MockTxOp opA;
+        opA.expectRollback();
+        MockTxOp opB;
+        opB.expectRollback();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+
+        buffer.rollback();
+        buffer.commit();//second call should not reach ops
+        opA.check();
+        opB.check();
+    }
+
+    void testBufferIsClearedAfterCommit(){
+        MockTxOp opA;
+        opA.expectCommit();
+        MockTxOp opB;
+        opB.expectCommit();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+
+        buffer.commit();
+        buffer.rollback();//second call should not reach ops
+        opA.check();
+        opB.check();
     }
 };
 

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Fri Nov 17 03:03:22 2006
@@ -225,13 +225,13 @@
         self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
 
     def assertChannelException(self, expectedCode, message): 
-        self.assertEqual(message.method.klass.name, "channel")
-        self.assertEqual(message.method.name, "close")
-        self.assertEqual(message.reply_code, expectedCode)
+        self.assertEqual("channel", message.method.klass.name)
+        self.assertEqual("close", message.method.name)
+        self.assertEqual(expectedCode, message.reply_code)
 
 
     def assertConnectionException(self, expectedCode, message): 
-        self.assertEqual(message.method.klass.name, "connection")
-        self.assertEqual(message.method.name, "close")
-        self.assertEqual(message.reply_code, expectedCode)
+        self.assertEqual("connection", message.method.klass.name)
+        self.assertEqual("close", message.method.name)
+        self.assertEqual(expectedCode, message.reply_code)
 

Modified: incubator/qpid/trunk/qpid/python/tests/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/tx.py?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/tx.py Fri Nov 17 03:03:22 2006
@@ -55,6 +55,42 @@
         channel.basic_ack(delivery_tag=0, multiple=True)
         channel.tx_commit()
 
+    def test_auto_rollback(self):
+        """
+        Test that a channel closed with an open transaction is effectively rolled back
+        """
+        channel = self.channel
+        queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        channel.tx_rollback()
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.content.body)
+
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.content.body)
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.content.body)
+            except Empty: None
+
+        #cleanup
+        channel.basic_ack(delivery_tag=0, multiple=True)
+        channel.tx_commit()
+
     def test_rollback(self):
         """
         Test that rolled back publishes are not delivered and rolled back acks are re-delivered
@@ -90,7 +126,7 @@
         #cleanup
         channel.basic_ack(delivery_tag=0, multiple=True)
         channel.tx_commit()
-        
+
     def perform_txn_work(self, channel, name_a, name_b, name_c):
         """
         Utility method that does some setup and some work under a transaction. Used for testing both


Reply via email to