Author: aconway
Date: Wed Jun 4 09:00:17 2008
New Revision: 663271
URL: http://svn.apache.org/viewvc?rev=663271&view=rev
Log:
Increased the default flush interval to 1MB; send a spontaneous known-completed
at the flush interval.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Jun 4 09:00:17
2008
@@ -90,6 +90,10 @@
}
+SessionState::SendState::SendState() : unflushedSize(), replaySize(),
bytesSinceKnownCompleted() {}
+
+SessionState::ReceiveState::ReceiveState() {}
+
SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
SequenceSet SessionState::senderGetIncomplete() const { return
sender.incomplete; }
SessionPoint SessionState::senderGetReplayPoint() const { return
sender.replayPoint; }
@@ -112,6 +116,7 @@
stateful = true;
if (timeout) sender.replayList.push_back(f);
sender.unflushedSize += f.size();
+ sender.bytesSinceKnownCompleted += f.size();
sender.replaySize += f.size();
sender.incomplete += sender.sendPoint.command;
sender.sendPoint.advance(f);
@@ -129,6 +134,18 @@
sender.unflushedSize = 0;
}
+bool SessionState::senderNeedKnownCompleted() const {
+ // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0
+ // means never send spontaneous flush, but sends a knownCompleted for
+ // every completed. Need separate configuration?
+ //
+ return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordKnownCompleted() {
+ sender.bytesSinceKnownCompleted = 0;
+}
+
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands
not yet sent."));
@@ -213,7 +230,8 @@
SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
-SessionState::SessionState(const SessionId& i, const Configuration& c) :
id(i), timeout(), config(c), stateful()
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+ : id(i), timeout(), config(c), stateful()
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Jun 4 09:00:17
2008
@@ -78,7 +78,7 @@
typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
struct Configuration {
- Configuration(size_t flush=0, size_t hard=0);
+ Configuration(size_t flush=1024*1024, size_t hard=0);
size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0
disables.
size_t replayHardLimit; // Kill session if replay list > N bytes. 0
disables.
};
@@ -108,6 +108,12 @@
/** Called when flush for confirmed and completed commands is sent to
peer. */
virtual void senderRecordFlush();
+ /** True if we should reply to the next incoming completed command */
+ virtual bool senderNeedKnownCompleted() const;
+
+ /** Called when knownCompleted is sent to peer. */
+ virtual void senderRecordKnownCompleted();
+
/** Called when the peer confirms up to confirmed. */
virtual void senderConfirmed(const SessionPoint& confirmed);
@@ -128,7 +134,6 @@
*/
virtual ReplayRange senderExpected(const SessionPoint& expected);
-
// ==== Functions for receiver state
/** Set the command point. */
@@ -161,7 +166,7 @@
private:
struct SendState {
- SendState() : unflushedSize(), replaySize() {}
+ SendState();
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
SessionPoint flushPoint; // Point of last flush
@@ -170,16 +175,15 @@
size_t unflushedSize; // Un-flushed bytes in replay list.
size_t replaySize; // Total bytes in replay list.
SequenceSet incomplete; // Commands sent and not yet completed.
+ size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a
knownCompleted.
} sender;
struct ReceiveState {
- ReceiveState() {}
+ ReceiveState();
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <=
received.
SequenceSet unknownCompleted; // Received & completed, may not be
known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
- int segmentType;
-
} receiver;
SessionId id;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Jun
4 09:00:17 2008
@@ -196,11 +196,13 @@
getState()->senderConfirmed(commands.rangesBegin()->last());
}
-void SessionHandler::completed(const SequenceSet& commands, bool
/*timelyReply*/) {
+void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
checkAttached();
getState()->senderCompleted(commands);
- if (!commands.empty())
- peer.knownCompleted(commands); // Always send a timely reply
+ if (getState()->senderNeedKnownCompleted() || timelyReply) {
+ peer.knownCompleted(commands);
+ getState()->senderRecordKnownCompleted();
+ }
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jun 4
09:00:17 2008
@@ -84,7 +84,7 @@
enableMgmt(1),
mgmtPubInterval(10),
auth(AUTH_DEFAULT),
- replayFlushLimit(64),
+ replayFlushLimit(1024),
replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=663271&r1=663270&r2=663271&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Wed Jun 4
09:00:17 2008
@@ -260,4 +260,41 @@
// TODO aconway 2008-04-30: missing tests for known-completed.
}
+QPID_AUTO_TEST_CASE(testNeedKnownCompleted) {
+ size_t flushInterval= 2*(transferFrameSize()+contentFrameSize())+1;
+ qpid::SessionState::Configuration c(flushInterval);
+ qpid::SessionState s(qpid::SessionId(), c);
+ s.senderGetCommandPoint();
+ transfers(s, "a");
+ SequenceSet set(SequenceSet() + 0);
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "b");
+ set += 1;
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "c");
+ set += 2;
+ s.senderCompleted(set);
+ BOOST_CHECK(s.senderNeedKnownCompleted());
+ s.senderRecordKnownCompleted();
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "de");
+ set += 3;
+ set += 4;
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "f");
+ set += 2;
+ s.senderCompleted(set);
+ BOOST_CHECK(s.senderNeedKnownCompleted());
+ s.senderRecordKnownCompleted();
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+}
+
+
QPID_AUTO_TEST_SUITE_END()