Author: nsantos
Date: Thu Mar 27 13:51:42 2008
New Revision: 641976

URL: http://svn.apache.org/viewvc?rev=641976&view=rev
Log:
QPID-883: applying patch supplied by Ted Ross

Added:
    incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py   (with 
props)
Modified:
    incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/tests_0-10_preview/__init__.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py?rev=641976&r1=641975&r2=641976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py Thu Mar 27 
13:51:42 2008
@@ -156,8 +156,7 @@
     self.mclient = managementClient (self.spec, self.ctrlHandler, 
self.configHandler,
                                      self.instHandler, self.methodReply)
     self.mclient.schemaListener (self.schemaHandler)
-    self.mch = managementChannel (self.channel, self.mclient.topicCb, 
self.mclient.replyCb)
-    self.mclient.addChannel (self.mch)
+    self.mch = self.mclient.addChannel (self.channel)
 
   def close (self):
     self.mclient.removeChannel (self.mch)

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=641976&r1=641975&r2=641976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Mar 27 13:51:42 2008
@@ -26,12 +26,12 @@
 import socket
 from threading    import Thread
 from message      import Message
-from time         import sleep
+from time         import time
 from qpid.client  import Client
 from qpid.content import Content
 from cStringIO    import StringIO
 from codec        import Codec, EOF
-from threading    import Lock
+from threading    import Lock, Condition
 
 
 class SequenceManager:
@@ -61,10 +61,29 @@
     return data
 
 
+class mgmtObject (object):
+  """ Generic object that holds the contents of a management object with its
+      attributes set as object attributes. """
+
+  def __init__ (self, classKey, timestamps, row):
+    self.classKey   = classKey
+    self.timestamps = timestamps
+    for cell in row:
+      setattr (self, cell[0], cell[1])
+
+class methodResult:
+  """ Object that contains the result of a method call """
+
+  def __init__ (self, status, sText, args):
+    self.status     = status
+    self.statusText = sText
+    for arg in args:
+      setattr (self, arg, args[arg])
+
 class managementChannel:
   """ This class represents a connection to an AMQP broker. """
 
-  def __init__ (self, ch, topicCb, replyCb, cbContext=None):
+  def __init__ (self, ch, topicCb, replyCb, cbContext):
     """ Given a channel on an established AMQP broker connection, this method
     opens a session and performs all of the declarations and bindings needed
     to participate in the management protocol. """
@@ -120,10 +139,12 @@
   CTRL_SCHEMA_LOADED = 2
   CTRL_USER          = 3
 
+  SYNC_TIME = 10.0
+
   #========================================================
   # User API - interacts with the class's user
   #========================================================
-  def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None):
+  def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, 
methodCb=None):
     self.spec     = amqpSpec
     self.ctrlCb   = ctrlCb
     self.configCb = configCb
@@ -135,6 +156,10 @@
     self.seqMgr   = SequenceManager ()
     self.schema   = {}
     self.packages = {}
+    self.cv       = Condition ()
+    self.syncInFlight = False
+    self.syncSequence = 0
+    self.syncResult   = None
 
   def schemaListener (self, schemaCb):
     """ Optionally register a callback to receive details of the schema of
@@ -146,9 +171,11 @@
     in the network. """
     self.eventCb = eventCb
 
-  def addChannel (self, channel):
+  def addChannel (self, channel, cbContext=None):
     """ Register a new channel. """
-    self.channels.append (channel)
+    mch = managementChannel (channel, self.topicCb, self.replyCb, cbContext)
+
+    self.channels.append (mch)
     codec = Codec (StringIO (), self.spec)
     self.setHeader (codec, ord ('B'))
     msg = Content  (codec.stream.getvalue ())
@@ -156,12 +183,13 @@
     msg["routing_key"]  = "agent"
     msg["reply_to"]     = self.spec.struct ("reply_to")
     msg["reply_to"]["exchange_name"] = "amq.direct"
-    msg["reply_to"]["routing_key"]   = channel.replyName
-    channel.send ("qpid.management", msg)
+    msg["reply_to"]["routing_key"]   = mch.replyName
+    mch.send ("qpid.management", msg)
+    return mch
 
-  def removeChannel (self, channel):
+  def removeChannel (self, mch):
     """ Remove a previously added channel from management. """
-    self.channels.remove (channel)
+    self.channels.remove (mch)
 
   def callMethod (self, channel, userSequence, objId, className, methodName, 
args=None):
     """ Invoke a method on a managed object. """
@@ -182,6 +210,55 @@
     msg["reply_to"]["routing_key"]   = channel.replyName
     channel.send ("qpid.management", msg)
 
