Author: gsim
Date: Tue Oct  2 02:54:59 2007
New Revision: 581175

URL: http://svn.apache.org/viewvc?rev=581175&view=rev
Log:
Fixed recovery; unacked message records are now updated to hold the new command 
id when messages are resent.
Added unit and python test as previous bug was not being picked up by the 
existing tests.


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Tue Oct  2 
02:54:59 2007
@@ -43,7 +43,6 @@
     {
     public:
         virtual DeliveryId deliver(Message::shared_ptr& msg, 
DeliveryToken::shared_ptr token) = 0;
-        virtual void redeliver(Message::shared_ptr& msg, 
DeliveryToken::shared_ptr token, DeliveryId tag) = 0;
         virtual ~DeliveryAdapter(){}
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Oct  2 
02:54:59 2007
@@ -27,13 +27,15 @@
 using namespace qpid::broker;
 using std::string;
 
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
-                               const string _consumerTag, 
+                               const std::string _tag,
+                               DeliveryToken::shared_ptr _token, 
                                const DeliveryId _id,
                                bool _acquired, bool _confirmed) : msg(_msg), 
                                                                   
queue(_queue), 
-                                                                  
consumerTag(_consumerTag),
+                                                                  tag(_tag),
+                                                                  
token(_token),
                                                                   id(_id),
                                                                   
acquired(_acquired),
                                                                   
confirmed(_confirmed),
@@ -41,11 +43,10 @@
 {
 }
 
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
                                const DeliveryId _id) : msg(_msg), 
                                                                 queue(_queue), 
-                                                                
consumerTag(""),
                                                                 id(_id),
                                                                 acquired(true),
                                                                 
confirmed(false),
@@ -74,13 +75,13 @@
     return range->covers(id);
 }
 
-void DeliveryRecord::redeliver(SemanticState* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) {
     if (!confirmed) {
         if(pull){
             //if message was originally sent as response to get, we must 
requeue it
             requeue();
         }else{
-            session->deliver(msg.payload, consumerTag, id);
+            id = session->redeliver(msg.payload, token);
         }
     }
 }
@@ -151,11 +152,17 @@
 namespace qpid {
 namespace broker {
 
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) 
+{
     out << "{" << "id=" << r.id.getValue();
-    out << ", consumer=" << r.consumerTag;
+    out << ", tag=" << r.tag; // no "}" here: the record is closed after the queue name
     out << ", queue=" << r.queue->getName() << "}";
     return out;
+}
+
+bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
+{
+    return a.id < b.id;
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Tue Oct  2 
02:54:59 2007
@@ -29,6 +29,7 @@
 #include "Queue.h"
 #include "Consumer.h"
 #include "DeliveryId.h"
+#include "DeliveryToken.h"
 #include "Message.h"
 #include "Prefetch.h"
 
@@ -42,16 +43,17 @@
 class DeliveryRecord{
     mutable QueuedMessage msg;
     mutable Queue::shared_ptr queue;
-    const std::string consumerTag;
-    const DeliveryId id;
+    const std::string tag;
+    DeliveryToken::shared_ptr token;
+    DeliveryId id;
     bool acquired;
     const bool confirmed;
     const bool pull;
 
   public:
-    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const 
std::string consumerTag, 
+    DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const 
std::string tag, DeliveryToken::shared_ptr token, 
                    const DeliveryId id, bool acquired, bool confirmed = false);
-    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const 
DeliveryId id);
+    DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const 
DeliveryId id);
             
     void dequeue(TransactionContext* ctxt = 0) const;
     bool matches(DeliveryId tag) const;
@@ -61,17 +63,16 @@
     void requeue() const;
     void release();
     void reject();
-    void redeliver(SemanticState* const) const;
+    void redeliver(SemanticState* const);
     void updateByteCredit(uint32_t& credit) const;
     void addTo(Prefetch&) const;
     void subtractFrom(Prefetch&) const;
-    const std::string& getConsumerTag() const { return consumerTag; } 
+    const std::string& getTag() const { return tag; } 
     bool isPull() const { return pull; }
     bool isAcquired() const { return acquired; }
-    //void setAcquired(bool isAcquired) { acquired = isAcquired; }
     void acquire(std::vector<DeliveryId>& results);
-            
-  friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
+    friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);       
  
+    friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };
 
 typedef std::list<DeliveryRecord>::iterator ack_iterator; 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Oct  
2 02:54:59 2007
@@ -176,12 +176,6 @@
     return outgoing.hwm;
 }
 
-void SemanticHandler::redeliver(Message::shared_ptr& msg, 
DeliveryToken::shared_ptr token, DeliveryId tag)
-{
-    MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
-                             session.getConnection().getFrameMax());
-}
-
 SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
 {
     //will be replaced by field in 0-10 frame header

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Oct  2 
02:54:59 2007
@@ -71,7 +71,6 @@
 
     //delivery adapter methods:
     DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr 
token);
-    void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, 
DeliveryId tag);
 
     framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
     Connection& getConnection() { return session.getConnection(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct  2 
02:54:59 2007
@@ -250,7 +250,7 @@
             DeliveryId deliveryTag =
                 parent->deliveryAdapter.deliver(msg.payload, token);
             if (windowing || ackExpected) {
-                parent->record(DeliveryRecord(msg, queue, name, deliveryTag, 
acquire, !ackExpected));
+                parent->record(DeliveryRecord(msg, queue, name, token, 
deliveryTag, acquire, !ackExpected));
             }
         }
         return !blocked;
@@ -273,11 +273,6 @@
     }
 }
 
