Author: tross
Date: Fri Oct 24 12:13:33 2008
New Revision: 707724

URL: http://svn.apache.org/viewvc?rev=707724&view=rev
Log:
Fixed the following problems in qmfconsole.py:

  - Fixed typo in getTimestamp()
  - Updated lock usage to use the try: acquire() finally: release() pattern
  - Removed unused match method from URL
  - Fixed a bug where 'packages' was accessed without lock protection


Modified:
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=707724&r1=707723&r2=707724&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Fri Oct 24 12:13:33 2008
@@ -106,9 +106,6 @@
   def name(self):
     return self.host + ":" + str(self.port)
 
-  def match(self, host, port):
-    return socket.gethostbyname(self.host) == socket.gethostbyname(host) and 
self.port == port
-
 class Session:
   """
   An instance of the Session class represents a console session running
@@ -326,10 +323,12 @@
     for agent in agentList:
       broker = agent.broker
       sendCodec = Codec(broker.conn.spec)
-      self.cv.acquire()
-      seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
-      self.syncSequenceList.append(seq)
-      self.cv.release()
+      try:
+        self.cv.acquire()
+        seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
+        self.syncSequenceList.append(seq)
+      finally:
+        self.cv.release()
       broker._setHeader(sendCodec, 'G', seq)
       sendCodec.write_map(map)
       smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
@@ -337,15 +336,17 @@
 
     starttime = time()
     timeout = False
-    self.cv.acquire()
-    while len(self.syncSequenceList) > 0 and self.error == None:
-      self.cv.wait(self.GET_WAIT_TIME)
-      if time() - starttime > self.GET_WAIT_TIME:
-        for pendingSeq in self.syncSequenceList:
-          self.seqMgr._release(pendingSeq)
-        self.syncSequenceList = []
-        timeout = True
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      while len(self.syncSequenceList) > 0 and self.error == None:
+        self.cv.wait(self.GET_WAIT_TIME)
+        if time() - starttime > self.GET_WAIT_TIME:
+          for pendingSeq in self.syncSequenceList:
+            self.seqMgr._release(pendingSeq)
+          self.syncSequenceList = []
+          timeout = True
+    finally:
+      self.cv.release()
 
     if self.error:
       errorText = self.error
@@ -397,14 +398,16 @@
 
   def _handlePackageInd(self, broker, codec, seq):
     pname = str(codec.read_str8())
-    self.cv.acquire()
-    if pname not in self.packages:
-      self.packages[pname] = {}
-      self.cv.release()
-      if self.console != None:
-        self.console.newPackage(pname)
-    else:
+    notify = False
+    try:
+      self.cv.acquire()
+      if pname not in self.packages:
+        self.packages[pname] = {}
+        notify = True
+    finally:
       self.cv.release()
+    if notify and self.console != None:
+      self.console.newPackage(pname)
 
     # Send a class request
     broker._incOutstanding()
@@ -422,30 +425,38 @@
     if context == self._CONTEXT_STARTUP:
       broker._decOutstanding()
     elif context == self._CONTEXT_SYNC and seq == broker.syncSequence:
-      broker.cv.acquire()
-      broker.syncInFlight = False
-      broker.cv.notify()
-      broker.cv.release()
+      try:
+        broker.cv.acquire()
+        broker.syncInFlight = False
+        broker.cv.notify()
+      finally:
+        broker.cv.release()
     elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList:
-      self.cv.acquire()
-      self.syncSequenceList.remove(seq)
-      if len(self.syncSequenceList) == 0:
-        self.cv.notify()
-      self.cv.release()
+      try:
+        self.cv.acquire()
+        self.syncSequenceList.remove(seq)
+        if len(self.syncSequenceList) == 0:
+          self.cv.notify()
+      finally:
+        self.cv.release()
 
   def _handleClassInd(self, broker, codec, seq):
     kind  = codec.read_uint8()
     pname = str(codec.read_str8())
     cname = str(codec.read_str8())
     hash  = codec.read_bin128()
+    unknown = False
 
-    self.cv.acquire()
-    if pname not in self.packages:
+    try:
+      self.cv.acquire()
+      if pname in self.packages:
+        if (cname, hash) not in self.packages[pname]:
+          unknown = True
+    finally:
       self.cv.release()
-      return
-    if (cname, hash) not in self.packages[pname]:
+
+    if unknown:
       # Send a schema request for the unknown class
-      self.cv.release()
       broker._incOutstanding()
       sendCodec = Codec(broker.conn.spec)
       seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
@@ -455,8 +466,6 @@
       sendCodec.write_bin128(hash)
       smsg = broker._message(sendCodec.encoded)
       broker._send(smsg)
-    else:
-      self.cv.release()
 
   def _handleMethodResp(self, broker, codec, seq):
     code = codec.read_uint32()
@@ -469,11 +478,13 @@
           outArgs[arg.name] = self._decodeValue(codec, arg.type)
     result = MethodResult(code, text, outArgs)
     if synchronous:
-      broker.cv.acquire()
-      broker.syncResult = result
-      broker.syncInFlight = False
-      broker.cv.notify()
-      broker.cv.release()
+      try:
+        broker.cv.acquire()
+        broker.syncResult = result
+        broker.syncInFlight = False
+        broker.cv.notify()
+      finally:
+        broker.cv.release()
     else:
       if self.console:
         self.console.methodResponse(broker, seq, result)
@@ -495,9 +506,11 @@
     hash  = codec.read_bin128()
     classKey = (pname, cname, hash)
     _class = SchemaClass(kind, classKey, codec)
-    self.cv.acquire()
-    self.packages[pname][(cname, hash)] = _class
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      self.packages[pname][(cname, hash)] = _class
+    finally:
+      self.cv.release()
     broker._decOutstanding()
     if self.console != None:
       self.console.newClass(kind, classKey)
@@ -507,26 +520,28 @@
     cname = str(codec.read_str8())
     hash  = codec.read_bin128()
     classKey = (pname, cname, hash)
-    self.cv.acquire()
-    if pname not in self.packages:
-      self.cv.release()
-      return
-    if (cname, hash) not in self.packages[pname]:
+    try:
+      self.cv.acquire()
+      if pname not in self.packages:
+        return
+      if (cname, hash) not in self.packages[pname]:
+        return
+      schema = self.packages[pname][(cname, hash)]
+    finally:
       self.cv.release()
-      return
-    self.cv.release()
-    schema = self.packages[pname][(cname, hash)]
+
     object = Object(self, broker, schema, codec, prop, stat)
     if pname == "org.apache.qpid.broker" and cname == "agent":
       broker._updateAgent(object)
 
-    self.cv.acquire()
-    if seq in self.syncSequenceList:
-      if object.getTimestamps()[2] == 0 and self._selectMatch(object):
-        self.getResult.append(object)
+    try:
+      self.cv.acquire()
+      if seq in self.syncSequenceList:
+        if object.getTimestamps()[2] == 0 and self._selectMatch(object):
+          self.getResult.append(object)
+        return
+    finally:
       self.cv.release()
-      return
-    self.cv.release()
 
     if self.console != None:
       if prop:
@@ -536,10 +551,12 @@
 
   def _handleError(self, error):
     self.error = error
-    self.cv.acquire()
-    self.syncSequenceList = []
-    self.cv.notify()
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      self.syncSequenceList = []
+      self.cv.notify()
+    finally:
+      self.cv.release()
 
   def _selectMatch(self, object):
     """ Check the object against self.getSelect to check for a match """
@@ -993,24 +1010,27 @@
         smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
                                      (self._objectId.getBroker(), 
self._objectId.getBank()))
         if synchronous:
-          self._broker.cv.acquire()
-          self._broker.syncInFlight = True
-          self._broker.cv.release()
+          try:
+            self._broker.cv.acquire()
+            self._broker.syncInFlight = True
+          finally:
+            self._broker.cv.release()
         self._broker._send(smsg)
         return seq
     return None
 
   def _invoke(self, name, args, kwargs):
     if self._sendMethodRequest(name, args, kwargs, True):
-      self._broker.cv.acquire()
-      starttime = time()
-      while self._broker.syncInFlight and self._broker.error == None:
-        self._broker.cv.wait(self._broker.SYNC_TIME)
-        if time() - starttime > self._broker.SYNC_TIME:
-          self._broker.cv.release()
-          self._session.seqMgr._release(seq)
-          raise RuntimeError("Timed out waiting for method to respond")
-      self._broker.cv.release()
+      try:
+        self._broker.cv.acquire()
+        starttime = time()
+        while self._broker.syncInFlight and self._broker.error == None:
+          self._broker.cv.wait(self._broker.SYNC_TIME)
+          if time() - starttime > self._broker.SYNC_TIME:
+            self._session.seqMgr._release(seq)
+            raise RuntimeError("Timed out waiting for method to respond")
+      finally:
+        self._broker.cv.release()
       if self._broker.error != None:
         errorText = self._broker.error
         self._broker.error = None
@@ -1208,36 +1228,40 @@
       raise Exception("Broker already disconnected")
 
   def _waitForStable(self):
-    self.cv.acquire()
-    if self.reqsOutstanding == 0:
+    try:
+      self.cv.acquire()
+      if self.reqsOutstanding == 0:
+        return
+      self.syncInFlight = True
+      starttime = time()
+      while self.reqsOutstanding != 0:
+        self.cv.wait(self.SYNC_TIME)
+        if time() - starttime > self.SYNC_TIME:
+          raise RuntimeError("Timed out waiting for broker to synchronize")
+    finally:
       self.cv.release()
-      return
-    self.syncInFlight = True
-    starttime = time()
-    while self.reqsOutstanding != 0:
-      self.cv.wait(self.SYNC_TIME)
-      if time() - starttime > self.SYNC_TIME:
-        self.cv.release()
-        raise RuntimeError("Timed out waiting for broker to synchronize")
-    self.cv.release()
 
   def _incOutstanding(self):
-    self.cv.acquire()
-    self.reqsOutstanding += 1
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      self.reqsOutstanding += 1
+    finally:
+      self.cv.release()
 
   def _decOutstanding(self):
-    self.cv.acquire()
-    self.reqsOutstanding -= 1
-    if self.reqsOutstanding == 0 and not self.topicBound:
-      self.topicBound = True
-      for key in self.session.bindingKeyList:
-        self.amqpSession.exchange_bind(exchange="qpid.management",
-                                       queue=self.topicName, binding_key=key)
-    if self.reqsOutstanding == 0 and self.syncInFlight:
-      self.syncInFlight = False
-      self.cv.notify()
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      self.reqsOutstanding -= 1
+      if self.reqsOutstanding == 0 and not self.topicBound:
+        self.topicBound = True
+        for key in self.session.bindingKeyList:
+          self.amqpSession.exchange_bind(exchange="qpid.management",
+                                         queue=self.topicName, binding_key=key)
+      if self.reqsOutstanding == 0 and self.syncInFlight:
+        self.syncInFlight = False
+        self.cv.notify()
+    finally:
+      self.cv.release()
 
   def _replyCb(self, msg):
     codec = Codec(self.conn.spec, msg.body)
@@ -1259,10 +1283,12 @@
   def _exceptionCb(self, data):
     self.isConnected = False
     self.error = data
-    self.cv.acquire()
-    if self.syncInFlight:
-      self.cv.notify()
-    self.cv.release()
+    try:
+      self.cv.acquire()
+      if self.syncInFlight:
+        self.cv.notify()
+    finally:
+      self.cv.release()
     self.session._handleError(self.error)
     self.session._handleBrokerDisconnect(self)
 
@@ -1326,7 +1352,7 @@
     return self.arguments
 
   def getTimestamp(self):
-    return self.timerstamp
+    return self.timestamp
 
   def getName(self):
     return self.name
@@ -1343,21 +1369,25 @@
 
   def _reserve(self, data):
     """ Reserve a unique sequence number """
-    self.lock.acquire()
-    result = self.sequence
-    self.sequence = self.sequence + 1
-    self.pending[result] = data
-    self.lock.release()
+    try:
+      self.lock.acquire()
+      result = self.sequence
+      self.sequence = self.sequence + 1
+      self.pending[result] = data
+    finally:
+      self.lock.release()
     return result
 
   def _release(self, seq):
     """ Release a reserved sequence number """
     data = None
-    self.lock.acquire()
-    if seq in self.pending:
-      data = self.pending[seq]
-      del self.pending[seq]
-    self.lock.release()
+    try:
+      self.lock.acquire()
+      if seq in self.pending:
+        data = self.pending[seq]
+        del self.pending[seq]
+    finally:
+      self.lock.release()
     return data
 
 


Reply via email to