Author: aconway
Date: Fri May 23 06:39:07 2008
New Revision: 659538

URL: http://svn.apache.org/viewvc?rev=659538&view=rev
Log:

qpid::SessionState: Added error checking for invalid frame sequences.
client: Fix client crash on error during connection shutdown.

Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Fri May 23 
06:39:07 2008
@@ -23,6 +23,7 @@
           }
           genl l.join(",\n")
         }
+        define_constants_for(@amqp.domain("segment-type").enum)
         namespace("execution") {
           
define_constants_for(@amqp.class_("execution").domain("error-code").enum)
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Fri May 23 06:39:07 
2008
@@ -20,7 +20,7 @@
  */
 
 #include "SessionState.h"
-#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/AMQMethodBody.h"
 #include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
@@ -28,32 +28,56 @@
 
 namespace qpid {
 using framing::AMQFrame;
-using amqp_0_10::NotImplementedException;
-using amqp_0_10::InvalidArgumentException;
-using amqp_0_10::IllegalStateException;
-using amqp_0_10::ResourceLimitExceededException;
-using amqp_0_10::InternalErrorException;
+using framing::NotImplementedException;
+using framing::InvalidArgumentException;
+using framing::IllegalStateException;
+using framing::ResourceLimitExceededException;
+using framing::InternalErrorException;
+using framing::FramingErrorException;
 
 namespace {
 bool isControl(const AMQFrame& f) {
-    return f.getMethod() && f.getMethod()->type() == 0;
+    return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+    return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
 }
 } // namespace
 
-/** A point in the session - command id + offset */
+SessionPoint::SessionPoint(SequenceNumber c, uint64_t o) : command(c), 
offset(o) {}
+
+// TODO aconway 2008-05-22: Do complete frame sequence validity check here,
+// currently duplicated betwen broker and client session impl.
+//
 void SessionPoint::advance(const AMQFrame& f) {
-    if (f.isLastSegment() && f.isLastFrame()) {
-        ++command;
-        offset = 0;
+    if (isControl(f)) return;   // Ignore controls.
+    if (f.isFirstSegment() && f.isFirstFrame()) {
+        if (offset != 0)
+            throw FramingErrorException(QPID_MSG("Unexpected command start 
frame."));
+        if (!isCommand(f))
+            throw FramingErrorException(
+                QPID_MSG("Command start frame has invalid type" << 
f.getBody()->type()));
+        if (f.isLastSegment() && f.isLastFrame()) 
+            ++command;          // Single-frame command.
+        else
+            offset += f.size();
     }
-    else {
-        // TODO aconway 2008-04-24: if we go to support for partial
-        // command replay, then it may be better to record the unframed
-        // data size in a command point rather than the framed size so
-        // that the relationship of fragment offsets to the replay
-        // list can be computed more easily.
-        // 
-        offset += f.size();
+    else {                      // continuation frame for partial command
+        if (offset == 0)
+            throw FramingErrorException(QPID_MSG("Unexpected command 
continuation frame."));
+        if (f.isLastSegment() && f.isLastFrame()) {
+            ++command;
+            offset = 0;
+        }
+        else {
+            // TODO aconway 2008-04-24: if we go to support for partial
+            // command replay, then it may be better to record the unframed
+            // data size in a command point rather than the framed size so
+            // that the relationship of fragment offsets to the replay
+            // list can be computed more easily.
+            // 
+            offset += f.size();
+        }
     }
 }
 
@@ -65,6 +89,7 @@
     return command == x.command && offset == x.offset;
 }
 
+
 SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
 SequenceSet  SessionState::senderGetIncomplete() const { return 
sender.incomplete; }
 SessionPoint SessionState::senderGetReplayPoint() const { return 
sender.replayPoint; }
@@ -156,8 +181,7 @@
 }
     
 void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
-    if (!receiver.incomplete.contains(command))
-        throw InternalErrorException(QPID_MSG(getId() << "command is not 
received-incomplete: " << command ));
+    assert(receiver.incomplete.contains(command)); // Internal error to 
complete command twice.
     SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
     SequenceNumber last = command;
     receiver.unknownCompleted.add(first, last);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Fri May 23 06:39:07 
2008
@@ -38,7 +38,7 @@
 
 /** A point in the session. Points to command id + offset */
 struct SessionPoint : boost::totally_ordered1<SessionPoint> {
-    SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : 
command(command_), offset(offset_) {}
+    SessionPoint(SequenceNumber command = 0, uint64_t offset = 0);
 
     SequenceNumber command;
     uint64_t offset;
@@ -178,6 +178,8 @@
         SessionPoint received; // Received to here. Invariant: expected <= 
received.
         SequenceSet unknownCompleted; // Received & completed, may not  not 
known-complete by peer.
         SequenceSet incomplete;       // Incomplete received commands.
+        int segmentType;
+        
     } receiver;
 
     SessionId id;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri May 23 
06:39:07 2008
@@ -121,10 +121,8 @@
     Mutex::ScopedLock l(lock);
     if (isAttached()) 
         getConnection().outputTasks.activateOutput();
-    }
-    //This class could be used as the callback for queue notifications
-    //if not attached, it can simply ignore the callback, else pass it
-    //on to the connection
+    // FIXME aconway 2008-05-22: should we hold the lock over activateOutput??
+}
 
 ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri May 23 
06:39:07 2008
@@ -68,7 +68,8 @@
     if (impl)
         throw Exception(QPID_MSG("Connection::open() was already called"));
 
-    impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, 
settings));
+    impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+    impl->open(settings.host, settings.port);
     max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri May 23 
06:39:07 2008
@@ -52,7 +52,6 @@
     connector.setTimeoutHandler(this);
     connector.setShutdownHandler(this);
 
-    open(settings.host, settings.port);
     //only set error handler once  open
     handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
 }
@@ -135,11 +134,13 @@
 }
 
 void ConnectionImpl::closed(uint16_t code, const std::string& text) 
-{
-    Mutex::ScopedLock l(lock);
-    if (isClosed) return;
-    SessionVector save(closeInternal(l));
-    Mutex::ScopedUnlock u(lock);
+{ 
+    SessionVector save;
+    {
+        Mutex::ScopedLock l(lock);
+        if (isClosed) return;
+        save = closeInternal(l);
+    }
     std::for_each(save.begin(), save.end(), 
boost::bind(&SessionImpl::connectionClosed, _1, code, text));
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri May 23 
06:39:07 2008
@@ -33,6 +33,7 @@
 #include <map>
 #include <boost/shared_ptr.hpp>
 #include <boost/weak_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
 
 namespace qpid {
 namespace client {
@@ -43,7 +44,8 @@
 class ConnectionImpl : public Bounds,
                        public framing::FrameHandler,
                        public sys::TimeoutHandler, 
-                       public sys::ShutdownHandler
+                       public sys::ShutdownHandler,
+                       public boost::enable_shared_from_this<ConnectionImpl>
 
 {
     typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap;
@@ -59,8 +61,6 @@
 
     template <class F> void detachAll(const F&);
 
-    void open(const std::string& host, int port);
-
     SessionVector closeInternal(const sys::Mutex::ScopedLock&);
     void incoming(framing::AMQFrame& frame);    
     void closed(uint16_t, const std::string&);
@@ -73,6 +73,8 @@
     ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& 
settings);
     ~ConnectionImpl();
     
+    void open(const std::string& host, int port);
+
     void addSession(const boost::shared_ptr<SessionImpl>&);
         
     void close();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri May 23 
06:39:07 2008
@@ -21,6 +21,7 @@
 #include "Connector.h"
 
 #include "Bounds.h"
+#include "ConnectionImpl.h"
 #include "ConnectionSettings.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Time.h"
@@ -42,7 +43,9 @@
 using boost::format;
 using boost::str;
 
-Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, 
Bounds* bounds)
+Connector::Connector(ProtocolVersion ver,
+                     const ConnectionSettings& settings,
+                     ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
       version(ver), 
       initiated(false),
@@ -52,8 +55,9 @@
       idleIn(0), idleOut(0), 
       timeoutHandler(0),
       shutdownHandler(0),
-      writer(maxFrameSize, bounds),
-      aio(0)
+      writer(maxFrameSize, cimpl),
+      aio(0),
+      impl(cimpl)
 {
     QPID_LOG(debug, "Connector created for " << version);
     socket.configure(settings);
@@ -294,6 +298,9 @@
 // TODO: astitcher 20070908 This version of the code can never time out, so 
the idle processing
 // will never be called
 void Connector::run(){
+    // Keep the connection impl in memory until run() completes.
+    boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+    assert(protect);
     try {
         Dispatcher d(poller);
        

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Fri May 23 
06:39:07 2008
@@ -37,6 +37,7 @@
 #include "qpid/sys/AsynchIO.h"
 
 #include <queue>
+#include <boost/weak_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
@@ -45,6 +46,7 @@
 
 class Bounds;
 class ConnectionSettings;
+class ConnectionImpl;
 
 class Connector : public framing::OutputHandler, 
                   private sys::Runnable
@@ -121,13 +123,15 @@
     void eof(qpid::sys::AsynchIO&);
 
     std::string identifier;
+
+    ConnectionImpl* impl;
     
   friend class Channel;
 
   public:
     Connector(framing::ProtocolVersion pVersion,
               const ConnectionSettings&, 
-              Bounds* bounds = 0);
+              ConnectionImpl*);
     virtual ~Connector();
     virtual void connect(const std::string& host, int port);
     virtual void init();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri May 23 
06:39:07 2008
@@ -157,7 +157,7 @@
         
fix.session.messageTransfer(content=TransferContent(lexical_cast<string>(i), 
"my-queue"));
     }
     t.join();
-    BOOST_CHECK_EQUAL(count, listener.messages.size());        
+    BOOST_REQUIRE_EQUAL(count, listener.messages.size());        
     for (size_t i = 0; i < count; ++i) 
         BOOST_CHECK_EQUAL(lexical_cast<string>(i), 
listener.messages[i].getData());
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=659538&r1=659537&r2=659538&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Fri May 23 
06:39:07 2008
@@ -65,17 +65,18 @@
 // Make a transfer command frame.
 AMQFrame transferFrame(bool hasContent) {
     AMQFrame t(in_place<MessageTransferBody>());
-    t.setFirstFrame();
-    t.setLastFrame();
-    t.setFirstSegment();
+    t.setFirstFrame(true);
+    t.setLastFrame(true);
+    t.setFirstSegment(true);
     t.setLastSegment(!hasContent);
     return t;
 }
 // Make a content frame
 AMQFrame contentFrame(string content, bool isLast=true) {
     AMQFrame f(in_place<AMQContentBody>(content));
-    f.setFirstFrame();
-    f.setLastFrame();
+    f.setFirstFrame(true);
+    f.setLastFrame(true);
+    f.setFirstSegment(false);
     f.setLastSegment(isLast);
     return f;
 }


Reply via email to