Author: rhs
Date: Fri Mar  7 05:55:00 2008
New Revision: 634678

URL: http://svn.apache.org/viewvc?rev=634678&view=rev
Log:
added timeouts to hello-010-world; switched to conditions rather than events 
for handling connection/session state; handle session exceptions

Modified:
    incubator/qpid/trunk/qpid/python/hello-010-world
    incubator/qpid/trunk/qpid/python/qpid/connection010.py
    incubator/qpid/trunk/qpid/python/qpid/datatypes.py
    incubator/qpid/trunk/qpid/python/qpid/delegates.py
    incubator/qpid/trunk/qpid/python/qpid/queue.py
    incubator/qpid/trunk/qpid/python/qpid/session.py
    incubator/qpid/trunk/qpid/python/qpid/util.py

Modified: incubator/qpid/trunk/qpid/python/hello-010-world
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-010-world?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-010-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-010-world Fri Mar  7 05:55:00 2008
@@ -29,13 +29,13 @@
 ssn.message_transfer("a")
 ssn.message_transfer("test")
 
-m1 = ssn.incoming("this").get()
+m1 = ssn.incoming("this").get(timeout=10)
 print m1
-m2 = ssn.incoming("is").get()
+m2 = ssn.incoming("is").get(timeout=10)
 print m2
-m3 = ssn.incoming("a").get()
+m3 = ssn.incoming("a").get(timeout=10)
 print m3
-m4 = ssn.incoming("test").get()
+m4 = ssn.incoming("test").get(timeout=10)
 print m4
 
 ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id))

Modified: incubator/qpid/trunk/qpid/python/qpid/connection010.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection010.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection010.py Fri Mar  7 05:55:00 2008
@@ -18,7 +18,8 @@
 #
 
 import datatypes, session
-from threading import Thread, Event, RLock
+from threading import Thread, Condition, RLock
+from util import wait
 from framer import Closed
 from assembler import Assembler, Segment
 from codec010 import StringCodec
@@ -47,16 +48,21 @@
     Assembler.__init__(self, sock)
     self.spec = spec
     self.track = self.spec["track"]
-    self.delegate = delegate(self)
+
+    self.lock = RLock()
     self.attached = {}
     self.sessions = {}
-    self.lock = RLock()
+
+    self.condition = Condition()
+    self.opened = False
+
     self.thread = Thread(target=self.run)
     self.thread.setDaemon(True)
-    self.opened = Event()
-    self.closed = Event()
+
     self.channel_max = 65535
 
+    self.delegate = delegate(self)
+
   def attach(self, name, ch, delegate, force=False):
     self.lock.acquire()
     try:
@@ -104,12 +110,13 @@
   def session(self, name, timeout=None, delegate=session.client):
     self.lock.acquire()
     try:
-      ssn = self.attach(name, Channel(self, self.__channel()), delegate)
+      ch = Channel(self, self.__channel())
+      ssn = self.attach(name, ch, delegate)
       ssn.channel.session_attach(name)
-      ssn.opened.wait(timeout)
-      if ssn.opened.isSet():
+      if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
         return ssn
       else:
+        self.detach(name, ch)
         raise Timeout()
     finally:
       self.lock.release()
@@ -117,8 +124,7 @@
   def start(self, timeout=None):
     self.delegate.start()
     self.thread.start()
-    self.opened.wait(timeout=timeout)
-    if not self.opened.isSet():
+    if not wait(self.condition, lambda: self.opened, timeout):
       raise Timeout()
 
   def run(self):
@@ -132,9 +138,9 @@
       self.delegate.received(seg)
 
   def close(self, timeout=None):
+    if not self.opened: return
     Channel(self, 0).connection_close()
-    self.closed.wait(timeout=timeout)
-    if not self.closed.isSet():
+    if not wait(self.condition, lambda: not self.opened, timeout):
       raise Timeout()
     self.thread.join(timeout=timeout)
 

Modified: incubator/qpid/trunk/qpid/python/qpid/datatypes.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/datatypes.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/datatypes.py Fri Mar  7 05:55:00 2008
@@ -112,16 +112,26 @@
     return "RangedSet(%s)" % str(self.ranges)
 
 class Future:
-  def __init__(self, initial=None):
+  def __init__(self, initial=None, exception=Exception):
     self.value = initial
+    self._error = None
+    # exception type that get() raises once error() has been called
+    self._exception = exception
     self._set = threading.Event()
 
+  def error(self, error):
+    # record the failure and release any thread blocked in get()
+    self._error = error
+    self._set.set()
+
   def set(self, value):
     self.value = value
     self._set.set()
 
   def get(self, timeout=None):
     self._set.wait(timeout)
+    if self._error is not None:
+      raise self._exception(self._error)
     return self.value
 
   def is_set(self):

Modified: incubator/qpid/trunk/qpid/python/qpid/delegates.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegates.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegates.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegates.py Fri Mar  7 05:55:00 2008
@@ -19,6 +19,7 @@
 
 import connection010
 import session
+from util import notify
 
 class Delegate:
 
@@ -49,7 +50,8 @@
     self.connection.sock.close()
 
   def connection_close_ok(self, ch, close_ok):
-    self.connection.closed.set()
+    self.connection.opened = False
+    notify(self.connection.condition)
 
   def session_attach(self, ch, a):
     try:
@@ -61,7 +63,7 @@
       ch.session_detached(a.name)
 
   def session_attached(self, ch, a):
