Author: aconway
Date: Fri Jun 13 10:36:23 2008
New Revision: 667603

URL: http://svn.apache.org/viewvc?rev=667603&view=rev
Log:
Fix for broker wraparound problem.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Fri Jun 13 10:36:23 2008
@@ -124,18 +124,20 @@
         throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
 }
 
+static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536; 
+
 bool SessionState::senderNeedFlush() const {
-    return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+    return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+        (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit);
 }
 
 void SessionState::senderRecordFlush() {
-    assert(sender.flushPoint <= sender.sendPoint);
     sender.flushPoint = sender.sendPoint;
     sender.unflushedSize = 0;
 }
 
 bool SessionState::senderNeedKnownCompleted() const {
-    return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+    return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
 }
 
 void SessionState::senderRecordKnownCompleted() {
@@ -214,7 +216,8 @@
 }
 
 bool SessionState::receiverNeedKnownCompleted() const {
-    return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+    return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+        (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit);
 }
         
 const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Fri Jun 13 10:36:23 2008
@@ -75,6 +75,8 @@
                 throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
             if (!getState()->receiverRecord(f))
                 return; // Ignore duplicates.
+            if (getState()->receiverNeedKnownCompleted())
+                sendCompletion();
             getInHandler()->handle(f);
         }
     }
@@ -94,13 +96,22 @@
     }
 }
 
+namespace {
+bool isControl(const AMQFrame& f) {
+    return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+    return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+}
+} // namespace
+
 void SessionHandler::handleOut(AMQFrame& f) {
     checkAttached();
     if (!sendReady)
         throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
     getState()->senderRecord(f); 
-    if (getState()->senderNeedFlush()) {
-        peer.flush(false, true, true); 
+    if (isCommand(f) && getState()->senderNeedFlush()) {
+        peer.flush(false, false, true); 
         getState()->senderRecordFlush();
     }
     channel.handle(f);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jun 13 10:36:23 2008
@@ -85,7 +85,7 @@
     mgmtPubInterval(10),
     auth(AUTH_DEFAULT),
     realm("QPID"),
-    replayFlushLimit(1024),
+    replayFlushLimit(0),
     replayHardLimit(0)
 {
     int c = sys::SystemInfo::concurrency();
@@ -109,9 +109,7 @@
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
-        ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")         
-        ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.")
-        ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");
+        ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication");
 }
 
 const std::string empty;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Fri Jun 13 10:36:23 2008
@@ -26,7 +26,7 @@
 using qpid::framing::SequenceNumber;
 using qpid::framing::Buffer;
 
-SequenceNumber::SequenceNumber() : value(0 - 1) {}
+SequenceNumber::SequenceNumber() : value(0) {}
 
 SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=667603&r1=667602&r2=667603&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jun 13 10:36:23 2008
@@ -48,7 +48,8 @@
     OstreamOutput(std::ostream& o) : out(&o) {}
 
     OstreamOutput(const string& file)
-        : out(new ofstream(file.c_str())), mine(out)
+        : out(new ofstream(file.c_str(), ios_base::out | ios_base::app)),
+          mine(out)
     {
         if (!out->good())
             throw std::runtime_error("Can't open log file: "+file);


Reply via email to