Author: aconway
Date: Mon Oct 29 09:50:45 2007
New Revision: 589731
URL: http://svn.apache.org/viewvc?rev=589731&view=rev
Log:
##-*-text-*-
Added qpidd --ack option to set ack/solicit-ack interval. 0 disabled acks.
Sessions with 0 timeout never ack and don't store replay frames.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 29
09:50:45 2007
@@ -65,7 +65,7 @@
storeAsync(false),
enableMgmt(0),
mgmtPubInterval(10),
- ack(100)
+ ack(100)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -87,7 +87,9 @@
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
- "Management Publish Interval");
+ "Management Publish Interval")
+ ("ack", optValue(ack, "N"),
+ "Send ack/solicit-ack at least every N frames. 0 disables voluntary
acks/solitict-ack");
}
const std::string empty;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp Mon Oct 29
09:50:45 2007
@@ -40,11 +40,22 @@
ackInterval(ack),
sendAckAt(lastReceived+ackInterval),
solicitAckAt(lastSent+ackInterval),
- ackSolicited(false)
+ ackSolicited(false),
+ resumable(true)
+{}
+
+SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(0),
+ sendAckAt(0),
+ solicitAckAt(0),
+ ackSolicited(false),
+ resumable(false)
{
- assert(ackInterval > 0);
}
-
namespace {
bool isSessionCommand(const AMQFrame& f) {
return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
@@ -58,10 +69,9 @@
throw CommandInvalidException(
QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
assert(state = ATTACHED);
- assert(lastReceived<sendAckAt);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
- if (lastReceived == sendAckAt)
+ if (ackInterval && lastReceived == sendAckAt)
return sendingAck();
else
return boost::none;
@@ -70,10 +80,12 @@
bool SessionState::sent(const AMQFrame& f) {
if (isSessionCommand(f))
return false;
- unackedOut.push_back(f);
+ if (resumable)
+ unackedOut.push_back(f);
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
- return (state!=RESUMING) &&
+ return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -105,10 +117,12 @@
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
- return true;
+ return ackInterval != 0;
}
SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
state = RESUMING;
return sendingAck();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h Mon Oct 29
09:50:45 2007
@@ -35,12 +35,11 @@
/**
* Session state common to client and broker.
- * Implements session ack/resume protcools.
+ * Stores replay frames, implements session ack/resume protcools.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
*
- * A template to make it protocol independent and easy to test.
*/
class SessionState
{
@@ -58,9 +57,16 @@
*Create a newly opened active session.
[EMAIL PROTECTED] ackInterval send/solicit an ack whenever N unacked
frames
* have been received/sent.
- [EMAIL PROTECTED] ackInterval > 0
+ *
+ * N=0 disables voluntary send/solict ack.
+ */
+ SessionState(uint32_t ackInterval, const framing::Uuid&
id=framing::Uuid(true));
+
+ /**
+ * Create a non-resumable session. Does not store session frames,
+ * never volunteers ack or solicit-ack.
*/
- SessionState(uint32_t ackInterval=1, const framing::Uuid&
id=framing::Uuid(true));
+ SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
State getState() const { return state; }
@@ -103,6 +109,7 @@
SequenceNumber getLastSent() const { return lastSent; }
SequenceNumber getLastReceived() const { return lastReceived; }
+
private:
typedef std::deque<AMQFrame> Unacked;
@@ -110,6 +117,7 @@
State state;
framing::Uuid id;
+
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;
@@ -118,6 +126,7 @@
SequenceNumber solicitAckAt;
bool ackSolicited;
bool suspending;
+ bool resumable;
};