Author: gsim
Date: Mon Dec  1 11:49:23 2008
New Revision: 722200

URL: http://svn.apache.org/viewvc?rev=722200&view=rev
Log:
QPID-1497: Ensure policy count and size reflect transactionality of dequeues


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Dec  1 11:49:23 2008
@@ -139,6 +139,10 @@
     }
 }
 
+void DeliveryRecord::committed() const{
+    queue->dequeueCommitted(msg);
+}
+
 void DeliveryRecord::reject() 
 {    
     Exchange::shared_ptr alternate = queue->getAlternateExchange();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Dec  1 11:49:23 2008
@@ -99,6 +99,7 @@
     void complete();
     void accept(TransactionContext* ctxt);
     void setEnded();
+    void committed() const;
 
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Mon Dec  1 11:49:23 2008
@@ -48,6 +48,7 @@
 
 void DtxAck::commit() throw()
 {
+    for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed));
     pending.clear();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Mon Dec  1 11:49:23 2008
@@ -32,6 +32,8 @@
 
 const std::string nullxid = "";
 
+class SimpleDummyCtxt : public TransactionContext {};
+
 class DummyCtxt : public TPCTransactionContext 
 {
     const std::string xid;
@@ -112,7 +114,7 @@
 
 std::auto_ptr<TransactionContext> NullMessageStore::begin()
 {
-    return std::auto_ptr<TransactionContext>();
+    return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt());
 }
 
 std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string& xid)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec  1 11:49:23 2008
@@ -609,7 +609,9 @@
     if (policy.get() && !policy->isEnqueued(msg)) return false;
     {
         Mutex::ScopedLock locker(messageLock);
-        dequeued(msg);
+        if (!ctxt) { 
+            dequeued(msg);
+        }
     }
     if (msg.payload->isPersistent() && store && !lastValueQueue) {
         msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
@@ -620,6 +622,12 @@
     return false;
 }
 
+void Queue::dequeueCommitted(const QueuedMessage& msg)
+{
+    Mutex::ScopedLock locker(messageLock);
+    dequeued(msg);    
+}
+
 /**
  * Removes a message from the in-memory delivery queue as well
  * dequeing it from the logical (and persistent if applicable) queue

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Dec  1 11:49:23 2008
@@ -222,6 +222,11 @@
              * dequeue from store (only done once messages is acknowledged)
              */
             bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+            /**
+             * Inform the queue that a previous transactional dequeue
+             * committed.
+             */
+            void dequeueCommitted(const QueuedMessage& msg);
 
             /**
              * Gets the next available message 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Mon Dec  1 11:49:23 2008
@@ -46,18 +46,12 @@
         if (count.get() > 0) {
             --count;
         } else {
-            //Temporarily disabling checking as it causes java dtx test to fail
-            //TODO: renable these once all tests are passing again
-            //throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
-            QPID_LOG(error, "Attempted count underflow on dequeue(" << _size << "): " << *this);
+            throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
         }
     }
     if (maxSize) {
         if (_size > size.get()) {
-            //Temporarily disabling checking as it causes java dtx test to fail
-            //TODO: renable these once all tests are passing again
-            //throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
-            QPID_LOG(error, "Attempted size underflow on dequeue(" << _size << "): " << *this);
+            throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
         } else {
             size -= _size;
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Mon Dec  1 11:49:23 2008
@@ -37,6 +37,7 @@
 
 void TxAccept::RangeOp::commit()
 {
+    for_each(range.start, range.end, bind(&DeliveryRecord::committed, _1));
     for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=722200&r1=722199&r2=722200&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Dec  1 11:49:23 2008
@@ -185,5 +185,62 @@
     } catch (const ResourceLimitExceededException&) {}    
 }
 
+QPID_AUTO_TEST_CASE(testPolicyWithDtx) 
+{
+    FieldTable args;
+    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::REJECT);
+    policy->update(args);
+
+    ProxySessionFixture f;
+    std::string q("my-policy-queue");
+    f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+    LocalQueue incoming;
+    SubscriptionSettings settings(FlowControl::unlimited());
+    settings.autoAck = 0; // no auto ack.
+    Subscription sub = f.subs.subscribe(incoming, q, settings); 
+    f.session.dtxSelect();
+    Xid tx1(1, "test-dtx-mgr", "tx1");
+    f.session.dtxStart(arg::xid=tx1);
+    for (int i = 0; i < 5; i++) {
+        f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+    }
+    f.session.dtxEnd(arg::xid=tx1);
+    f.session.dtxCommit(arg::xid=tx1, arg::onePhase=true);
+
+    Xid tx2(1, "test-dtx-mgr", "tx2");
+    f.session.dtxStart(arg::xid=tx2);
+    for (int i = 0; i < 5; i++) {
+        BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+    }
+    SequenceSet accepting=sub.getUnaccepted();
+    f.session.messageAccept(accepting);
+    f.session.dtxEnd(arg::xid=tx2);
+    f.session.dtxPrepare(arg::xid=tx2);
+    f.session.dtxRollback(arg::xid=tx2);
+    f.session.messageRelease(accepting);
+
+    Xid tx3(1, "test-dtx-mgr", "tx3");
+    f.session.dtxStart(arg::xid=tx3);
+    for (int i = 0; i < 5; i++) {
+        incoming.pop();
+    }
+    accepting=sub.getUnaccepted();
+    f.session.messageAccept(accepting);
+    f.session.dtxEnd(arg::xid=tx3);
+    f.session.dtxPrepare(arg::xid=tx3);
+
+    Session other = f.connection.newSession();
+    try {
+        ScopedSuppressLogging sl; // Suppress messages for expected errors.
+        other.messageTransfer(arg::content=client::Message("Message_6", q));
+        BOOST_FAIL("expecting ResourceLimitExceededException.");
+    } catch (const ResourceLimitExceededException&) {}    
+
+    f.session.dtxCommit(arg::xid=tx3);
+    //now retry and this time should succeed
+    other = f.connection.newSession();
+    other.messageTransfer(arg::content=client::Message("Message_6", q));
+}
+
 
 QPID_AUTO_TEST_SUITE_END()


Reply via email to