+  def syncWaitForStable (self, channel):
+    """ Synchronous (blocking) call to wait for schema stability on a channel 
"""
+    self.cv.acquire ()
+    self.syncInFlight = True
+    starttime = time ()
+    while channel.reqsOutstanding != 0:
+      self.cv.wait (self.SYNC_TIME)
+      if time () - starttime > self.SYNC_TIME:
+        self.cv.release ()
+        raise RuntimeError ("Timed out waiting for response on channel")
+    self.cv.release ()
+
+  def syncCallMethod (self, channel, objId, className, methodName, args=None):
+    """ Synchronous (blocking) method call """
+    self.cv.acquire ()
+    self.syncInFlight = True
+    self.syncResult   = None
+    self.syncSequence = self.seqMgr.reserve ("sync")
+    self.cv.release ()
+    self.callMethod (channel, self.syncSequence, objId, className, methodName, 
args)
+    self.cv.acquire ()
+    starttime = time ()
+    while self.syncInFlight:
+      self.cv.wait (self.SYNC_TIME)
+      if time () - starttime > self.SYNC_TIME:
+        self.cv.release ()
+        raise RuntimeError ("Timed out waiting for response on channel")
+    result = self.syncResult
+    self.cv.release ()
+    return result
+
+  def syncGetObjects (self, channel, className):
+    """ Synchronous (blocking) get call """
+    self.cv.acquire ()
+    self.syncInFlight = True
+    self.syncResult   = []
+    self.syncSequence = self.seqMgr.reserve ("sync")
+    self.cv.release ()
+    self.getObjects (channel, self.syncSequence, className)
+    self.cv.acquire ()
+    starttime = time ()
+    while self.syncInFlight:
+      self.cv.wait (self.SYNC_TIME)
+      if time () - starttime > self.SYNC_TIME:
+        self.cv.release ()
+        raise RuntimeError ("Timed out waiting for response on channel")
+    result = self.syncResult
+    self.cv.release ()
+    return result
 
   #========================================================
   # Channel API - interacts with registered channel objects
@@ -312,10 +389,18 @@
     return data
 
   def incOutstanding (self, ch):
+    self.cv.acquire ()
     ch.reqsOutstanding = ch.reqsOutstanding + 1
+    self.cv.release ()
 
   def decOutstanding (self, ch):
+    self.cv.acquire ()
     ch.reqsOutstanding = ch.reqsOutstanding - 1
+    if ch.reqsOutstanding == 0 and self.syncInFlight:
+      self.syncInFlight = False
+      self.cv.notify ()
+    self.cv.release ()
+
     if ch.reqsOutstanding == 0:
       if self.ctrlCb != None:
         self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
@@ -330,6 +415,7 @@
 
     (userSequence, classId, methodName) = data
     args = {}
+    context = self.seqMgr.release (userSequence)
 
     if status == 0:
       schemaClass = self.schema[classId]
@@ -346,7 +432,13 @@
         if arg[2].find("O") != -1:
           args[arg[0]] = self.decodeValue (codec, arg[1])
 
-    if self.methodCb != None:
+    if context == "sync" and userSequence == self.syncSequence:
+      self.cv.acquire ()
+      self.syncInFlight = False
+      self.syncResult   = methodResult (status, sText, args)
+      self.cv.notify  ()
+      self.cv.release ()
+    elif self.methodCb != None:
       self.methodCb (ch.context, userSequence, status, sText, args)
 
   def handleCommandComplete (self, ch, codec, seq):
@@ -356,6 +448,11 @@
     context = self.seqMgr.release (seq)
     if context == "outstanding":
       self.decOutstanding (ch)
+    elif context == "sync" and seq == self.syncSequence:
+      self.cv.acquire ()
+      self.syncInFlight = False
+      self.cv.notify  ()
+      self.cv.release ()
     elif self.ctrlCb != None:
       self.ctrlCb (ch.context, self.CTRL_USER, data)
 
@@ -541,9 +638,9 @@
     if self.schemaCb != None:
       self.schemaCb (ch.context, classKey, configs, insts, methods, events)
 
-  def parseContent (self, ch, cls, codec):
+  def parseContent (self, ch, cls, codec, seq=0):
     """ Parse a received content message. """
-    if (cls == 'C' or cls == 'B') and self.configCb == None:
+    if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
       return
     if cls == 'I' and self.instCb == None:
       return
@@ -582,8 +679,12 @@
         data = self.decodeValue (codec, tc)
         row.append ((name, data))
 
-    if   cls == 'C' or cls == 'B':
+    if   cls == 'C' or (cls == 'B' and seq != self.syncSequence):
       self.configCb (ch.context, classKey, row, timestamps)
+    elif cls == 'B' and seq == self.syncSequence:
+      if timestamps[2] == 0:
+        obj = mgmtObject (classKey, timestamps, row)
+        self.syncResult.append (obj)
     elif cls == 'I':
       self.instCb   (ch.context, classKey, row, timestamps)
 
@@ -596,7 +697,7 @@
     elif opcode == 'i':
       self.parseContent (ch, 'I', codec)
     elif opcode == 'g':
