Author: gsim
Date: Mon Oct 15 11:34:39 2007
New Revision: 584841

URL: http://svn.apache.org/viewvc?rev=584841&view=rev
Log:
Locking for updates to subscriber credit. Revised log statements to reduce 
noise.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=584841&r1=584840&r2=584841&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 15 11:34:39 2007
@@ -255,7 +255,6 @@
 
             DeliveryId deliveryTag =
                 parent->deliveryAdapter.deliver(msg.payload, token);
-            QPID_LOG(debug, "Message delivered for destination " << name);
             if (windowing || ackExpected) {
                 parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
             }
@@ -266,18 +265,23 @@
 
 bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
 {
-    QPID_LOG(debug, "Credit check for destination " << name << " byteCredit: " << byteCredit << " msgCredit: " << msgCredit);
     Mutex::ScopedLock l(lock);
     if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
-        QPID_LOG(debug, "Credit is empty for destination " << name);
+        QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " << byteCredit << " msgs: " << msgCredit);
         return false;
     } else {
+        uint32_t originalMsgCredit = msgCredit;
+        uint32_t originalByteCredit = byteCredit;        
+
         if (msgCredit != 0xFFFFFFFF) {
             msgCredit--;
         }
         if (byteCredit != 0xFFFFFFFF) {
             byteCredit -= msg->getRequiredCredit();
         }
+        QPID_LOG(debug, "Credit available for '" << name 
+                 << "', was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+                 << " now bytes: " << byteCredit << " msgs: " << msgCredit);
         return true;
     }
 }
@@ -388,7 +392,7 @@
 }
 
 void SemanticState::acknowledged(const DeliveryRecord& delivery)
-{
+{    
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
@@ -399,6 +403,7 @@
 void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
 {
     if (windowing) {
+        Mutex::ScopedLock l(lock);
         if (msgCredit != 0xFFFFFFFF) msgCredit++;
         if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
     }
@@ -480,14 +485,12 @@
 void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
     find(destination).addByteCredit(value);
-    QPID_LOG(debug, "Byte Credits Requested for " << destination << ": " << value);   
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
     find(destination).addMessageCredit(value);
-    QPID_LOG(debug, "Message Credit Requested for " << destination << ": " << value);
 }
 
 void SemanticState::flush(const std::string& destination)
@@ -514,13 +517,19 @@
 
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
-    byteCredit += value;
+    {
+        Mutex::ScopedLock l(lock);
+        byteCredit += value;
+    }
     requestDispatch();
 }
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
-    msgCredit += value;
+    {
+        Mutex::ScopedLock l(lock);
+        msgCredit += value;
+    }
     requestDispatch();
 }
 
@@ -535,6 +544,7 @@
 
 void SemanticState::ConsumerImpl::stop()
 {
+    Mutex::ScopedLock l(lock);
     msgCredit = 0;
     byteCredit = 0;
 }


Reply via email to