Author: gsim
Date: Mon Oct 15 11:34:39 2007
New Revision: 584841
URL: http://svn.apache.org/viewvc?rev=584841&view=rev
Log:
Locking for updates to subscriber credit. Revised log statements to reduce
noise.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=584841&r1=584840&r2=584841&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 15
11:34:39 2007
@@ -255,7 +255,6 @@
DeliveryId deliveryTag =
parent->deliveryAdapter.deliver(msg.payload, token);
- QPID_LOG(debug, "Message delivered for destination " << name);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, token,
deliveryTag, acquire, !ackExpected));
}
@@ -266,18 +265,23 @@
bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
{
- QPID_LOG(debug, "Credit check for destination " << name << " byteCredit: "
<< byteCredit << " msgCredit: " << msgCredit);
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit <
msg->getRequiredCredit())) {
- QPID_LOG(debug, "Credit is empty for destination " << name);
+ QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " <<
byteCredit << " msgs: " << msgCredit);
return false;
} else {
+ uint32_t originalMsgCredit = msgCredit;
+ uint32_t originalByteCredit = byteCredit;
+
if (msgCredit != 0xFFFFFFFF) {
msgCredit--;
}
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
+ QPID_LOG(debug, "Credit available for '" << name
+ << "', was " << " bytes: " << originalByteCredit << " msgs: "
<< originalMsgCredit
+ << " now bytes: " << byteCredit << " msgs: " << msgCredit);
return true;
}
}
@@ -388,7 +392,7 @@
}
void SemanticState::acknowledged(const DeliveryRecord& delivery)
-{
+{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
@@ -399,6 +403,7 @@
void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
{
if (windowing) {
+ Mutex::ScopedLock l(lock);
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
}
@@ -480,14 +485,12 @@
void SemanticState::addByteCredit(const std::string& destination, uint32_t
value)
{
find(destination).addByteCredit(value);
- QPID_LOG(debug, "Byte Credits Requested for " << destination << ": " <<
value);
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t
value)
{
find(destination).addMessageCredit(value);
- QPID_LOG(debug, "Message Credit Requested for " << destination << ": " <<
value);
}
void SemanticState::flush(const std::string& destination)
@@ -514,13 +517,19 @@
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
- byteCredit += value;
+ {
+ Mutex::ScopedLock l(lock);
+ byteCredit += value;
+ }
requestDispatch();
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
- msgCredit += value;
+ {
+ Mutex::ScopedLock l(lock);
+ msgCredit += value;
+ }
requestDispatch();
}
@@ -535,6 +544,7 @@
void SemanticState::ConsumerImpl::stop()
{
+ Mutex::ScopedLock l(lock);
msgCredit = 0;
byteCredit = 0;
}