Author: aconway
Date: Sun Apr 27 11:32:26 2008
New Revision: 651997

URL: http://svn.apache.org/viewvc?rev=651997&view=rev
Log:
Session state as per AMQP 0-10 specification.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
    incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=651997&r1=651996&r2=651997&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sun Apr 27 11:32:26 2008
@@ -138,6 +138,8 @@
   $(rgen_framing_srcs) \
   $(platform_src) \
   qpid/Serializer.h \
+  qpid/SessionState.cpp \
+  qpid/SessionState.h \
   qpid/framing/AccumulatedAck.cpp \
   qpid/framing/AMQBody.cpp \
   qpid/framing/AMQMethodBody.cpp \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=651997&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Sun Apr 27 11:32:26 
2008
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// FIXME aconway 2008-04-24: Reminders for handler implementation.
+//
+// - execution.sync results must be communicated to 
SessionState::peerConfirmed.
+//
+// 
+
+#include "SessionState.h"
+#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include <boost/bind.hpp>
+#include <numeric>
+
+namespace qpid {
+using framing::AMQFrame;
+using amqp_0_10::NotImplementedException;
+
+/** A point in the session - command id + offset */
+void SessionPoint::advance(const AMQFrame& f) {
+    if (f.isLastSegment() && f.isLastFrame()) {
+        ++command;
+        offset = 0;
+    }
+    else {
+        // TODO aconway 2008-04-24: if we go to support for partial
+        // command replay, then it may be better to record the unframed
+        // data size in a command point rather than the framed size so
+        // that the relationship of fragment offsets to the replay
+        // list can be computed more easily.
+        // 
+        offset += f.size();
+    }
+}
+
+bool SessionPoint::operator<(const SessionPoint& x) const {
+    return command < x.command || (command == x.command && offset < x.offset);
+}
+
+bool SessionPoint::operator==(const SessionPoint& x) const {
+    return command == x.command && offset == x.offset;
+}
+
+SendState::SendState(size_t syncSize, size_t killSize)
+    : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {}
+
+void SendState::send(const AMQFrame& f) {
+    if (f.getMethod() && f.getMethod()->type() == 0)         
+        return;                 // Don't replay control frames.
+    replayList.push_back(f);
+    unflushedSize += f.size();
+    sendPoint.advance(f);
+}
+
+bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; }
+
+void SendState::sendFlush() {
+    assert(flushPoint <= sendPoint);
+    flushPoint = sendPoint;
+    unflushedSize = 0;
+}
+
+void SendState::peerConfirmed(const SessionPoint& confirmed) {
+    ReplayList::iterator i = replayList.begin();
+    // Ignore peerConfirmed.offset, we don't support partial replay.
+    while (i != replayList.end() && replayPoint.command < confirmed.command) {
+        assert(replayPoint <= flushPoint);
+        replayPoint.advance(*i);
+        assert(replayPoint <= sendPoint);
+        if (replayPoint > flushPoint) {
+            flushPoint.advance(*i);
+            assert(replayPoint <= flushPoint);
+            unflushedSize -= i->size();
+        }
+        ++i;
+    }
+    replayList.erase(replayList.begin(), i);
+    assert(replayPoint.offset == 0);
+}
+
+void SendState::peerCompleted(const SequenceSet& commands) {
+    if (commands.empty()) return;
+    sentCompleted += commands;
+    // Completion implies confirmation but we don't handle out-of-order
+    // confirmation, so confirm only the first contiguous range of commands.
+    peerConfirmed(SessionPoint(commands.rangesBegin()->end()));
+}
+
+bool ReceiveState::hasState() { return stateful; }
+
+void ReceiveState::setExpecting(const SessionPoint& point) {
+    if (!hasState())          // initializing a new session.
+        expecting = received = point;
+    else {                  // setting point in an existing session.
+        if (point > received)
+            throw NotImplementedException("command-point out of bounds.");
+        expecting = point;
+    }
+}
+
+ReceiveState::ReceiveState() : stateful() {}
+
+bool ReceiveState::receive(const AMQFrame& f) {
+    stateful = true;
+    expecting.advance(f);
+    if (expecting > received) {
+        received = expecting;
+        return true;
+    }
+    return false;
+}
+
+void ReceiveState::localCompleted(SequenceNumber command) {
+    assert(command < received.command); // Can't complete what we haven't 
received.
+    receivedCompleted += command;
+}
+    
+void ReceiveState::peerKnownComplete(const SequenceSet& commands) {
+    receivedCompleted -= commands;
+}
+
+SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), 
name(n) {}
+
+bool SessionId::operator<(const SessionId& id) const {
+    return userId < id.userId || (userId == id.userId && name < id.name);
+}
+
+bool SessionId::operator==(const SessionId& id) const {
+    return id.name == name  && id.userId == userId;
+}
+
+SessionState::Configuration::Configuration()
+    : replaySyncSize(std::numeric_limits<size_t>::max()),
+      replayKillSize(std::numeric_limits<size_t>::max()) {}
+
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+    : SendState(c.replaySyncSize, c.replayKillSize),
+      id(i), timeout(), config(c) {}
+
+void SessionState::clear() { *this = SessionState(id, config); }
+
+std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
+    return o << "(" << p.command.getValue() << "+" << p.offset << ")";
+}
+
+} // namespace qpid

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=651997&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Sun Apr 27 11:32:26 
2008
@@ -0,0 +1,188 @@
+#ifndef QPID_SESSIONSTATE_H
+#define QPID_SESSIONSTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/SequenceNumber.h>
+#include <qpid/framing/SequenceSet.h>
+#include <qpid/framing/AMQFrame.h>
+#include <boost/operators.hpp>
+#include <vector>
+#include <iosfwd>
+
+namespace qpid {
+using framing::SequenceNumber;
+using framing::SequenceSet;
+
+/** A point in the session. Points to command id + offset */
+struct SessionPoint : boost::totally_ordered1<SessionPoint> {
+    SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : 
command(command_), offset(offset_) {}
+
+    SequenceNumber command;
+    uint64_t offset;
+
+    /** Advance past frame f */
+    void advance(const framing::AMQFrame& f);
+
+    bool operator<(const SessionPoint&) const;
+    bool operator==(const SessionPoint&) const;
+};
+
+std::ostream& operator<<(std::ostream&, const SessionPoint&);
+
+/** The sending half of session state */
+class SendState {
+  public:
+    typedef std::vector<framing::AMQFrame> ReplayList;
+
+    /** Record frame f for replay. Should not be called during replay. */
+    void send(const framing::AMQFrame& f);
+
+    /** @return true if we should send flush for confirmed and completed 
commands. */
+    bool needFlush() const;
+
+    /** Called when flush for confirmed and completed commands is sent to 
peer. */
+    void sendFlush();
+
+    /** Called when the peer confirms up to commands. */
+    void peerConfirmed(const SessionPoint& confirmed);
+
+    /** Called when the peer indicates commands completed */
+    void peerCompleted(const SequenceSet& commands);
+
+    /** Get the replay list. @see getReplayPoint. */
+    const ReplayList& getReplayList() const { return replayList; }
+
+    /**
+     * The replay point is the point up to which all data has been
+     * confirmed.  Partial replay is not supported, it will always
+     * have offset==0.
+     */
+    const SessionPoint& getReplayPoint() const { return replayPoint; }
+
+    const SessionPoint& getSendPoint() const { return sendPoint; } 
+    const SequenceSet& getCompleted() const { return sentCompleted; }
+
+  protected:
+    SendState(size_t replaySyncSize, size_t replayKillSize);
+
+  private:
+    size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration.
+    // invariant: replayPoint <= flushPoint <= sendPoint
+    SessionPoint replayPoint;   // Can replay from this point
+    SessionPoint sendPoint;     // Send from this point
+    SessionPoint flushPoint;    // Point of last flush
+    ReplayList replayList; // Starts from replayPoint.
+    size_t unflushedSize;       // Un-flushed bytes in replay list.
+    SequenceSet sentCompleted; // Commands sent and acknowledged as completed.
+};
+
+/** Receiving half of SessionState */
+class ReceiveState {
+  public:
+    bool hasState();
+
+    /** Set the command point. */
+    void setExpecting(const SessionPoint& point);
+
+    /** Returns true if frame should be be processed, false if it is a 
duplicate. */
+    bool receive(const framing::AMQFrame& f);
+
+    /** Command completed locally */
+    void localCompleted(SequenceNumber command);
+
+    /** Peer has indicated commands are known completed */
+    void peerKnownComplete(const SequenceSet& commands);
+
+    /** Recieved, completed and possibly not known by peer to be completed */
+    const SequenceSet& getReceivedCompleted() const { return 
receivedCompleted; }
+    const SessionPoint& getExpecting() const { return expecting; }
+    const SessionPoint& getReceived() const { return received; }
+
+  protected:
+    ReceiveState();
+
+  private:
+    bool stateful;              // True if session has state.
+    SessionPoint expecting;     // Expecting from here
+    SessionPoint received;      // Received to here. Invariant: expecting <= 
received.
+    SequenceSet receivedCompleted; // Received & completed, may not be not 
known-completed by peer
+};
+
+/** Identifier for a session */
+class SessionId : boost::totally_ordered1<SessionId> {
+    std::string userId;
+    std::string name;
+  public:
+    SessionId(const std::string& userId=std::string(), const std::string& 
name=std::string());
+    std::string getUserId() const { return userId; }
+    std::string getName() const { return name; }
+    bool operator<(const SessionId&) const ;
+    bool operator==(const SessionId& id) const;
+};
+
+
+/**
+ * Support for session idempotence barrier and resume as defined in
+ * AMQP 0-10.
+ *
+ * We only issue/use contiguous confirmations, out-of-order confirmation
+ * is ignored. Out of order completion is fully supported.
+ * 
+ * Raises NotImplemented if the command point is set greater than the
+ * max currently received command data, either explicitly via
+ * session.command-point or implicitly via session.gap.
+ *
+ * Partial replay is not supported, replay always begins on a command
+ * boundary, and we never confirm partial commands.
+ *
+ * The SessionPoint data structure does store offsets so this class
+ * could be extended to support partial replay without
+ * source-incompatbile API changes.
+ */
+class SessionState : public SendState, public ReceiveState {
+  public:
+    struct Configuration {
+        Configuration();
+        size_t replaySyncSize; // Issue a sync when the replay list holds >= N 
bytes
+        size_t replayKillSize; // Kill session if replay list grows beyond N 
bytes.
+    };
+    
+    SessionState(const SessionId& =SessionId(), const Configuration& 
=Configuration());
+
+    const SessionId& getId() const { return id; }
+    uint32_t getTimeout() const { return timeout; }
+    void setTimeout(uint32_t seconds) { timeout = seconds; }
+
+    /** Clear all state except Id. */
+    void clear();
+
+  private:
+    SessionId id;
+    uint32_t timeout;
+    Configuration config;
+};
+
+} // namespace qpid
+
+
+#endif  /*!QPID_SESSIONSTATE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h?rev=651997&r1=651996&r2=651997&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h Sun Apr 27 
11:32:26 2008
@@ -31,6 +31,8 @@
 class SequenceSet : public RangeSet<SequenceNumber> {
   public:
     SequenceSet() {}
+    explicit SequenceSet(const RangeSet<SequenceNumber>& r)
+        : RangeSet<SequenceNumber>(r) {}
     explicit SequenceSet(const SequenceNumber& s) { add(s); }
     
     void encode(Buffer& buffer) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=651997&r1=651996&r2=651997&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Sun Apr 27 
11:32:26 2008
@@ -16,17 +16,33 @@
  *
  */
 
-#include "qpid/framing/SessionState.h"
+#include "unit_test.h"
+
+#include "qpid/framing/SessionState.h" // FIXME aconway 2008-04-23: preview 
code to remove.
+#include "qpid/SessionState.h"
 #include "qpid/Exception.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SessionFlushBody.h"
 
 #include <boost/bind.hpp>
-#include "unit_test.h"
+#include <algorithm>
+#include <functional>
+#include <numeric>
 
 QPID_AUTO_TEST_SUITE(SessionStateTestSuite)
 
 using namespace std;
-using namespace qpid::framing;
 using namespace boost;
+using namespace qpid::framing;
+
+// ================================================================
+// Utility functions.
+
+// Apply f to [begin, end) and accumulate the result
+template <class Iter, class T, class F>
+T applyAccumulate(Iter begin, Iter end, T seed, const F& f) {
+    return std::accumulate(begin, end, seed, bind(std::plus<T>(), _1, bind(f, 
_2)));
+}
 
 // Create a frame with a one-char string.
 AMQFrame& frame(char s) {
@@ -35,13 +51,215 @@
     return frame;
 }
 
-// Extract the one-char string from a frame.
-char charFromFrame(const AMQFrame& f) {
-    const AMQContentBody* b=dynamic_cast<const AMQContentBody*>(f.getBody());
-    BOOST_REQUIRE(b && b->getData().size() > 0);
-    return b->getData()[0];
+// Simple string representation of a frame.
+string str(const AMQFrame& f) {
+    if (f.getMethod()) return "C"; // Command or Control
+    const AMQContentBody* c = dynamic_cast<const AMQContentBody*>(f.getBody());
+    if (c) return c->getData(); // Return data for content frames.
+    return "H";                 // Must be a header.
+}
+// Make a string from a range of frames.
+string str(const vector<AMQFrame>& frames) {
+    string (*strFrame)(const AMQFrame&) = str;
+    return applyAccumulate(frames.begin(), frames.end(), string(), 
ptr_fun(strFrame));
+}
+// Make a transfer command frame.
+AMQFrame transferFrame(bool hasContent) {
+    AMQFrame t(in_place<MessageTransferBody>());
+    t.setFirstFrame();
+    t.setLastFrame();
+    t.setFirstSegment();
+    t.setLastSegment(!hasContent);
+    return t;
+}
+// Make a content frame
+AMQFrame contentFrame(string content, bool isLast=true) {
+    AMQFrame f(in_place<AMQContentBody>(content));
+    f.setFirstFrame();
+    f.setLastFrame();
+    f.setLastSegment(isLast);
+    return f;
+}
+AMQFrame contentFrameChar(char content, bool isLast=true) {
+    return contentFrame(string(1, content), isLast);
 }
 
+// Send frame & return size of frame.
+size_t send(qpid::SessionState& s, const AMQFrame& f) { s.send(f); return 
f.size(); }
+// Send transfer command with no content.
+size_t transfer0(qpid::SessionState& s) { return send(s, 
transferFrame(false)); }
+// Send transfer frame with single content frame.
+size_t transfer1(qpid::SessionState& s, string content) {
+    return send(s,transferFrame(true)) + send(s,contentFrame(content));
+}
+size_t transfer1Char(qpid::SessionState& s, char content) {
+    return transfer1(s, string(1,content));
+}
+        
+// Send transfer frame with multiple single-byte content frames.
+size_t transferN(qpid::SessionState& s, string content) {
+    size_t size=send(s, transferFrame(!content.empty()));
+    if (!content.empty()) {
+        char last = content[content.size()-1];
+        content.resize(content.size()-1);
+        size += applyAccumulate(content.begin(), content.end(), 0,
+                                bind(&send, ref(s),
+                                     bind(contentFrameChar, _1, false)));
+        size += send(s, contentFrameChar(last, true));
+    }
+    return size;
+}
+
+// Send multiple transfers with single-byte content.
+size_t transfers(qpid::SessionState& s, string content) {
+    return applyAccumulate(content.begin(), content.end(), 0,
+                           bind(transfer1Char, ref(s), _1));
+}
+
+size_t contentFrameSize(size_t n=1) { return 
AMQFrame(in_place<AMQContentBody>()).size() + n; }
+size_t transferFrameSize() { return 
AMQFrame(in_place<MessageTransferBody>()).size(); }
+
+// ==== qpid::SessionState test classes
+
+using qpid::SessionId;
+using qpid::SessionPoint;
+
+
+QPID_AUTO_TEST_CASE(testSendGetReplyList) {
+    qpid::SessionState s;
+    transfer1(s, "abc");
+    transfers(s, "def");
+    transferN(s, "xyz");
+    BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz");
+    // Ignore controls.
+    s.send(AMQFrame(in_place<SessionFlushBody>()));
+    BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz");    
+}
+
+QPID_AUTO_TEST_CASE(testNeedFlush) {
+    qpid::SessionState::Configuration c;
+    // sync after 2 1-byte transfers or equivalent bytes.
+    c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
+    qpid::SessionState s(SessionId(), c);
+    transfers(s, "a");
+    BOOST_CHECK(!s.needFlush());
+    transfers(s, "b");
+    BOOST_CHECK(s.needFlush());
+    s.sendFlush();
+    BOOST_CHECK(!s.needFlush());
+    transfers(s, "c");
+    BOOST_CHECK(!s.needFlush());
+    transfers(s, "d");
+    BOOST_CHECK(s.needFlush());
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd");
+}
+
+QPID_AUTO_TEST_CASE(testPeerConfirmed) {
+    qpid::SessionState::Configuration c;
+    // sync after 2 1-byte transfers or equivalent bytes.
+    c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
+    qpid::SessionState s(SessionId(), c);
+    transfers(s, "ab");
+    BOOST_CHECK(s.needFlush());
+    transfers(s, "cd");
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd");
+    s.peerConfirmed(SessionPoint(3));
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cd");
+    BOOST_CHECK(!s.needFlush());
+
+    // Never go backwards.
+    s.peerConfirmed(SessionPoint(2));
+    s.peerConfirmed(SessionPoint(3));
+
+    // Multi-frame transfer.
+    transfer1(s, "efg");
+    transfers(s, "xy");
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CdCefgCxCy");
+    BOOST_CHECK(s.needFlush());
+
+    s.peerConfirmed(SessionPoint(4));
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CefgCxCy");
+    BOOST_CHECK(s.needFlush());
+
+    s.peerConfirmed(SessionPoint(5));
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CxCy");
+    BOOST_CHECK(s.needFlush());
+    
+    s.peerConfirmed(SessionPoint(6));
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cy");
+    BOOST_CHECK(!s.needFlush());
+}
+
+QPID_AUTO_TEST_CASE(testPeerCompleted) {
+    qpid::SessionState s;
+    // Completion implies confirmation 
+    transfers(s, "abc");
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCc");
+    SequenceSet set(SequenceSet() + 0 + 1);
+    s.peerCompleted(set);
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cc");
+
+    transfers(s, "def");
+    // We dont do out-of-order confirmation, so this will only confirm up to 3:
+    set = SequenceSet(SequenceSet() + 2 + 3 + 5);
+    s.peerCompleted(set);    
+    BOOST_CHECK_EQUAL(str(s.getReplayList()), "CeCf");
+}
+
+QPID_AUTO_TEST_CASE(testReceive) {
+    // Advance expecting/received correctly
+    qpid::SessionState s;
+    BOOST_CHECK(!s.hasState());
+    BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(0));
+    BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(0));
+    
+    BOOST_CHECK(s.receive(transferFrame(false)));
+    BOOST_CHECK(s.hasState());
+    BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(1));
+    BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(1));
+    
+    BOOST_CHECK(s.receive(transferFrame(true)));
+    SessionPoint point = SessionPoint(1, transferFrameSize());
+    BOOST_CHECK_EQUAL(s.getExpecting(), point);
+    BOOST_CHECK_EQUAL(s.getReceived(), point);
+    BOOST_CHECK(s.receive(contentFrame("", false)));
+    point.offset += contentFrameSize(0);
+    BOOST_CHECK_EQUAL(s.getExpecting(), point);
+    BOOST_CHECK_EQUAL(s.getReceived(), point);
+    BOOST_CHECK(s.receive(contentFrame("", true)));
+    BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2));
+    BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2));
+
+    // Idempotence barrier, rewind expecting & receive some duplicates.
+    s.setExpecting(SessionPoint(1));
+    BOOST_CHECK(!s.receive(transferFrame(false)));
+    BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2));
+    BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2));
+    BOOST_CHECK(s.receive(transferFrame(false)));
+    BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(3));
+    BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(3));
+}
+
+QPID_AUTO_TEST_CASE(testCompleted) {
+    // completed & unknownCompleted
+    qpid::SessionState s;
+    s.receive(transferFrame(false));
+    s.receive(transferFrame(false));
+    s.receive(transferFrame(false));
+    s.localCompleted(1);
+    BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+1));
+    s.localCompleted(0);
+    BOOST_CHECK_EQUAL(s.getReceivedCompleted(),
+                      SequenceSet(SequenceSet() + SequenceSet::Range(0,2)));
+    s.peerKnownComplete(SequenceSet(SequenceSet()+1));
+    BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+2));
+}
+
+// ================================================================
+// FIXME aconway 2008-04-23: Below here is old preview framing::SessionState 
test, remove with preview code.
+
+using namespace qpid::framing;
+
 // Sent chars as frames
 void sent(SessionState& session, const std::string& frames) {
     for_each(frames.begin(), frames.end(),
@@ -54,26 +272,12 @@
              bind(&SessionState::received, ref(session), bind(frame, _1)));
 }
 
