Author: rhs
Date: Fri Mar 7 05:55:00 2008
New Revision: 634678
URL: http://svn.apache.org/viewvc?rev=634678&view=rev
Log:
added timeouts to hello-010-world; switched to conditions rather than events
for handling connection/session state; handle session exceptions
Modified:
incubator/qpid/trunk/qpid/python/hello-010-world
incubator/qpid/trunk/qpid/python/qpid/connection010.py
incubator/qpid/trunk/qpid/python/qpid/datatypes.py
incubator/qpid/trunk/qpid/python/qpid/delegates.py
incubator/qpid/trunk/qpid/python/qpid/queue.py
incubator/qpid/trunk/qpid/python/qpid/session.py
incubator/qpid/trunk/qpid/python/qpid/util.py
Modified: incubator/qpid/trunk/qpid/python/hello-010-world
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/hello-010-world?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/hello-010-world (original)
+++ incubator/qpid/trunk/qpid/python/hello-010-world Fri Mar 7 05:55:00 2008
@@ -29,13 +29,13 @@
ssn.message_transfer("a")
ssn.message_transfer("test")
-m1 = ssn.incoming("this").get()
+m1 = ssn.incoming("this").get(timeout=10)
print m1
-m2 = ssn.incoming("is").get()
+m2 = ssn.incoming("is").get(timeout=10)
print m2
-m3 = ssn.incoming("a").get()
+m3 = ssn.incoming("a").get(timeout=10)
print m3
-m4 = ssn.incoming("test").get()
+m4 = ssn.incoming("test").get(timeout=10)
print m4
ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id))
Modified: incubator/qpid/trunk/qpid/python/qpid/connection010.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection010.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection010.py Fri Mar 7 05:55:00
2008
@@ -18,7 +18,8 @@
#
import datatypes, session
-from threading import Thread, Event, RLock
+from threading import Thread, Condition, RLock
+from util import wait
from framer import Closed
from assembler import Assembler, Segment
from codec010 import StringCodec
@@ -47,16 +48,21 @@
Assembler.__init__(self, sock)
self.spec = spec
self.track = self.spec["track"]
- self.delegate = delegate(self)
+
+ self.lock = RLock()
self.attached = {}
self.sessions = {}
- self.lock = RLock()
+
+ self.condition = Condition()
+ self.opened = False
+
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
- self.opened = Event()
- self.closed = Event()
+
self.channel_max = 65535
+ self.delegate = delegate(self)
+
def attach(self, name, ch, delegate, force=False):
self.lock.acquire()
try:
@@ -104,12 +110,13 @@
def session(self, name, timeout=None, delegate=session.client):
self.lock.acquire()
try:
- ssn = self.attach(name, Channel(self, self.__channel()), delegate)
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
ssn.channel.session_attach(name)
- ssn.opened.wait(timeout)
- if ssn.opened.isSet():
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
return ssn
else:
+ self.detach(name, ch)
raise Timeout()
finally:
self.lock.release()
@@ -117,8 +124,7 @@
def start(self, timeout=None):
self.delegate.start()
self.thread.start()
- self.opened.wait(timeout=timeout)
- if not self.opened.isSet():
+ if not wait(self.condition, lambda: self.opened, timeout):
raise Timeout()
def run(self):
@@ -132,9 +138,9 @@
self.delegate.received(seg)
def close(self, timeout=None):
+ if not self.opened: return
Channel(self, 0).connection_close()
- self.closed.wait(timeout=timeout)
- if not self.closed.isSet():
+ if not wait(self.condition, lambda: not self.opened, timeout):
raise Timeout()
self.thread.join(timeout=timeout)
Modified: incubator/qpid/trunk/qpid/python/qpid/datatypes.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/datatypes.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/datatypes.py Fri Mar 7 05:55:00 2008
@@ -112,16 +112,23 @@
return "RangedSet(%s)" % str(self.ranges)
class Future:
- def __init__(self, initial=None):
+ def __init__(self, initial=None, exception=Exception):
self.value = initial
+ self._error = None
self._set = threading.Event()
+ def error(self, error):
+ self._error = error
+ self._set.set()
+
def set(self, value):
self.value = value
self._set.set()
def get(self, timeout=None):
self._set.wait(timeout)
+ if self._error != None:
+ raise exception(self._error)
return self.value
def is_set(self):
Modified: incubator/qpid/trunk/qpid/python/qpid/delegates.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegates.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegates.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegates.py Fri Mar 7 05:55:00 2008
@@ -19,6 +19,7 @@
import connection010
import session
+from util import notify
class Delegate:
@@ -49,7 +50,8 @@
self.connection.sock.close()
def connection_close_ok(self, ch, close_ok):
- self.connection.closed.set()
+ self.connection.opened = False
+ notify(self.connection.condition)
def session_attach(self, ch, a):
try:
@@ -61,7 +63,7 @@
ch.session_detached(a.name)
def session_attached(self, ch, a):
- ch.session.opened.set()
+ notify(ch.session.condition)
def session_detach(self, ch, d):
self.connection.detach(d.name, ch)
@@ -70,7 +72,7 @@
def session_detached(self, ch, d):
ssn = self.connection.detach(d.name, ch)
if ssn is not None:
- ssn.closed.set()
+ notify(ch.session.condition)
def session_command_point(self, ch, cp):
ssn = ch.session
@@ -91,8 +93,9 @@
pass
def connection_open(self, ch, open):
- self.connection.opened.set()
+ self.connection.opened = True
ch.connection_open_ok()
+ notify(self.connection.condition)
class Client(Delegate):
@@ -108,4 +111,5 @@
ch.connection_open()
def connection_open_ok(self, ch, open_ok):
- self.connection.opened.set()
+ self.connection.opened = True
+ notify(self.connection.condition)
Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Fri Mar 7 05:55:00 2008
@@ -35,10 +35,12 @@
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
+ self.error = None
self.listener = None
self.thread = None
- def close(self):
+ def close(self, error = None):
+ self.error = error
self.put(Queue.END)
def get(self, block = True, timeout = None):
@@ -47,7 +49,7 @@
# this guarantees that any other waiting threads or any future
# calls to get will also result in a Closed exception
self.put(Queue.END)
- raise Closed()
+ raise Closed(self.error)
else:
return result
Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Fri Mar 7 05:55:00 2008
@@ -17,13 +17,14 @@
# under the License.
#
-from threading import Event, RLock
+from threading import Condition, RLock
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
from datatypes import Message
+from util import wait
from logging import getLogger
class SessionDetached(Exception): pass
@@ -34,6 +35,8 @@
def server(*args):
return Server(*args)
+class SessionException(Exception): pass
+
class Session(Invoker):
def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
@@ -42,17 +45,22 @@
self.sync = sync
self.timeout = timeout
self.channel = None
- self.opened = Event()
- self.closed = Event()
+
+ self.condition = Condition()
+
+ self.send_id = True
self.receiver = Receiver(self)
self.sender = Sender(self)
- self.delegate = delegate(self)
- self.send_id = True
- self.results = {}
+
self.lock = RLock()
self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
self.assembly = None
+ self.delegate = delegate(self)
+
def incoming(self, destination):
self.lock.acquire()
try:
@@ -66,7 +74,7 @@
def close(self, timeout=None):
self.channel.session_detach(self.name)
- self.closed.wait(timeout=timeout)
+ wait(self.condition, lambda: self.channel is None, timeout)
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
@@ -105,7 +113,7 @@
type.segment_type, type.track, self.channel.id, sc.encoded)
if type.result:
- result = Future()
+ result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
self.send(seg)
@@ -234,8 +242,26 @@
self.session = session
def execution_result(self, er):
- future = self.session.results[er.command_id]
+ future = self.session.results.pop(er.command_id)
future.set(er.value)
+
+ def execution_exception(self, ex):
+ self.session.lock.acquire()
+ try:
+ self.session.exceptions.append(ex)
+ excs = self.session.exceptions[:]
+ if len(excs) == 1:
+ error = excs[0]
+ else:
+ error = tuple(excs)
+ for id in self.session.results:
+ f = self.session.results.pop(id)
+ f.error(error)
+
+ for q in self.session._incoming.values():
+ q.close(error)
+ finally:
+ self.session.lock.release()
msg = getLogger("qpid.ssn.msg")
Modified: incubator/qpid/trunk/qpid/python/qpid/util.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/util.py?rev=634678&r1=634677&r2=634678&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/util.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/util.py Fri Mar 7 05:55:00 2008
@@ -17,7 +17,7 @@
# under the License.
#
-import os, socket
+import os, socket, time
def connect(host, port):
sock = socket.socket()
@@ -40,3 +40,28 @@
def mtime(filename):
return os.stat(filename).st_mtime
+
+def wait(condition, predicate, timeout=None):
+ condition.acquire()
+ try:
+ passed = 0
+ start = time.time()
+ while not predicate():
+ if timeout is None:
+ condition.wait()
+ elif passed < timeout:
+ condition.wait(timeout - passed)
+ else:
+ return False
+ passed = time.time() - start
+ return True
+ finally:
+ condition.release()
+
+def notify(condition, action=lambda: None):
+ condition.acquire()
+ try:
+ action()
+ condition.notifyAll()
+ finally:
+ condition.release()