Author: tross
Date: Thu Oct 23 17:38:30 2008
New Revision: 707514

URL: http://svn.apache.org/viewvc?rev=707514&view=rev
Log:
Added alternative functions for invoking methods

Modified:
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=707514&r1=707513&r2=707514&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Thu Oct 23 17:38:30 2008
@@ -82,6 +82,10 @@
     """ """
     pass
 
+  def methodResponse(self, broker, seq, response):
+    """ """
+    pass
+
 class BrokerURL:
   def __init__(self, text):
     rex = re.compile(r"""
@@ -458,16 +462,21 @@
     code = codec.read_uint32()
     text = str(codec.read_str8())
     outArgs = {}
-    obj, method = self.seqMgr._release(seq)
+    method, synchronous = self.seqMgr._release(seq)
     if code == 0:
       for arg in method.arguments:
         if arg.dir.find("O") != -1:
           outArgs[arg.name] = self._decodeValue(codec, arg.type)
-    broker.cv.acquire()
-    broker.syncResult = MethodResult(code, text, outArgs)
-    broker.syncInFlight = False
-    broker.cv.notify()
-    broker.cv.release()
+    result = MethodResult(code, text, outArgs)
+    if synchronous:
+      broker.cv.acquire()
+      broker.syncResult = result
+      broker.syncInFlight = False
+      broker.cv.notify()
+      broker.cv.release()
+    else:
+      if self.console:
+        self.console.methodResponse(broker, seq, result)
 
   def _handleHeartbeatInd(self, broker, codec, seq):
     timestamp = codec.read_uint64()
@@ -612,6 +621,41 @@
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     
+  def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
+    """ This function can be used to send a method request to an object given only the
+    broker, schemaKey, and objectId.  This is an uncommon usage pattern as methods are
+    normally invoked on the object itself.
+    """
+    schema = self.getSchema(schemaKey)
+    for method in schema.getMethods():
+      if name == method.name:
+        aIdx = 0
+        sendCodec = Codec(broker.conn.spec)
+        seq = self.seqMgr._reserve((method, False))
+        broker._setHeader(sendCodec, 'M', seq)
+        objectId.encode(sendCodec)
+        pname, cname, hash = schemaKey
+        sendCodec.write_str8(pname)
+        sendCodec.write_str8(cname)
+        sendCodec.write_bin128(hash)
+        sendCodec.write_str8(name)
+
+        count = 0
+        for arg in method.arguments:
+          if arg.dir.find("I") != -1:
+            count += 1
+        if count != len(argList):
+          raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
+
+        for arg in method.arguments:
+          if arg.dir.find("I") != -1:
+            self._encodeValue(sendCodec, argList[aIdx], arg.type)
+            aIdx += 1
+        smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
+                               (objectId.getBroker(), objectId.getBank()))
+        broker._send(smsg)
+        return seq
+    return None
 
 class Package:
   """ """
@@ -921,12 +965,12 @@
         return value
     raise Exception("Type Object has no attribute '%s'" % name)
 
-  def _invoke(self, name, args, kwargs):
+  def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
     for method in self._schema.getMethods():
       if name == method.name:
         aIdx = 0
         sendCodec = Codec(self._broker.conn.spec)
-        seq = self._session.seqMgr._reserve((self, method))
+        seq = self._session.seqMgr._reserve((method, synchronous))
         self._broker._setHeader(sendCodec, 'M', seq)
         self._objectId.encode(sendCodec)
         pname, cname, hash = self._schema.getKey()
@@ -948,26 +992,30 @@
             aIdx += 1
         smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
                                      (self._objectId.getBroker(), self._objectId.getBank()))
-        self._broker.cv.acquire()
-        self._broker.syncInFlight = True
-        self._broker.cv.release()
-
+        if synchronous:
+          self._broker.cv.acquire()
+          self._broker.syncInFlight = True
+          self._broker.cv.release()
         self._broker._send(smsg)
+        return seq
+    return None
 
-        self._broker.cv.acquire()
-        starttime = time()
-        while self._broker.syncInFlight and self._broker.error == None:
-          self._broker.cv.wait(self._broker.SYNC_TIME)
-          if time() - starttime > self._broker.SYNC_TIME:
-            self._broker.cv.release()
-            self._session.seqMgr._release(seq)
-            raise RuntimeError("Timed out waiting for method to respond")
-        self._broker.cv.release()
-        if self._broker.error != None:
-          errorText = self._broker.error
-          self._broker.error = None
-          raise Exception(errorText)
-        return self._broker.syncResult
+  def _invoke(self, name, args, kwargs):
+    if self._sendMethodRequest(name, args, kwargs, True):
+      self._broker.cv.acquire()
+      starttime = time()
+      while self._broker.syncInFlight and self._broker.error == None:
+        self._broker.cv.wait(self._broker.SYNC_TIME)
+        if time() - starttime > self._broker.SYNC_TIME:
+          self._broker.cv.release()
+          self._session.seqMgr._release(seq)
+          raise RuntimeError("Timed out waiting for method to respond")
+      self._broker.cv.release()
+      if self._broker.error != None:
+        errorText = self._broker.error
+        self._broker.error = None
+        raise Exception(errorText)
+      return self._broker.syncResult
     raise Exception("Invalid Method (software defect) [%s]" % name)
 
   def _parsePresenceMasks(self, codec, schema):


Reply via email to