Author: gsim
Date: Mon May 12 12:45:22 2008
New Revision: 655619

URL: http://svn.apache.org/viewvc?rev=655619&view=rev
Log:
QPID-1052: Patch from Ted Ross

This patch contains the following:

1) The session-id reported by the management API now matches the session.name 
in the session table
2) management.py API has a new callback for closed connections
3) qpid-tool uses the closed-connection handler to notify the user of a lost 
connection


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/commands/qpid-tool
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/managementdata.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon May 12 
12:45:22 2008
@@ -117,7 +117,7 @@
 
     //TODO: need to revise session manager to support resume as well
     assertClosed("attach");
-    session.reset(new SessionState(0, this, 0, 0));
+    session.reset(new SessionState(0, this, 0, 0, name));
     peerSession.attached(name);
     peerSession.commandPoint(session->nextOut, 0);
 }
@@ -126,7 +126,7 @@
 {
     name = _name;//TODO: this should be used in conjunction with
                  //userid for connection as sessions identity
-    session.reset(new SessionState(0, this, 0, 0));
+    session.reset(new SessionState(0, this, 0, 0, name));
     peerSession.commandPoint(session->nextOut, 0);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Mon May 12 
12:45:22 2008
@@ -46,11 +46,11 @@
 
 // FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
 std::auto_ptr<SessionState>  SessionManager::open(
-    SessionHandler& h, uint32_t timeout_)
+    SessionHandler& h, uint32_t timeout_, std::string _name)
 {
     Mutex::ScopedLock l(lock);
     std::auto_ptr<SessionState> session(
-        new SessionState(this, &h, timeout_, ack));
+        new SessionState(this, &h, timeout_, ack, _name));
     active.insert(session->getId());
     for_each(observers.begin(), observers.end(),
              boost::bind(&Observer::opened, _1,boost::ref(*session)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Mon May 12 
12:45:22 2008
@@ -58,7 +58,7 @@
     ~SessionManager();
     
     /** Open a new active session, caller takes ownership */
-    std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_);
+    std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, 
std::string name);
     
     /** Suspend a session, start it's timeout counter.
      * The factory takes ownership.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon May 12 
12:45:22 2008
@@ -45,12 +45,12 @@
 using qpid::management::Args;
 
 SessionState::SessionState(
-    SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) 
+      SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, 
string& _name) 
     : framing::SessionState(ack, timeout_ > 0), nextOut(0),
       factory(f), handler(h), id(true), timeout(timeout_),
       broker(h->getConnection().broker),
       version(h->getConnection().getVersion()),
-      ignoring(false),
+      ignoring(false), name(_name),
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
@@ -68,7 +68,7 @@
         if (agent.get () != 0)
         {
             mgmtObject = management::Session::shared_ptr
-                (new management::Session (this, parent, id.str ()));
+                (new management::Session (this, parent, name));
             mgmtObject->set_attached (1);
             mgmtObject->set_clientRef 
(h->getConnection().GetManagementObject()->getObjectId());
             mgmtObject->set_channelId (h->getChannel());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon May 12 
12:45:22 2008
@@ -111,7 +111,8 @@
     SessionState(SessionManager*,
                  SessionHandler* out,
                  uint32_t timeout,
-                 uint32_t ackInterval);
+                 uint32_t ackInterval,
+                 std::string& name);
     
 
     framing::SequenceSet completed;
@@ -131,6 +132,7 @@
     framing::ProtocolVersion version;
     sys::Mutex lock;
     bool ignoring;
+    std::string name;
 
     SemanticState semanticState;
     SessionAdapter adapter;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Mon May 12 12:45:22 
2008
@@ -142,6 +142,9 @@
 
         mgmt.call_method(bridge, "close")
         mgmt.call_method(link, "close")
+        sleep(6)
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
 
         mgmt.shutdown()
 
@@ -191,6 +194,9 @@
 
         mgmt.call_method(bridge, "close")
         mgmt.call_method(link, "close")
+        sleep(6)
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
 
         mgmt.shutdown ()
 
@@ -239,6 +245,9 @@
 
         mgmt.call_method(bridge, "close")
         mgmt.call_method(link, "close")
+        sleep(6)
+        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+        self.assertEqual(len(mgmt.get_objects("link")), 0)
 
         mgmt.shutdown ()
 

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-tool
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-tool?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-tool (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-tool Mon May 12 12:45:22 2008
@@ -31,16 +31,23 @@
 
 class Mcli (Cmd):
   """ Management Command Interpreter """
-  prompt = "qpid: "
 
   def __init__ (self, dataObject, dispObject):
     Cmd.__init__ (self)
     self.dataObject = dataObject
     self.dispObject = dispObject
+    self.dataObject.setCli (self)
+    self.prompt = "qpid: "
     
   def emptyline (self):
     pass
 
+  def setPromptMessage (self, p):
+    if p == None:
+      self.prompt = "qpid: "
+    else:
+      self.prompt = "qpid[%s]: " % p
+
   def do_help (self, data):
     print "Management Tool for QPID"
     print

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Mon May 12 12:45:22 2008
@@ -81,7 +81,7 @@
 class managementChannel:
   """ This class represents a connection to an AMQP broker. """
 
-  def __init__ (self, ssn, topicCb, replyCb, cbContext, _detlife=0):
+  def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, 
_detlife=0):
     """ Given a channel on an established AMQP broker connection, this method
     opens a session and performs all of the declarations and bindings needed
     to participate in the management protocol. """
@@ -93,6 +93,7 @@
     self.qpidChannel = ssn
     self.tcb         = topicCb
     self.rcb         = replyCb
+    self.ecb         = exceptionCb
     self.context     = cbContext
     self.reqsOutstanding = 0
 
@@ -104,7 +105,7 @@
     ssn.message_subscribe (queue=self.topicName, destination="tdest")
     ssn.message_subscribe (queue=self.replyName, destination="rdest")
 
-    ssn.incoming ("tdest").listen (self.topicCb)
+    ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
     ssn.incoming ("rdest").listen (self.replyCb)
 
     ssn.message_set_flow_mode (destination="tdest", flow_mode=1)
@@ -130,6 +131,10 @@
     if self.enabled:
       self.rcb (self, msg)
 
+  def exceptionCb (self, data):
+    if self.ecb != None:
+      self.ecb (data)
+
   def send (self, exchange, msg):
     if self.enabled:
       self.qpidChannel.message_transfer (destination=exchange, message=msg)
@@ -160,12 +165,13 @@
   #========================================================
   # User API - interacts with the class's user
   #========================================================
-  def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, 
methodCb=None):
+  def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, 
methodCb=None, closeCb=None):
     self.spec     = amqpSpec
     self.ctrlCb   = ctrlCb
     self.configCb = configCb
     self.instCb   = instCb
     self.methodCb = methodCb
+    self.closeCb  = closeCb
     self.schemaCb = None
     self.eventCb  = None
     self.channels = []
@@ -189,7 +195,7 @@
 
   def addChannel (self, channel, cbContext=None):
     """ Register a new channel. """
-    mch = managementChannel (channel, self.topicCb, self.replyCb, cbContext)
+    mch = managementChannel (channel, self.topicCb, self.replyCb, 
self.exceptCb, cbContext)
 
     self.channels.append (mch)
     self.incOutstanding (mch)
@@ -312,6 +318,10 @@
       self.parse (ch, codec, hdr[0], hdr[1])
     ch.accept(msg)
 
+  def exceptCb (self, data):
+    if self.closeCb != None:
+      self.closeCb (data)
+
   #========================================================
   # Internal Functions
   #========================================================

Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=655619&r1=655618&r2=655619&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Mon May 12 12:45:22 
2008
@@ -160,11 +160,20 @@
     finally:
       self.lock.release ()
 
+  def closeHandler (self, reason):
+    print "Connection to broker lost:", reason
+    self.operational = False
+    if self.cli != None:
+      self.cli.setPromptMessage ("Broker Disconnected")
+
   def schemaHandler (self, context, className, configs, insts, methods, 
events):
     """ Callback for schema updates """
     if className not in self.schema:
       self.schema[className] = (configs, insts, methods, events)
 
+  def setCli (self, cliobj):
+    self.cli = cliobj
+
   def __init__ (self, disp, host, username="guest", password="guest",
                 specfile="../../specs/amqp.0-10.xml"):
     self.spec           = qpid.spec.load (specfile)
@@ -184,9 +193,11 @@
     self.conn.start ()
 
     self.mclient = managementClient (self.spec, self.ctrlHandler, 
self.configHandler,
-                                     self.instHandler, self.methodReply)
+                                     self.instHandler, self.methodReply, 
self.closeHandler)
     self.mclient.schemaListener (self.schemaHandler)
     self.mch = self.mclient.addChannel (self.conn.session(self.sessionId))
+    self.operational = True
+    self.cli         = None
 
   def close (self):
     pass


Reply via email to