Author: aconway
Date: Wed Oct 31 09:56:57 2007
New Revision: 590751
URL: http://svn.apache.org/viewvc?rev=590751&view=rev
Log:
Simplify SessionState, preparing for session thread safety fixes.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Oct 31
09:56:57 2007
@@ -38,7 +38,8 @@
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
+ ignoring(false),
+ resuming(false) {}
SessionHandler::~SessionHandler() {}
@@ -114,7 +115,8 @@
assertClosed("resume");
session = connection.broker.getSessionManager().resume(id);
session->attach(*this);
- SequenceNumber seq = session->resuming();
+ resuming=true;
+ SequenceNumber seq = session->sendingAck();
peerSession.attached(session->getId(), session->getTimeout());
proxy.getSession().ack(seq, SequenceNumberSet());
}
@@ -148,7 +150,7 @@
}
void SessionHandler::localSuspend() {
- if (session.get() && session->getState() == SessionState::ATTACHED) {
+ if (session.get()) {
session->detach();
connection.broker.getSessionManager().suspend(session);
}
@@ -166,7 +168,8 @@
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
+ if (resuming) {
+ resuming=false;
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Oct 31
09:56:57 2007
@@ -92,6 +92,7 @@
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
+ bool resuming;
std::auto_ptr<SessionState> session;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Wed Oct 31
09:56:57 2007
@@ -56,7 +56,6 @@
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
- session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
suspended.push_back(session.release()); // In expiry order
eraseExpired();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Oct 31
09:56:57 2007
@@ -52,7 +52,6 @@
break;
case RESUMING:
assert(session);
- assert(session->getState() == SessionState::RESUMING);
assert(code==REPLY_SUCCESS);
assert(connection);
assert(channel.get());
@@ -143,7 +142,6 @@
if (state != CLOSED) {
invariant();
detach(code, text);
- session->suspend();
setState(SUSPENDED);
}
}
@@ -202,7 +200,7 @@
if (state==OPEN)
doSuspend(REPLY_SUCCESS, OK);
check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
- SequenceNumber sendAck=session->resuming();
+ SequenceNumber sendAck=session->sendingAck();
attaching(c);
proxy.resume(getId());
waitFor(OPEN);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp Wed Oct 31
09:56:57 2007
@@ -33,7 +33,6 @@
namespace framing {
SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
- state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -45,7 +44,6 @@
{}
SessionState::SessionState(const Uuid& uuid) :
- state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -65,10 +63,6 @@
boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
if (isSessionCommand(f))
return boost::none;
- if (state==RESUMING)
- throw CommandInvalidException(
- QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
- assert(state = ATTACHED);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
if (ackInterval && lastReceived == sendAckAt)
@@ -85,7 +79,6 @@
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
return ackInterval &&
- (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -97,8 +90,6 @@
}
void SessionState::receivedAck(SequenceNumber acked) {
- if (state==RESUMING) state=ATTACHED;
- assert(state==ATTACHED);
if (lastSent < acked)
throw InvalidArgumentException("Invalid sequence number in ack");
size_t keep = lastSent - acked;
@@ -113,22 +104,10 @@
}
bool SessionState::sendingSolicit() {
- assert(state == ATTACHED);
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
return ackInterval != 0;
-}
-
-SequenceNumber SessionState::resuming() {
- if (!resumable)
- throw InternalErrorException("Session is not resumable");
- state = RESUMING;
- return sendingAck();
-}
-
-void SessionState::suspend() {
- state = SUSPENDED;
}
}} // namespace qpid::framing
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h Wed Oct 31
09:56:57 2007
@@ -35,7 +35,10 @@
/**
* Session state common to client and broker.
- * Stores replay frames, implements session ack/resume protcools.
+ *
+ * Stores data needed to resume a session: replay frames, implements
+ * session ack/resume protocols. Stores handler chains for the session,
+ * handlers may themselves store state.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
@@ -46,13 +49,6 @@
public:
typedef std::vector<AMQFrame> Replay;
- /** States of a session. */
- enum State {
- SUSPENDED, ///< Suspended, detached from any channel.
- RESUMING, ///< Resuming: waiting for initial ack from peer.
- ATTACHED ///< Attached to channel and operating normally.
- };
-
/**
*Create a newly opened active session.
*@param ackInterval send/solicit an ack whenever N unacked frames
@@ -60,7 +56,8 @@
*
* N=0 disables voluntary send/solicit ack.
*/
- SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
+ SessionState(uint32_t ackInterval,
+ const framing::Uuid& id=framing::Uuid(true));
/**
* Create a non-resumable session. Does not store session frames,
@@ -69,7 +66,6 @@
SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
- State getState() const { return state; }
/** Received incoming L3 frame.
* @return SequenceNumber if an ack should be sent, empty otherwise.
@@ -92,13 +88,6 @@
*/
Replay replay();
- /** Suspend the session. */
- void suspend();
-
- /** Start resume protocol for the session.
- *@return sequence number to ack immediately. */
- SequenceNumber resuming();
-
/** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
*
* Note: when received() returns a sequence number this function
@@ -115,9 +104,7 @@
bool sendingSolicit();
- State state;
framing::Uuid id;
-
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Wed Oct 31
09:56:57 2007
@@ -97,21 +97,16 @@
// Replay of all frames.
SessionState session(100);
sent(session, "abc");
- session.suspend(); session.resuming();
session.receivedAck(-1);
BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc");
// Replay with acks
session.receivedAck(0); // ack a.
- session.suspend();
- session.resuming();
session.receivedAck(1); // ack b.
BOOST_CHECK_EQUAL(replayChars(session.replay()), "c");
// Replay after further frames.
sent(session, "def");
- session.suspend();
- session.resuming();
session.receivedAck(3);
BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef");