Author: gsim
Date: Wed Oct 15 03:03:49 2008
New Revision: 704838

URL: http://svn.apache.org/viewvc?rev=704838&view=rev
Log:
c++ broker: Don't hold on to delivery records for accepted/released messages 
unless required due to being in windowing mode.
python client: Modified start() on incoming queue to setthe flow mode as credit 
(not windowing)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
    incubator/qpid/trunk/qpid/python/qpid/session.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Oct 15 
03:03:49 2008
@@ -32,36 +32,24 @@
                                const std::string _tag,
                                DeliveryToken::shared_ptr _token, 
                                const DeliveryId _id,
-                               bool _acquired, bool accepted) : msg(_msg), 
-                                                                queue(_queue), 
-                                                                tag(_tag),
-                                                                token(_token),
-                                                                id(_id),
-                                                                
acquired(_acquired),
-                                                                pull(false), 
-                                                                
cancelled(false),
-                                                                
credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
-                                                                
size(msg.payload ? msg.payload->contentSize() : 0),
-                                                                
completed(false),
-                                                                ended(accepted)
+                               bool _acquired, bool accepted, 
+                               bool _windowing) : msg(_msg), 
+                                                  queue(_queue), 
+                                                  tag(_tag),
+                                                  token(_token),
+                                                  id(_id),
+                                                  acquired(_acquired),
+                                                  pull(false), 
+                                                  cancelled(false),
+                                                  credit(msg.payload ? 
msg.payload->getRequiredCredit() : 0),
+                                                  size(msg.payload ? 
msg.payload->contentSize() : 0),
+                                                  completed(false),
+                                                  ended(accepted),
+                                                  windowing(_windowing)
 {
     if (accepted) setEnded();
 }
 
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
-                               Queue::shared_ptr _queue, 
-                               const DeliveryId _id) : msg(_msg), 
-                                                       queue(_queue), 
-                                                       id(_id),
-                                                       acquired(true),
-                                                       pull(true),
-                                                       cancelled(false),
-                                                       credit(msg.payload ? 
msg.payload->getRequiredCredit() : 0),
-                                                       size(msg.payload ? 
msg.payload->contentSize() : 0),
-                                                       completed(false),
-                                                       ended(false)
-{}
-
 void DeliveryRecord::setEnded()
 {
     ended = true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Oct 15 
03:03:49 2008
@@ -53,12 +53,11 @@
 
     bool completed;
     bool ended;
+    const bool windowing;
 
   public:
     DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const 
std::string tag, DeliveryToken::shared_ptr token, 
-                   const DeliveryId id, bool acquired, bool confirmed = false);
-    DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const 
DeliveryId id);
-            
+                   const DeliveryId id, bool acquired, bool confirmed, bool 
windowing);
     bool matches(DeliveryId tag) const;
     bool matchOrAfter(DeliveryId tag) const;
     bool after(DeliveryId tag) const;
@@ -77,7 +76,7 @@
 
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }
-    bool isRedundant() const { return ended && completed; }
+    bool isRedundant() const { return ended && (!windowing || completed); }
 
     uint32_t getCredit() const;
     const std::string& getTag() const { return tag; } 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Oct 15 
03:03:49 2008
@@ -265,7 +265,7 @@
     DeliveryId deliveryTag =
         parent->deliveryAdapter.deliver(msg, token);
     if (windowing || ackExpected || !acquire) {
-        parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, 
acquire, !ackExpected));
+        parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, 
acquire, !ackExpected, windowing));
     } 
     if (acquire && !ackExpected) {
         queue->dequeue(0, msg);
@@ -444,20 +444,6 @@
     }
 }
 
-bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr 
queue, bool ackExpected)
-{
-    QueuedMessage msg = queue->get();
-    if(msg.payload){
-        DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
-        if(ackExpected){
-            unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
-        }
-        return true;
-    }else{
-        return false;
-    }
-}
-
 DeliveryId SemanticState::redeliver(QueuedMessage& msg, 
DeliveryToken::shared_ptr token)
 {
     return deliveryAdapter.deliver(msg, token);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Oct 15 
03:03:49 2008
@@ -179,7 +179,6 @@
     void flush(const std::string& destination);
     void stop(const std::string& destination);
 
-    bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool 
ackExpected);
     void startTx();
     void commit(MessageStore* const store);
     void rollback();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Wed Oct 15 
03:03:49 2008
@@ -45,7 +45,7 @@
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        records.push_back(DeliveryRecord(QueuedMessage(0), 
Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
+        records.push_back(DeliveryRecord(QueuedMessage(0), 
Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false, 
false));
     }
     records.sort();
 

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=704838&r1=704837&r2=704838&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Wed Oct 15 03:03:49 2008
@@ -344,6 +344,7 @@
     self.destination = destination
 
   def start(self):
+    self.session.message_set_flow_mode(self.destination, 
self.session.flow_mode.credit)
     for unit in self.session.credit_unit.values():
       self.session.message_flow(self.destination, unit, 0xFFFFFFFF)
 


Reply via email to