Author: aconway
Date: Fri Jun 13 10:36:23 2008
New Revision: 667603
URL: http://svn.apache.org/viewvc?rev=667603&view=rev
Log:
Fix for broker wraparound problem.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Fri Jun 13 10:36:23 2008
@@ -124,18 +124,20 @@
throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
}
+static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536;
+
bool SessionState::senderNeedFlush() const {
- return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+ return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit);
}
void SessionState::senderRecordFlush() {
- assert(sender.flushPoint <= sender.sendPoint);
sender.flushPoint = sender.sendPoint;
sender.unflushedSize = 0;
}
bool SessionState::senderNeedKnownCompleted() const {
- return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+ return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
}
void SessionState::senderRecordKnownCompleted() {
@@ -214,7 +216,8 @@
}
bool SessionState::receiverNeedKnownCompleted() const {
- return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+ return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit);
}
const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Fri Jun 13 10:36:23 2008
@@ -75,6 +75,8 @@
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
return; // Ignore duplicates.
+ if (getState()->receiverNeedKnownCompleted())
+ sendCompletion();
getInHandler()->handle(f);
}
}
@@ -94,13 +96,22 @@
}
}
+namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+}
+} // namespace
+
void SessionHandler::handleOut(AMQFrame& f) {
checkAttached();
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
getState()->senderRecord(f);
- if (getState()->senderNeedFlush()) {
- peer.flush(false, true, true);
+ if (isCommand(f) && getState()->senderNeedFlush()) {
+ peer.flush(false, false, true);
getState()->senderRecordFlush();
}
channel.handle(f);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jun 13 10:36:23 2008
@@ -85,7 +85,7 @@
mgmtPubInterval(10),
auth(AUTH_DEFAULT),
realm("QPID"),
- replayFlushLimit(1024),
+ replayFlushLimit(0),
replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
@@ -109,9 +109,7 @@
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
"Management Publish Interval")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
- ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
- ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.")
- ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");
+ ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication");
}
const std::string empty;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Fri Jun 13 10:36:23 2008
@@ -26,7 +26,7 @@
using qpid::framing::SequenceNumber;
using qpid::framing::Buffer;
-SequenceNumber::SequenceNumber() : value(0 - 1) {}
+SequenceNumber::SequenceNumber() : value(0) {}
SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jun 13 10:36:23 2008
@@ -48,7 +48,8 @@
OstreamOutput(std::ostream& o) : out(&o) {}
OstreamOutput(const string& file)
- : out(new ofstream(file.c_str())), mine(out)
+ : out(new ofstream(file.c_str(), ios_base::out | ios_base::app)),
+ mine(out)
{
if (!out->good())
throw std::runtime_error("Can't open log file: "+file);