-    ch.session.opened.set()
+    notify(ch.session.condition)
 
   def session_detach(self, ch, d):
     self.connection.detach(d.name, ch)
@@ -70,7 +72,7 @@
   def session_detached(self, ch, d):
     ssn = self.connection.detach(d.name, ch)
     if ssn is not None:
-      ssn.closed.set()
+      notify(ssn.condition)  # notify via ssn, the object just checked for None
 
   def session_command_point(self, ch, cp):
     ssn = ch.session
@@ -91,8 +93,9 @@
     pass
 
   def connection_open(self, ch, open):
-    self.connection.opened.set()
+    self.connection.opened = True
     ch.connection_open_ok()
+    notify(self.connection.condition)
 
 class Client(Delegate):
 
@@ -108,4 +111,5 @@
     ch.connection_open()
 
   def connection_open_ok(self, ch, open_ok):
-    self.connection.opened.set()
+    self.connection.opened = True
+    notify(self.connection.condition)

Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Fri Mar  7 05:55:00 2008
@@ -35,10 +35,12 @@
 
   def __init__(self, *args, **kwargs):
     BaseQueue.__init__(self, *args, **kwargs)
+    self.error = None
     self.listener = None
     self.thread = None
 
-  def close(self):
+  def close(self, error = None):
+    self.error = error
     self.put(Queue.END)
 
   def get(self, block = True, timeout = None):
@@ -47,7 +49,7 @@
       # this guarantees that any other waiting threads or any future
       # calls to get will also result in a Closed exception
       self.put(Queue.END)
-      raise Closed()
+      raise Closed(self.error)
     else:
       return result
 

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Fri Mar  7 05:55:00 2008
@@ -17,13 +17,14 @@
 # under the License.
 #
 
-from threading import Event, RLock
+from threading import Condition, RLock
 from invoker import Invoker
 from datatypes import RangedSet, Struct, Future
 from codec010 import StringCodec
 from assembler import Segment
 from queue import Queue
 from datatypes import Message
+from util import wait
 from logging import getLogger
 
 class SessionDetached(Exception): pass
@@ -34,6 +35,8 @@
 def server(*args):
   return Server(*args)
 
+class SessionException(Exception): pass
+
 class Session(Invoker):
 
   def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
@@ -42,17 +45,22 @@
     self.sync = sync
     self.timeout = timeout
     self.channel = None
-    self.opened = Event()
-    self.closed = Event()
+
+    self.condition = Condition()
+
+    self.send_id = True
     self.receiver = Receiver(self)
     self.sender = Sender(self)
-    self.delegate = delegate(self)
-    self.send_id = True
-    self.results = {}
+
     self.lock = RLock()
     self._incoming = {}
+    self.results = {}
+    self.exceptions = []
+
     self.assembly = None
 
+    self.delegate = delegate(self)
+
   def incoming(self, destination):
     self.lock.acquire()
     try:
@@ -66,7 +74,7 @@
 
   def close(self, timeout=None):
     self.channel.session_detach(self.name)
-    self.closed.wait(timeout=timeout)
+    wait(self.condition, lambda: self.channel is None, timeout)
 
   def resolve_method(self, name):
     cmd = self.spec.instructions.get(name)
@@ -105,7 +113,7 @@
                   type.segment_type, type.track, self.channel.id, sc.encoded)
 
     if type.result:
-      result = Future()
+      result = Future(exception=SessionException)
       self.results[self.sender.next_id] = result
 
     self.send(seg)
@@ -234,8 +242,26 @@
     self.session = session
 
   def execution_result(self, er):
-    future = self.session.results[er.command_id]
+    future = self.session.results.pop(er.command_id)
     future.set(er.value)
+
+  def execution_exception(self, ex):
+    # fail every pending result and close every incoming queue with the error
+    self.session.lock.acquire()
+    try:
+      self.session.exceptions.append(ex)
+      excs = self.session.exceptions[:]
+      if len(excs) == 1:
+        error = excs[0]
+      else:
+        error = tuple(excs)
+      # snapshot the keys: popping while iterating the dict raises RuntimeError
+      for id in list(self.session.results):
+        self.session.results.pop(id).error(error)
+      for q in self.session._incoming.values():
+        q.close(error)
+    finally:
+      self.session.lock.release()
 
 msg = getLogger("qpid.ssn.msg")
 

Modified: incubator/qpid/trunk/qpid/python/qpid/util.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/util.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/util.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/util.py Fri Mar  7 05:55:00 2008
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os, socket
+import os, socket, time
 
 def connect(host, port):
   sock = socket.socket()
@@ -40,3 +40,28 @@
 
 def mtime(filename):
   return os.stat(filename).st_mtime
+
+def wait(condition, predicate, timeout=None):  # True once predicate() holds, False on timeout
+  condition.acquire()
+  try:
+    passed = 0  # seconds elapsed since waiting began
+    start = time.time()
+    while not predicate():
+      if timeout is None:
+        condition.wait()  # no deadline: block until notified
+      elif passed < timeout:
+        condition.wait(timeout - passed)  # wait only for the time remaining
+      else:
+        return False  # timeout used up and predicate still false
+      passed = time.time() - start
+    return True
+  finally:
+    condition.release()
+
+def notify(condition, action=lambda: None):  # run action() holding the condition, then wake all waiters
+  condition.acquire()
+  try:
+    action()
+    condition.notifyAll()
+  finally:
+    condition.release()


Reply via email to