Author: aconway
Date: Thu May 22 08:51:15 2008
New Revision: 659139

URL: http://svn.apache.org/viewvc?rev=659139&view=rev
Log:
Improved logging for session state - show incomplete commands on 
receive-completed.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=659139&r1=659138&r2=659139&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu May 22 08:51:15 
2008
@@ -32,6 +32,7 @@
 using amqp_0_10::InvalidArgumentException;
 using amqp_0_10::IllegalStateException;
 using amqp_0_10::ResourceLimitExceededException;
+using amqp_0_10::InternalErrorException;
 
 namespace {
 bool isControl(const AMQFrame& f) {
@@ -145,22 +146,25 @@
     stateful = true;
     receiver.expected.advance(f);
     bool firstTime = receiver.expected > receiver.received;
-    if (firstTime)
+    if (firstTime) {
         receiver.received = receiver.expected;
+        receiver.incomplete += receiverGetCurrent();
+    }
     QPID_LOG_IF(debug, f.getMethod(), getId() << ": recv cmd " << 
receiverGetCurrent() << ": " << *f.getMethod());
     QPID_LOG_IF(debug, !firstTime, "Ignoring duplicate frame: " << 
receiverGetCurrent() << ": " << f);
     return firstTime;
 }
     
 void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
-    assert(command <= receiver.received.command); // Internal error to 
complete an unreceived command.
-    assert(receiver.firstIncomplete <= command);
-    if (cumulative)
-        receiver.unknownCompleted.add(receiver.firstIncomplete, command);
-    else
-        receiver.unknownCompleted += command;
-    receiver.firstIncomplete = 
receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end();
-    QPID_LOG(debug, getId() << ": receiver marked completed: " << command << " 
unknown: " << receiver.unknownCompleted);
+    if (!receiver.incomplete.contains(command))
+        throw InternalErrorException(QPID_MSG(getId() << "command is not 
received-incomplete: " << command ));
+    SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
+    SequenceNumber last = command;
+    receiver.unknownCompleted.add(first, last);
+    receiver.incomplete.remove(first, last);
+    QPID_LOG(debug, getId() << ": receiver marked completed: " << command
+             << " incomplete: " << receiver.incomplete
+             << " unknown-completed: " << receiver.unknownCompleted);
 }
 
 void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
@@ -173,6 +177,7 @@
 const SessionPoint& SessionState::receiverGetExpected() const { return 
receiver.expected; }
 const SessionPoint& SessionState::receiverGetReceived() const { return 
receiver.received; }
 const SequenceSet& SessionState::receiverGetUnknownComplete() const { return 
receiver.unknownCompleted; }
+const SequenceSet& SessionState::receiverGetIncomplete() const { return 
receiver.incomplete; }
 
 SequenceNumber SessionState::receiverGetCurrent() const {
     SequenceNumber current = receiver.expected.command;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=659139&r1=659138&r2=659139&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Thu May 22 08:51:15 
2008
@@ -108,7 +108,7 @@
     /** Called when flush for confirmed and completed commands is sent to 
peer. */
     virtual void senderRecordFlush();
 
-        /** Called when the peer confirms up to comfirmed. */
+    /** Called when the peer confirms up to comfirmed. */
     virtual void senderConfirmed(const SessionPoint& confirmed);
 
     /** Called when the peer indicates commands completed */
@@ -117,15 +117,15 @@
     /** Point from which the next new (not replayed) data will be sent. */
     virtual SessionPoint senderGetCommandPoint();
 
-        /** Set of outstanding incomplete commands */
+    /** Set of outstanding incomplete commands */
     virtual SequenceSet senderGetIncomplete() const;
 
     /** Point from which we can replay. */
     virtual SessionPoint senderGetReplayPoint() const;
 
-        /** Peer expecting commands from this point.
-     virtual @return Range of frames to be replayed.
-         */
+    /** Peer expecting commands from this point.
+        virtual @return Range of frames to be replayed.
+    */
     virtual ReplayRange senderExpected(const SessionPoint& expected);
 
 
@@ -143,23 +143,25 @@
     /** Peer has indicated commands are known completed */
     virtual void receiverKnownCompleted(const SequenceSet& commands);
 
-        /** Get the incoming command point */
+    /** Get the incoming command point */
     virtual const SessionPoint& receiverGetExpected() const;
 
-        /** Get the received high-water-mark, may be > getExpected() during 
replay */
+    /** Get the received high-water-mark, may be > getExpected() during replay 
*/
     virtual const SessionPoint& receiverGetReceived() const;
 
-        /** Completed commands that the peer may not know about */
+    /** Completed received commands that the peer may not know about. */
     virtual const SequenceSet& receiverGetUnknownComplete() const;
 
-        /** ID of the command currently being handled. */
+    /** Incomplete received commands. */
+    virtual const SequenceSet& receiverGetIncomplete() const;
+
+    /** ID of the command currently being handled. */
     virtual SequenceNumber receiverGetCurrent() const;
 
   private:
 
     struct SendState {
-        SendState() : session(), unflushedSize(), replaySize() {}
-        SessionState* session;
+        SendState() : unflushedSize(), replaySize() {}
         // invariant: replayPoint <= flushPoint <= sendPoint
         SessionPoint replayPoint;   // Can replay from this point
         SessionPoint flushPoint;    // Point of last flush
@@ -171,12 +173,11 @@
     } sender;
 
     struct ReceiveState {
-        ReceiveState() : session() {}
-        SessionState* session;
+        ReceiveState() {}
         SessionPoint expected;  // Expected from here
         SessionPoint received; // Received to here. Invariant: expected <= 
received.
         SequenceSet unknownCompleted; // Received & completed, may not  not 
known-complete by peer.
-        SequenceNumber firstIncomplete;  // First incomplete command.
+        SequenceSet incomplete;       // Incomplete received commands.
     } receiver;
 
     SessionId id;


Reply via email to