-void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, 
DeliveryId deliveryTag) {
-    Mutex::ScopedLock locker(parent->deliveryLock);
-    parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
-}
-
 SemanticState::ConsumerImpl::~ConsumerImpl() {
     cancel();
 }
@@ -384,7 +379,7 @@
 void SemanticState::acknowledged(const DeliveryRecord& delivery)
 {
     delivery.subtractFrom(outstanding);
-    ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+    ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
         i->acknowledged(delivery);
     }
@@ -411,6 +406,10 @@
         for_each(copy.rbegin(), copy.rend(), 
mem_fun_ref(&DeliveryRecord::requeue));
     }else{
         for_each(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));        
+        //unconfirmed messages are redelivered and therefore have their
+        //id adjusted, confirmed messages are not and so the ordering
+        //w.r.t id is lost
+        unacked.sort();
     }
 }
 
@@ -429,13 +428,10 @@
     }
 }
 
-void SemanticState::deliver(Message::shared_ptr& msg, const string& 
consumerTag,
-                      DeliveryId deliveryTag)
+DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, 
DeliveryToken::shared_ptr token)
 {
-    ConsumerImplMap::iterator i = consumers.find(consumerTag);
-    if (i != consumers.end()){
-        i->redeliver(msg, deliveryTag);
-    }
+    Mutex::ScopedLock locker(deliveryLock);
+    return deliveryAdapter.deliver(msg, token);
 }
 
 void SemanticState::flow(bool active)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Oct  2 
02:54:59 2007
@@ -77,7 +77,6 @@
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
         bool deliver(QueuedMessage& msg);            
-        void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
         void cancel();
         void requestDispatch();
 
@@ -169,7 +168,7 @@
     void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
     void recover(bool requeue);
     void flow(bool active);
-    void deliver(Message::shared_ptr& msg, const string& consumerTag, 
DeliveryId deliveryTag);            
+    DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr 
token);            
     void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& 
acquired);
     void release(DeliveryId first, DeliveryId last);
     void reject(DeliveryId first, DeliveryId last);

Added: incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=581175&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Tue Oct  2 
02:54:59 2007
@@ -0,0 +1,68 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::dynamic_pointer_cast;
+using std::list;
+
+class DeliveryRecordTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(DeliveryRecordTest);
+    CPPUNIT_TEST(testSort);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+
+    void testSort()
+    {
+        list<SequenceNumber> ids;
+        ids.push_back(SequenceNumber(6));
+        ids.push_back(SequenceNumber(2));
+        ids.push_back(SequenceNumber(4));
+        ids.push_back(SequenceNumber(5));
+        ids.push_back(SequenceNumber(1));
+        ids.push_back(SequenceNumber(3));
+
+        list<DeliveryRecord> records;
+        for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); 
i++) {
+            records.push_back(DeliveryRecord(QueuedMessage(), 
Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
+        }
+        records.sort();
+
+        SequenceNumber expected(0);
+        for (list<DeliveryRecord>::iterator i = records.begin(); i != 
records.end(); i++) {
+            CPPUNIT_ASSERT(i->matches(++expected));
+        }
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(DeliveryRecordTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Oct  2 02:54:59 2007
@@ -86,6 +86,7 @@
 broker_unit_tests =    \
   AccumulatedAckTest   \
   DtxWorkRecordTest     \
+  DeliveryRecordTest    \
   ExchangeTest         \
   HeadersExchangeTest  \
   MessageTest          \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Tue Oct  2 02:54:59 
2007
@@ -78,7 +78,7 @@
             messages.push_back(msg);
             QueuedMessage qm;
             qm.payload = msg;
-            deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), 
true));
+            deliveries.push_back(DeliveryRecord(qm, queue, "xyz", 
DeliveryToken::shared_ptr(), (i+1), true));
         }
 
         //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=581175&r1=581174&r2=581175&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Tue Oct  2 02:54:59 
2007
@@ -187,6 +187,46 @@
             self.fail("Got unexpected message: " + extra.content.body)
         except Empty: None
 
+
+    def test_recover(self):
+        """
+        Test recover behaviour
+        """
+        channel = self.channel
+        channel.queue_declare(queue="queue-a", exclusive=True)
+        channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+        channel.queue_declare(queue="queue-b", exclusive=True)
+        channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+        
+        self.subscribe(queue="queue-a", destination="unconfirmed", 
confirm_mode=1)
+        self.subscribe(queue="queue-b", destination="confirmed", 
confirm_mode=0)
+        confirmed = self.client.queue("confirmed")
+        unconfirmed = self.client.queue("unconfirmed")
+
+        data = ["One", "Two", "Three", "Four", "Five"]
+        for d in data:
+            channel.message_transfer(destination="amq.fanout", 
content=Content(body=d))
+
+        for q in [confirmed, unconfirmed]:    
+            for d in data:
+                self.assertEqual(d, q.get(timeout=1).content.body)
+            self.assertEmpty(q)        
+
+        channel.message_recover(requeue=False)
+
+        self.assertEmpty(confirmed)
+
+        while len(data):
+            msg = None
+            for d in data:
+                msg = unconfirmed.get(timeout=1)
+                self.assertEqual(d, msg.content.body)
+            self.assertEmpty(unconfirmed)
+            data.remove(msg.content.body)
+            msg.complete(cumulative=False)
+            channel.message_recover(requeue=False)
+        
+
     def test_recover_requeue(self):
         """
         Test requeing on recovery
@@ -551,3 +591,9 @@
 
     def assertDataEquals(self, channel, msg, expected):
         self.assertEquals(expected, msg.content.body)
+
+    def assertEmpty(self, queue):
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Queue not empty, contains: " + extra.content.body)
+        except Empty: None


Reply via email to