Author: aconway
Date: Wed Jun  4 09:00:17 2008
New Revision: 663271

URL: http://svn.apache.org/viewvc?rev=663271&view=rev
Log:
Increased default flush interval to 1MB, send spontaneous known-completed at 
the flush interval.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Jun  4 09:00:17 2008
@@ -90,6 +90,10 @@
 }
 
 
+SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSinceKnownCompleted() {}
+
+SessionState::ReceiveState::ReceiveState() {}
+
 SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
 SequenceSet  SessionState::senderGetIncomplete() const { return sender.incomplete; }
 SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
@@ -112,6 +116,7 @@
     stateful = true;
     if (timeout) sender.replayList.push_back(f);
     sender.unflushedSize += f.size();
+    sender.bytesSinceKnownCompleted += f.size();
     sender.replaySize += f.size();
     sender.incomplete += sender.sendPoint.command;
     sender.sendPoint.advance(f);
@@ -129,6 +134,18 @@
     sender.unflushedSize = 0;
 }
 
+bool SessionState::senderNeedKnownCompleted() const {
+    // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0
+    // means never send spontaneous flush, but sends a knownCompleted for
+    // every completed. Need separate configuration?
+    // 
+    return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordKnownCompleted() {
+    sender.bytesSinceKnownCompleted = 0;
+}
+
 void SessionState::senderConfirmed(const SessionPoint& confirmed) {
     if (confirmed > sender.sendPoint)
         throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
@@ -213,7 +230,8 @@
 SessionState::Configuration::Configuration(size_t flush, size_t hard) :
     replayFlushLimit(flush), replayHardLimit(hard) {}
 
-SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful()
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+  : id(i), timeout(), config(c), stateful()
 {
     QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Jun  4 09:00:17 2008
@@ -78,7 +78,7 @@
     typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
 
     struct Configuration {
-        Configuration(size_t flush=0, size_t hard=0);
+        Configuration(size_t flush=1024*1024, size_t hard=0);
         size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables.
         size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables.
     };
@@ -108,6 +108,12 @@
     /** Called when flush for confirmed and completed commands is sent to peer. */
     virtual void senderRecordFlush();
 
+    /** True if we should reply to the next incoming completed command */
+    virtual bool senderNeedKnownCompleted() const;
+
+    /** Called when knownCompleted is sent to peer. */
+    virtual void senderRecordKnownCompleted();
+
     /** Called when the peer confirms up to comfirmed. */
     virtual void senderConfirmed(const SessionPoint& confirmed);
 
@@ -128,7 +134,6 @@
     */
     virtual ReplayRange senderExpected(const SessionPoint& expected);
 
-
     // ==== Functions for receiver state
 
     /** Set the command point. */
@@ -161,7 +166,7 @@
   private:
 
     struct SendState {
-        SendState() : unflushedSize(), replaySize() {}
+        SendState();
         // invariant: replayPoint <= flushPoint <= sendPoint
         SessionPoint replayPoint;   // Can replay from this point
         SessionPoint flushPoint;    // Point of last flush
@@ -170,16 +175,15 @@
         size_t unflushedSize;       // Un-flushed bytes in replay list.
         size_t replaySize;          // Total bytes in replay list.
         SequenceSet incomplete;     // Commands sent and not yet completed.
+        size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted.
     } sender;
 
     struct ReceiveState {
-        ReceiveState() {}
+        ReceiveState();
         SessionPoint expected;  // Expected from here
         SessionPoint received; // Received to here. Invariant: expected <= received.
         SequenceSet unknownCompleted; // Received & completed, may not  not known-complete by peer.
         SequenceSet incomplete;       // Incomplete received commands.
-        int segmentType;
-        
     } receiver;
 
     SessionId id;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Jun  4 09:00:17 2008
@@ -196,11 +196,13 @@
         getState()->senderConfirmed(commands.rangesBegin()->last());
 }
 
-void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) {
+void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
     checkAttached();
     getState()->senderCompleted(commands);
-    if (!commands.empty())
-        peer.knownCompleted(commands);    // Always send a timely reply
+    if (getState()->senderNeedKnownCompleted() || timelyReply) {
+        peer.knownCompleted(commands);
+        getState()->senderRecordKnownCompleted();
+    }
 }
 
 void SessionHandler::knownCompleted(const SequenceSet& commands) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jun  4 09:00:17 2008
@@ -84,7 +84,7 @@
     enableMgmt(1),
     mgmtPubInterval(10),
     auth(AUTH_DEFAULT),
-    replayFlushLimit(64),
+    replayFlushLimit(1024),
     replayHardLimit(0)
 {
     int c = sys::SystemInfo::concurrency();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Wed Jun  4 09:00:17 2008
@@ -260,4 +260,41 @@
     // TODO aconway 2008-04-30: missing tests for known-completed.
 }
 
+QPID_AUTO_TEST_CASE(testNeedKnownCompleted) {
+    size_t flushInterval= 2*(transferFrameSize()+contentFrameSize())+1;
+    qpid::SessionState::Configuration c(flushInterval);
+    qpid::SessionState s(qpid::SessionId(), c);
+    s.senderGetCommandPoint();
+    transfers(s, "a");
+    SequenceSet set(SequenceSet() + 0);
+    s.senderCompleted(set);
+    BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+    transfers(s, "b");
+    set += 1;
+    s.senderCompleted(set);
+    BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+    transfers(s, "c");
+    set += 2;
+    s.senderCompleted(set);
+    BOOST_CHECK(s.senderNeedKnownCompleted());
+    s.senderRecordKnownCompleted();
+    BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+    transfers(s, "de");
+    set += 3;
+    set += 4;
+    s.senderCompleted(set);
+    BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+    transfers(s, "f");
+    set += 2;
+    s.senderCompleted(set);
+    BOOST_CHECK(s.senderNeedKnownCompleted());
+    s.senderRecordKnownCompleted();
+    BOOST_CHECK(!s.senderNeedKnownCompleted());
+}
+
+
 QPID_AUTO_TEST_SUITE_END()


Reply via email to