Author: gsim
Date: Fri Nov 17 03:03:22 2006
New Revision: 476108
URL: http://svn.apache.org/viewvc?view=rev&rev=476108
Log:
Some fixes and tests for bugs uncovered during testing of persistence.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests/tx.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Fri Nov 17 03:03:22 2006
@@ -87,6 +87,8 @@
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
cancel(i);
}
+ //requeue:
+ recover(true);
}
void Channel::begin(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Nov 17 03:03:22 2006
@@ -64,7 +64,7 @@
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->deliver(msg);
+ queue->process(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Fri Nov 17 03:03:22 2006
@@ -97,6 +97,9 @@
client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ string error(e.what());
+ client.getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -132,16 +135,20 @@
}
void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
+ try {
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " << e.what() << std::endl;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Nov 17 03:03:22 2006
@@ -40,11 +40,13 @@
void TxBuffer::commit()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
}
void TxBuffer::rollback()
{
for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
}
void TxBuffer::enlist(TxOp* const op)
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxBufferTest.cpp Fri Nov 17 03:03:22 2006
@@ -153,6 +153,8 @@
CPPUNIT_TEST(testPrepareAndCommit);
CPPUNIT_TEST(testFailOnPrepare);
CPPUNIT_TEST(testRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+ CPPUNIT_TEST(testBufferIsClearedAfterCommit);
CPPUNIT_TEST_SUITE_END();
public:
@@ -223,6 +225,38 @@
opA.check();
opB.check();
opC.check();
+ }
+
+ void testBufferIsClearedAfterRollback(){
+ MockTxOp opA;
+ opA.expectRollback();
+ MockTxOp opB;
+ opB.expectRollback();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.rollback();
+ buffer.commit();//second call should not reach ops
+ opA.check();
+ opB.check();
+ }
+
+ void testBufferIsClearedAfterCommit(){
+ MockTxOp opA;
+ opA.expectCommit();
+ MockTxOp opB;
+ opB.expectCommit();
+
+ TxBuffer buffer;
+ buffer.enlist(&opA);
+ buffer.enlist(&opB);
+
+ buffer.commit();
+ buffer.rollback();//second call should not reach ops
+ opA.check();
+ opB.check();
}
};
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Fri Nov 17 03:03:22 2006
@@ -225,13 +225,13 @@
self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
- self.assertEqual(message.method.klass.name, "channel")
- self.assertEqual(message.method.name, "close")
- self.assertEqual(message.reply_code, expectedCode)
+ self.assertEqual("channel", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
def assertConnectionException(self, expectedCode, message):
- self.assertEqual(message.method.klass.name, "connection")
- self.assertEqual(message.method.name, "close")
- self.assertEqual(message.reply_code, expectedCode)
+ self.assertEqual("connection", message.method.klass.name)
+ self.assertEqual("close", message.method.name)
+ self.assertEqual(expectedCode, message.reply_code)
Modified: incubator/qpid/trunk/qpid/python/tests/tx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/tx.py?view=diff&rev=476108&r1=476107&r2=476108
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/tx.py Fri Nov 17 03:03:22 2006
@@ -55,6 +55,42 @@
channel.basic_ack(delivery_tag=0, multiple=True)
channel.tx_commit()
+ def test_auto_rollback(self):
+ """
+ Test that a channel closed with an open transaction is effectively rolled back
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
def test_rollback(self):
"""
Test that rolled back publishes are not delivered and rolled back acks are re-delivered
@@ -90,7 +126,7 @@
#cleanup
channel.basic_ack(delivery_tag=0, multiple=True)
channel.tx_commit()
-
+
def perform_txn_work(self, channel, name_a, name_b, name_c):
"""
Utility method that does some setup and some work under a transaction.
Used for testing both