-      self.parseContent (ch, 'B', codec)
+      self.parseContent (ch, 'B', codec, seq)
     else:
       raise ValueError ("Unknown opcode: %c" % opcode);
 

Modified: incubator/qpid/trunk/qpid/python/tests_0-10_preview/__init__.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/__init__.py?rev=641976&r1=641975&r2=641976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/__init__.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/__init__.py Thu Mar 27 
13:51:42 2008
@@ -25,6 +25,7 @@
 from example import *
 from exchange import *
 from execution import *
+from management import *
 from message import *
 from query import *
 from queue import *

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py?rev=641976&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py Thu Mar 
27 13:51:42 2008
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase
+from qpid.management import managementChannel, managementClient
+
+class ManagementTest (TestBase):
+    """
+    Tests for the management hooks
+    """
+
+    def test_broker_connectivity (self):
+        """
+        Call the "echo" method on the broker to verify it is alive and talking.
+        """
+        channel = self.client.channel(2)
+ 
+        mc  = managementClient (channel.spec)
+        mch = mc.addChannel (channel)
+
+        mc.syncWaitForStable (mch)
+        brokers = mc.syncGetObjects (mch, "broker")
+        self.assertEqual (len (brokers), 1)
+        broker = brokers[0]
+        args = {}
+        body = "Echo Message Body"
+        args["body"] = body
+
+        for seq in range (1, 5):
+            args["sequence"] = seq
+            res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", 
args)
+            self.assertEqual (res.status,     0)
+            self.assertEqual (res.statusText, "OK")
+            self.assertEqual (res.sequence,   seq)
+            self.assertEqual (res.body,       body)
+
+    def test_system_object (self):
+        channel = self.client.channel(2)
+ 
+        mc  = managementClient (channel.spec)
+        mch = mc.addChannel (channel)
+
+        mc.syncWaitForStable (mch)
+        systems = mc.syncGetObjects (mch, "system")
+        self.assertEqual (len (systems), 1)
+
+    def test_standard_exchanges (self):
+        channel = self.client.channel(2)
+ 
+        mc  = managementClient (channel.spec)
+        mch = mc.addChannel (channel)
+
+        mc.syncWaitForStable (mch)
+        exchanges = mc.syncGetObjects (mch, "exchange")
+        exchange = self.findExchange (exchanges, "")
+        self.assertEqual (exchange.type, "direct")
+        exchange = self.findExchange (exchanges, "amq.direct")
+        self.assertEqual (exchange.type, "direct")
+        exchange = self.findExchange (exchanges, "amq.topic")
+        self.assertEqual (exchange.type, "topic")
+        exchange = self.findExchange (exchanges, "amq.fanout")
+        self.assertEqual (exchange.type, "fanout")
+        exchange = self.findExchange (exchanges, "amq.match")
+        self.assertEqual (exchange.type, "headers")
+        exchange = self.findExchange (exchanges, "qpid.management")
+        self.assertEqual (exchange.type, "topic")
+
+    def findExchange (self, exchanges, name):
+        for exchange in exchanges:
+            if exchange.name == name:
+                return exchange
+        return None

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/management.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=641976&r1=641975&r2=641976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Mar 27 13:51:42 
2008
@@ -47,11 +47,11 @@
   <class name="system">
     <configElement name="sysId" index="y" type="sstr" access="RC"/>
 
-    <instElement name="osName"   type="sstr" desc="Operating System Name"/>
-    <instElement name="nodeName" type="sstr" desc="Node Name"/>
-    <instElement name="release"  type="sstr"/>
-    <instElement name="version"  type="sstr"/>
-    <instElement name="machine"  type="sstr"/>
+    <configElement name="osName"   type="sstr" access="RO" desc="Operating 
System Name"/>
+    <configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/>
+    <configElement name="release"  type="sstr" access="RO"/>
+    <configElement name="version"  type="sstr" access="RO"/>
+    <configElement name="machine"  type="sstr" access="RO"/>
 
   </class>
 
@@ -215,11 +215,11 @@
 
     This class represents an inter-broker connection.
 
-    <configElement name="vhostRef" type="objId"  access="RC" index="y" 
parentRef="y"/>
-    <configElement name="address"  type="sstr"   access="RC" index="y"/>
+    <configElement name="vhostRef"     type="objId" access="RC" index="y" 
parentRef="y"/>
+    <configElement name="address"      type="sstr"  access="RC" index="y"/>
+    <configElement name="authIdentity" type="sstr"  access="RO"/>
 
     <instElement name="closing"          type="bool" desc="This link is 
closing by management request"/>
-    <instElement name="authIdentity"     type="sstr"/>
     <instElement name="framesFromPeer"   type="count64"/>
     <instElement name="framesToPeer"     type="count64"/>
     <instElement name="bytesFromPeer"    type="count64"/>


Reply via email to