-// Make a string from a ReplayRange.
-std::string replayChars(const SessionState::Replay& frames) {
-    string result(frames.size(), ' ');
-    transform(frames.begin(), frames.end(), result.begin(),
-              bind(&charFromFrame, _1));
-    return result;
-}
-
-namespace qpid {
-namespace framing {
-
 bool operator==(const AMQFrame& a, const AMQFrame& b) {
     const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody());
     const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody());
     return ab && bb && ab->getData() == bb->getData();
 }
 
-}} // namespace qpid::framing
-
-
 QPID_AUTO_TEST_CASE(testSent) {
     // Test that we send solicit-ack at the right interval.
     AMQContentBody f; 
@@ -101,21 +305,21 @@
     sent(session, "abc"); 
     session.suspend(); session.resuming();
     session.receivedAck(-1);
-    BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc");
+    BOOST_CHECK_EQUAL(str(session.replay()), "abc");
 
     // Replay with acks
     session.receivedAck(0); // ack a.
     session.suspend();
     session.resuming();
     session.receivedAck(1); // ack b.
-    BOOST_CHECK_EQUAL(replayChars(session.replay()), "c");
+    BOOST_CHECK_EQUAL(str(session.replay()), "c");
 
     // Replay after further frames.
     sent(session, "def");
     session.suspend();
     session.resuming();
     session.receivedAck(3);
-    BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef");
+    BOOST_CHECK_EQUAL(str(session.replay()), "ef");
 
     // Bad ack, too high
     try {


Reply via email to