On 02/05/2015 05:10 PM, Rafael Schloming wrote:
On Wed, Feb 4, 2015 at 8:39 PM, Ted Ross <tr...@redhat.com> wrote:
I'm a bit confused by this push.  How does this set of examples relate to
the examples Gordon has been developing in examples/engine/py?
[...]
The short summary is that I've worked to implement in C a lot of the
concepts Gordon already developed so that we can gain the benefits of his
work across all languages, not just Python.

I didn't really develop any concepts. I focused on getting a set of working examples that demonstrated a level of usability and functional completeness I believe is necessary, along with utilities (essentially some handlers and a 'container' class) that enable this. The examples and the utilities are currently in python.

Ultimately, we'll want to
reconcile the Python container with the C Reactor. As you point out they
are actually quite close already.

The purpose of the container (as well as being the simple entry point for applications), was as a place to locate the convenience functions for connecting, establishing links etc, that allow applications to remain succinct and (I hope) intuitive. I.e. the important difference is exactly what the container is seeking to add.

What the reactor replaces is the plumbing that was necessary to drive everything. The container can be viewed as a layer on top of that or an extension to it.

Gordon and I have discussed how some of
the remaining differences can be reconciled. I'll be posting some notes on
this in the next few days.

I have all the 'container' examples working over the reactor now. There is a small change to the reactor itself to allow for more flexibility in configuration of ssl and sasl:

https://reviews.apache.org/r/30926/

There is a trivial temporary workaround for the lack of sufficient transport events being emitted when using ssl:

https://reviews.apache.org/r/30927/

Then there are a couple of small additions to the python wrapped version of the reactor:

https://reviews.apache.org/r/30928/

https://reviews.apache.org/r/30929/

With these in place the two attached patches will apply and complete the work (I'm not sure how to put patches up on reviewboard that depend on other patches). The only real change to the api is that the schedule method takes a relative duration, where previously it took an absolute time. It also allows a handler to be specified for the specific scheduled event. I think these are both improvements.

(I split the attachments into two to make it easier to review. The first is an entirely non-functional change that simply moves code, unaltered. This makes it clearer that there are no further changes to the reactor. As a single diff it was a lot harder to read).
>From 85ac30deec13be6736e847adc360185db3478872 Mon Sep 17 00:00:00 2001
From: Gordon Sim <g...@redhat.com>
Date: Wed, 11 Feb 2015 22:56:16 +0000
Subject: [PATCH 5/6] move reactor and associated classes before container, so
 that it can use them

---
 proton-c/bindings/python/proton/reactors.py | 306 ++++++++++++++--------------
 1 file changed, 153 insertions(+), 153 deletions(-)

diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index f4f32fcd83d3285f88afd570439fd8a33944bd5b..7da3531991e45e2c82c228a54f9a0eff18149ed3 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -26,6 +26,159 @@ from select import select
 from proton.handlers import OutgoingMessageHandler, ScopedHandler
 from proton import unicode2utf8, utf82unicode
 
+import traceback
+from proton import WrappedHandler, _chandler, secs2millis, millis2secs, Selectable
+from wrapper import Wrapper, PYCTX
+from cproton import *
+
+class Task(Wrapper):
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            return Task(impl)
+
+    def __init__(self, impl):
+        Wrapper.__init__(self, impl, pn_task_attachments)
+
+    def _init(self):
+        pass
+
+class Acceptor(Wrapper):
+
+    def __init__(self, impl):
+        Wrapper.__init__(self, impl)
+
+    def close(self):
+        pn_acceptor_close(self._impl)
+
+class Reactor(Wrapper):
+
+    @staticmethod
+    def wrap(impl):
+        if impl is None:
+            return None
+        else:
+            record = pn_reactor_attachments(impl)
+            attrs = pn_void2py(pn_record_get(record, PYCTX))
+            if attrs and 'subclass' in attrs:
+                return attrs['subclass'](impl=impl)
+            else:
+                return Reactor(impl=impl)
+
+    def __init__(self, *handlers, **kwargs):
+        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
+        for h in handlers:
+            self.handler.add(h)
+
+    def _init(self):
+        self.errors = []
+
+    def on_error(self, info):
+        self.errors.append(info)
+        self.yield_()
+
+    def _get_global(self):
+        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
+
+    def _set_global(self, handler):
+        impl = _chandler(handler, self.on_error)
+        pn_reactor_set_global_handler(self._impl, impl)
+        pn_decref(impl)
+
+    global_handler = property(_get_global, _set_global)
+
+    def _get_timeout(self):
+        return millis2secs(pn_reactor_get_timeout(self._impl))
+
+    def _set_timeout(self, secs):
+        return pn_reactor_set_timeout(self._impl, secs2millis(secs))
+
+    timeout = property(_get_timeout, _set_timeout)
+
+    def yield_(self):
+        pn_reactor_yield(self._impl)
+
+    def mark(self):
+        pn_reactor_mark(self._impl)
+
+    def _get_handler(self):
+        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
+
+    def _set_handler(self, handler):
+        impl = _chandler(handler, self.on_error)
+        pn_reactor_set_handler(self._impl, impl)
+        pn_decref(impl)
+
+    handler = property(_get_handler, _set_handler)
+
+    def run(self):
+        self.timeout = 3.14159265359
+        self.start()
+        while self.process(): pass
+        self.stop()
+
+    def start(self):
+        pn_reactor_start(self._impl)
+
+    @property
+    def quiesced(self):
+        return pn_reactor_quiesced(self._impl)
+
+    def process(self):
+        result = pn_reactor_process(self._impl)
+        if self.errors:
+            for exc, value, tb in self.errors[:-1]:
+                traceback.print_exception(exc, value, tb)
+            exc, value, tb = self.errors[-1]
+            raise exc, value, tb
+        return result
+
+    def stop(self):
+        pn_reactor_stop(self._impl)
+
+    def schedule(self, delay, task):
+        impl = _chandler(task, self.on_error)
+        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
+        pn_decref(impl)
+        return task
+
+    def acceptor(self, host, port, handler=None):
+        impl = _chandler(handler, self.on_error)
+        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
+        pn_decref(impl)
+        if aimpl:
+            return Acceptor(aimpl)
+        else:
+            raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
+
+    def connection(self, handler=None):
+        impl = _chandler(handler, self.on_error)
+        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
+        pn_decref(impl)
+        return result
+
+    def selectable(self, handler=None):
+        impl = _chandler(handler, self.on_error)
+        result = Selectable.wrap(pn_reactor_selectable(self._impl))
+        if impl:
+            record = pn_selectable_attachments(result._impl)
+            pn_record_set_handler(record, impl)
+            pn_decref(impl)
+        return result
+
+    def update(self, sel):
+        pn_reactor_update(self._impl, sel._impl)
+
+    def push_event(self, obj, etype):
+        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
+
+from proton import wrappers as _wrappers
+_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
+_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
+
 class AmqpSocket(object):
     """
     Associates a transport with a connection and a socket and can be
@@ -877,156 +1030,3 @@ class Container(object):
 
     def do_work(self, timeout=None):
         return self.loop.do_work(timeout)
-
-import traceback
-from proton import WrappedHandler, _chandler, secs2millis, millis2secs, Selectable
-from wrapper import Wrapper, PYCTX
-from cproton import *
-
-class Task(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            return Task(impl)
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl, pn_task_attachments)
-
-    def _init(self):
-        pass
-
-class Acceptor(Wrapper):
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl)
-
-    def close(self):
-        pn_acceptor_close(self._impl)
-
-class Reactor(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            record = pn_reactor_attachments(impl)
-            attrs = pn_void2py(pn_record_get(record, PYCTX))
-            if attrs and 'subclass' in attrs:
-                return attrs['subclass'](impl=impl)
-            else:
-                return Reactor(impl=impl)
-
-    def __init__(self, *handlers, **kwargs):
-        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
-        for h in handlers:
-            self.handler.add(h)
-
-    def _init(self):
-        self.errors = []
-
-    def on_error(self, info):
-        self.errors.append(info)
-        self.yield_()
-
-    def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
-
-    def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error)
-        pn_reactor_set_global_handler(self._impl, impl)
-        pn_decref(impl)
-
-    global_handler = property(_get_global, _set_global)
-
-    def _get_timeout(self):
-        return millis2secs(pn_reactor_get_timeout(self._impl))
-
-    def _set_timeout(self, secs):
-        return pn_reactor_set_timeout(self._impl, secs2millis(secs))
-
-    timeout = property(_get_timeout, _set_timeout)
-
-    def yield_(self):
-        pn_reactor_yield(self._impl)
-
-    def mark(self):
-        pn_reactor_mark(self._impl)
-
-    def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
-
-    def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error)
-        pn_reactor_set_handler(self._impl, impl)
-        pn_decref(impl)
-
-    handler = property(_get_handler, _set_handler)
-
-    def run(self):
-        self.timeout = 3.14159265359
-        self.start()
-        while self.process(): pass
-        self.stop()
-
-    def start(self):
-        pn_reactor_start(self._impl)
-
-    @property
-    def quiesced(self):
-        return pn_reactor_quiesced(self._impl)
-
-    def process(self):
-        result = pn_reactor_process(self._impl)
-        if self.errors:
-            for exc, value, tb in self.errors[:-1]:
-                traceback.print_exception(exc, value, tb)
-            exc, value, tb = self.errors[-1]
-            raise exc, value, tb
-        return result
-
-    def stop(self):
-        pn_reactor_stop(self._impl)
-
-    def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error)
-        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
-        pn_decref(impl)
-        return task
-
-    def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error)
-        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
-        pn_decref(impl)
-        if aimpl:
-            return Acceptor(aimpl)
-        else:
-            raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
-
-    def connection(self, handler=None):
-        impl = _chandler(handler, self.on_error)
-        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
-        pn_decref(impl)
-        return result
-
-    def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error)
-        result = Selectable.wrap(pn_reactor_selectable(self._impl))
-        if impl:
-            record = pn_selectable_attachments(result._impl)
-            pn_record_set_handler(record, impl)
-            pn_decref(impl)
-        return result
-
-    def update(self, sel):
-        pn_reactor_update(self._impl, sel._impl)
-
-    def push_event(self, obj, etype):
-        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-from proton import wrappers as _wrappers
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
-- 
1.7.11.7

>From 411192c1d3a11c0594559bd3a966ac7550fd9ee9 Mon Sep 17 00:00:00 2001
From: Gordon Sim <g...@redhat.com>
Date: Wed, 11 Feb 2015 22:57:28 +0000
Subject: [PATCH 6/6] make container a subclass of reactor and remove
 redundant code which is replaced by reactor

---
 examples/engine/py/client_http.py               |  13 +-
 examples/engine/py/db_send.py                   |   9 +-
 examples/engine/py/helloworld.py                |   1 -
 examples/engine/py/helloworld_direct_tornado.py |  17 +-
 examples/engine/py/helloworld_tornado.py        |   9 +-
 examples/engine/py/proton_tornado.py            | 124 +++--
 examples/engine/py/recurring_timer.py           |  11 +-
 examples/engine/py/test_examples.py             |   9 +-
 proton-c/bindings/python/proton/handlers.py     |   7 +
 proton-c/bindings/python/proton/reactors.py     | 577 +++++-------------------
 proton-c/bindings/python/proton/utils.py        |  12 +-
 11 files changed, 254 insertions(+), 535 deletions(-)

diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
index aa46ed90cc7f19bec7dcc098e44ad8e51899a93c..cd0d63fbd6290a33cf3795bd1fa3d815374cc7a8 100755
--- a/examples/engine/py/client_http.py
+++ b/examples/engine/py/client_http.py
@@ -18,11 +18,11 @@
 # under the License.
 #
 
+import tornado.ioloop
+import tornado.web
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
-from tornado.ioloop import IOLoop
-import tornado.web
+from proton_tornado import Container
 
 class Client(MessagingHandler):
     def __init__(self, host, address):
@@ -65,6 +65,7 @@ class Client(MessagingHandler):
     def request(self, body, handler):
         self.pending.append((body, handler))
         self.do_request()
+        self.container.touch()
 
 class ExampleHandler(tornado.web.RequestHandler):
     def initialize(self, client):
@@ -100,11 +101,13 @@ class ExampleHandler(tornado.web.RequestHandler):
                    '</form>')
 
 
+loop = tornado.ioloop.IOLoop.instance()
 client = Client("localhost:5672", "examples")
-loop = TornadoLoop(client)
+client.container = Container(client, loop=loop)
+client.container.initialise()
 app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))])
 app.listen(8888)
 try:
-    loop.run()
+    loop.start()
 except KeyboardInterrupt:
     loop.stop()
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index 1e1ad3f3e0da2d0e36bf74ca86c4e4b5538276b3..c2e3fc2f6cbe37c9e0200e06325aec42834670de 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -50,7 +50,7 @@ class Send(MessagingHandler):
             if event.subject == self.load_count:
                 print "Exhausted available data, waiting to recheck..."
                 # check for new data after 5 seconds
-                self.container.schedule(time.time() + 5, link=self.sender, subject="data")
+                self.container.schedule(5, self)
         else:
             self.send()
 
@@ -86,10 +86,9 @@ class Send(MessagingHandler):
         self.db.reset()
         self.sent = self.confirmed
 
-    def on_timer(self, event):
-        if event.subject == "data":
-            print "Rechecking for data..."
-            self.request_records()
+    def on_timer_task(self, event):
+        print "Rechecking for data..."
+        self.request_records()
 
 parser = optparse.OptionParser(usage="usage: %prog [options]",
                                description="Send messages to the supplied address.")
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
index 4ec53ca49a053074aebe88f629043e85f0077fd2..e1ea2eeec0a20cb7058b5736dcd9905a1f370d65 100755
--- a/examples/engine/py/helloworld.py
+++ b/examples/engine/py/helloworld.py
@@ -42,4 +42,3 @@ class HelloWorld(MessagingHandler):
         event.connection.close()
 
 Container(HelloWorld("localhost:5672", "examples")).run()
-
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
index e798dae56318da6719bbb40e8ee4396ed0f435a2..2466f80fd0291dc07d46d2da03cce6fc11af65e7 100755
--- a/examples/engine/py/helloworld_direct_tornado.py
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -20,19 +20,16 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
+from proton_tornado import Container
 
 class HelloWorld(MessagingHandler):
-    def __init__(self, server, address):
+    def __init__(self, url):
         super(HelloWorld, self).__init__()
-        self.server = server
-        self.address = address
+        self.url = url
 
     def on_start(self, event):
-        self.eventloop = event.container
-        self.acceptor = event.container.listen(self.server)
-        conn = event.container.connect(self.server)
-        event.container.create_sender(conn, self.address)
+        self.acceptor = event.container.listen(self.url)
+        event.container.create_sender(self.url)
 
     def on_sendable(self, event):
         event.sender.send(Message(body=u"Hello World!"))
@@ -46,7 +43,5 @@ class HelloWorld(MessagingHandler):
 
     def on_connection_closed(self, event):
         self.acceptor.close()
-        self.eventloop.stop()
-
-TornadoLoop(HelloWorld("localhost:8888", "examples")).run()
 
+Container(HelloWorld("localhost:8888/examples")).run()
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
index c56ca8d0e68e77886c856d31b263e6f4b0ebf822..d4b32cf2cc4b93f173f477b9ff87aa791ec61c62 100755
--- a/examples/engine/py/helloworld_tornado.py
+++ b/examples/engine/py/helloworld_tornado.py
@@ -20,7 +20,7 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton_tornado import TornadoLoop
+from proton_tornado import Container
 
 class HelloWorld(MessagingHandler):
     def __init__(self, server, address):
@@ -29,7 +29,6 @@ class HelloWorld(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.eventloop = event.container
         conn = event.container.connect(self.server)
         event.container.create_receiver(conn, self.address)
         event.container.create_sender(conn, self.address)
@@ -42,8 +41,4 @@ class HelloWorld(MessagingHandler):
         print event.message.body
         event.connection.close()
 
-    def on_connection_closed(self, event):
-        self.eventloop.stop()
-
-TornadoLoop(HelloWorld("localhost:5672", "examples")).run()
-
+Container(HelloWorld("localhost:5672", "examples")).run()
diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py
index cfe7d6f34008300c457bd84a5e79b9217c9fff09..d4afeba99173a6dcbcf8ecc1003081c8eb8a44da 100755
--- a/examples/engine/py/proton_tornado.py
+++ b/examples/engine/py/proton_tornado.py
@@ -18,53 +18,97 @@
 # under the License.
 #
 
-from proton.reactors import ApplicationEvent, Container, StartEvent
 import tornado.ioloop
+from proton.reactors import Container as BaseContainer
+from proton.handlers import IOHandler
 
-class TornadoLoop(Container):
-    def __init__(self, *handlers):
-        super(TornadoLoop, self).__init__(*handlers)
-        self.loop = tornado.ioloop.IOLoop.current()
+class TornadoLoopHandler:
 
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
-        conn = super(TornadoLoop, self).connect(url, urls, address, handler, reconnect)
-        self.events.process()
-        return conn
+    def __init__(self, loop=None, handler_base=None):
+        self.loop = loop or tornado.ioloop.IOLoop.instance()
+        self.io = handler_base or IOHandler()
+        self.count = 0
 
-    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
-        self.loop.call_at(deadline, self.events.dispatch, ApplicationEvent("timer", connection, session, link, delivery, subject))
+    def on_reactor_init(self, event):
+        self.reactor = event.reactor
 
-    def add(self, conn):
-        self.loop.add_handler(conn, self._connection_ready, tornado.ioloop.IOLoop.READ | tornado.ioloop.IOLoop.WRITE)
+    def on_reactor_quiesced(self, event):
+        event.reactor.yield_()
 
-    def remove(self, conn):
-        self.loop.remove_handler(conn)
+    def on_unhandled(self, name, event):
+        event.dispatch(self.io)
 
-    def run(self):
-        self.events.dispatch(StartEvent(self))
-        self.loop.start()
+    def _events(self, sel):
+        events = self.loop.ERROR
+        if sel.reading:
+            events |= self.loop.READ
+        if sel.writing:
+            events |= self.loop.WRITE
+        return events
+
+    def _schedule(self, sel):
+        if sel.deadline:
+            self.loop.add_timeout(sel.deadline, lambda: self.expired(sel))
+
+    def _expired(self, sel):
+        sel.expired()
+
+    def _process(self):
+        self.reactor.process()
+        if not self.reactor.quiesced:
+            self.loop.add_callback(self._process)
+
+    def _callback(self, sel, events):
+        if self.loop.READ & events:
+            sel.readable()
+        if self.loop.WRITE & events:
+            sel.writable()
+        self._process()
+
+    def on_selectable_init(self, event):
+        sel = event.context
+        if sel.fileno() >= 0:
+            self.loop.add_handler(sel.fileno(), lambda fd, events: self._callback(sel, events), self._events(sel))
+        self._schedule(sel)
+        self.count += 1
+
+    def on_selectable_updated(self, event):
+        sel = event.context
+        if sel.fileno() > 0:
+            self.loop.update_handler(sel.fileno(), self._events(sel))
+        self._schedule(sel)
 
-    def stop(self):
+    def on_selectable_final(self, event):
+        sel = event.context
+        if sel.fileno() > 0:
+            self.loop.remove_handler(sel.fileno())
+        sel.release()
+        self.count -= 1
+        if self.count == 0:
+            self.loop.add_callback(self._stop)
+
+    def _stop(self):
+        self.reactor.stop()
         self.loop.stop()
 
-    def _get_event_flags(self, conn):
-        flags = 0
-        if conn.reading():
-            flags |= tornado.ioloop.IOLoop.READ
-        # FIXME: need way to update flags to avoid busy loop
-        #if conn.writing():
-        #    flags |= tornado.ioloop.IOLoop.WRITE
-        flags |= tornado.ioloop.IOLoop.WRITE
-        return flags
-
-    def _connection_ready(self, conn, events):
-        if events & tornado.ioloop.IOLoop.READ:
-            conn.readable()
-        if events & tornado.ioloop.IOLoop.WRITE:
-            conn.writable()
-        if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed():
-            self.loop.remove_handler(conn)
-            conn.close()
-            conn.removed()
-        self.events.process()
-        self.loop.update_handler(conn, self._get_event_flags(conn))
+class Container(object):
+    def __init__(self, *handlers, **kwargs):
+        self.tornado_loop = kwargs.get('loop', tornado.ioloop.IOLoop.instance())
+        kwargs['global_handler'] = TornadoLoopHandler(self.tornado_loop, kwargs.get('handler_base', None))
+        self.container = BaseContainer(*handlers, **kwargs)
+
+    def initialise(self):
+        self.container.start()
+        self.container.process()
+
+    def run(self):
+        self.initialise()
+        self.tornado_loop.start()
+
+    def touch(self):
+        self._process()
+
+    def _process(self):
+        self.container.process()
+        if not self.container.quiesced:
+            self.tornado_loop.add_callback(self._process)
diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py
index de530d3b296ba1cde55aeeddcca5b845af00982b..c8d5acf66bb7efd6a4918842ec973cd9df344416 100755
--- a/examples/engine/py/recurring_timer.py
+++ b/examples/engine/py/recurring_timer.py
@@ -18,20 +18,19 @@
 # under the License.
 #
 
-import time
 from proton.reactors import Container, Handler
 
 class Recurring(Handler):
     def __init__(self, period):
         self.period = period
 
-    def on_start(self, event):
-        self.container = event.container
-        self.container.schedule(time.time() + self.period, subject=self)
+    def on_reactor_init(self, event):
+        self.container = event.reactor
+        self.container.schedule(self.period, self)
 
-    def on_timer(self, event):
+    def on_timer_task(self, event):
         print "Tick..."
-        self.container.schedule(time.time() + self.period, subject=self)
+        self.container.schedule(self.period, self)
 
 try:
     container = Container(Recurring(1.0))
diff --git a/examples/engine/py/test_examples.py b/examples/engine/py/test_examples.py
index ed0aabc76c17d5333ef49c0b26dfb3dde54cd0cb..b46b85b24e96b54d3ca25aa031bb461bdba45bd9 100644
--- a/examples/engine/py/test_examples.py
+++ b/examples/engine/py/test_examples.py
@@ -18,6 +18,7 @@
 #
 
 import subprocess
+import time
 import unittest
 
 class ExamplesTest(unittest.TestCase):
@@ -48,8 +49,10 @@ class ExamplesTest(unittest.TestCase):
         expected = ["{'sequence': %iL}" % (i+1) for i in range(100)]
         self.assertEqual(actual, expected)
 
-    def test_client_server(self, client=['client.py'], server=['server.py']):
+    def test_client_server(self, client=['client.py'], server=['server.py'], sleep=0):
         s = subprocess.Popen(server, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        if sleep:
+            time.sleep(sleep)
         c = subprocess.Popen(client, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
         c.wait()
         s.terminate()
@@ -71,10 +74,10 @@ class ExamplesTest(unittest.TestCase):
         self.test_client_server(client=['sync_client.py'], server=['server_tx.py'])
 
     def test_client_server_direct(self):
-        self.test_client_server(client=['client.py', '-a', 'localhost:8888/examples'], server=['server_direct.py'])
+        self.test_client_server(client=['client.py', '-a', 'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5)
 
     def test_sync_client_server_direct(self):
-        self.test_client_server(client=['sync_client.py', 'localhost:8888/examples'], server=['server_direct.py'])
+        self.test_client_server(client=['sync_client.py', 'localhost:8888/examples'], server=['server_direct.py'], sleep=0.5)
 
     def test_db_send_recv(self):
         self.maxDiff = None
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index f7356475ba5cf69a58c3690e4b58f9974a2e95e1..f5e67d94a353a296ae1cc1602bb56006ce3ff7ef 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -367,6 +367,13 @@ class MessagingHandler(Handler, Acking):
         EndpointStateHandler.print_error(event.link, "link")
         event.connection.close()
 
+    def on_reactor_init(self, event):
+        if hasattr(event.reactor, 'subclass'):
+            setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
+        self.on_start(event)
+
+    def on_start(self, event): pass
+
 class TransactionHandler(object):
     """
     The interface for transaction handlers, i.e. objects that want to
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 7da3531991e45e2c82c228a54f9a0eff18149ed3..0a4caef43190d4e953010518ec43e1f47e3f0d0f 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -179,189 +179,6 @@ from proton import wrappers as _wrappers
 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
 
-class AmqpSocket(object):
-    """
-    Associates a transport with a connection and a socket and can be
-    used in an io loop to track the io for an AMQP 1.0 connection.
-    """
-
-    def __init__(self, conn, sock, events, heartbeat=None):
-        self.events = events
-        self.conn = conn
-        self.transport = Transport()
-        if heartbeat: self.transport.idle_timeout = heartbeat
-        self.transport.bind(self.conn)
-        self.socket = sock
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-        self.write_done = False
-        self.read_done = False
-        self._closed = False
-
-    def accept(self, force_sasl=True, ssl_domain=None):
-        if ssl_domain:
-            self.ssl = SSL(self.transport, ssl_domain)
-        if force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms("ANONYMOUS")
-            sasl.server()
-            sasl.done(SASL.OK)
-        #TODO: use SASL anyway if requested by peer
-        return self
-
-    def connect(self, host, port=None, username=None, password=None, force_sasl=True, ssl_domain=None):
-        if ssl_domain:
-            self.ssl = SSL(self.transport, ssl_domain)
-            self.ssl.peer_hostname = host
-        if username and password:
-            sasl = self.transport.sasl()
-            sasl.plain(username, password)
-        elif force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms('ANONYMOUS')
-            sasl.client()
-        try:
-            self.socket.connect_ex((host, port or 5672))
-        except socket.gaierror, e:
-            raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
-        return self
-
-    def _closed_cleanly(self):
-        return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
-
-    def closed(self):
-        if not self._closed and self.write_done and self.read_done:
-            self.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self.socket.close()
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        if self.read_done: return False
-        c = self.transport.capacity()
-        if c > 0:
-            return True
-        elif c < 0:
-            self.read_done = True
-        return False
-
-    def writing(self):
-        if self.write_done: return False
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                return True
-            elif p < 0:
-                self.write_done = True
-                return False
-            else: # p == 0
-                return False
-        except TransportException, e:
-            self.write_done = True
-            return False
-
-    def readable(self):
-        c = self.transport.capacity()
-        if c > 0:
-            try:
-                data = self.socket.recv(c)
-                if data:
-                    self.transport.push(data)
-                else:
-                    if not self._closed_cleanly():
-                        self.read_done = True
-                        self.write_done = True
-                    else:
-                        self.transport.close_tail()
-            except TransportException, e:
-                logging.error("Error on read: %s" % e)
-                self.read_done = True
-            except socket.error, e:
-                logging.error("Error on recv: %s" % e)
-                self.read_done = True
-                self.write_done = True
-        elif c < 0:
-            self.read_done = True
-
-    def writable(self):
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                data = self.transport.peek(p)
-                n = self.socket.send(data)
-                self.transport.pop(n)
-            elif p < 0:
-                self.write_done = True
-        except TransportException, e:
-            logging.error("Error on write: %s" % e)
-            self.write_done = True
-        except socket.error, e:
-            logging.error("Error on send: %s" % e)
-            self.write_done = True
-
-    def removed(self):
-        if not self._closed_cleanly():
-            self.transport.unbind()
-            self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
-
-    def tick(self):
-        t = self.transport.tick(time.time())
-        if t: return t
-        else: return None
-
-class AmqpAcceptor:
-    """
-    Listens for incoming sockets, creates an AmqpSocket for them and
-    adds that to the list of tracked 'selectables'. The acceptor can
-    itself be added to an io loop.
-    """
-
-    def __init__(self, events, loop, host, port, ssl_domain=None):
-        self.events = events
-        self.loop = loop
-        self.socket = socket.socket()
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.bind((host, port))
-        self.socket.listen(5)
-        self.ssl_domain = ssl_domain
-        self.loop.add(self)
-        self._closed = False
-
-    def closed(self):
-        if self._closed:
-            self.socket.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        return not self._closed
-
-    def writing(self):
-        return False
-
-    def readable(self):
-        sock, addr = self.socket.accept()
-        if sock:
-            self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept(ssl_domain=self.ssl_domain))
-
-    def removed(self): pass
-    def tick(self): return None
-
 
 class EventInjector(object):
     """
@@ -369,8 +186,7 @@ class EventInjector(object):
     external thread but handled on the event thread associated with
     the loop.
     """
-    def __init__(self, collector):
-        self.collector = collector
+    def __init__(self):
         self.queue = Queue.Queue()
         self.pipe = os.pipe()
         self._closed = False
@@ -379,9 +195,6 @@ class EventInjector(object):
         self.queue.put(event)
         os.write(self.pipe[1], "!")
 
-    def closed(self):
-        return self._closed and self.queue.empty()
-
     def close(self):
         self._closed = True
         os.write(self.pipe[1], "!")
@@ -389,98 +202,22 @@ class EventInjector(object):
     def fileno(self):
         return self.pipe[0]
 
-    def reading(self):
-        return not self.closed()
+    def on_selectable_init(self, event):
+        sel = event.context
+        sel.fileno(self.fileno())
+        sel.reading = True
+        event.reactor.update(sel)
 
-    def writing(self):
-        return False
-
-    def readable(self):
+    def on_selectable_readable(self, event):
         os.read(self.pipe[0], 512)
         while not self.queue.empty():
-            event = self.queue.get()
-            self.collector.put(event.context, event.type)
-
-    def removed(self): pass
-    def tick(self): return None
-
-class PQueue:
-
-    def __init__(self):
-        self.entries = []
-
-    def add(self, priority, task):
-        heappush(self.entries, (priority, task))
-
-    def peek(self):
-        if self.entries:
-            return nsmallest(1, self.entries)[0]
-        else:
-            return None
-
-    def pop(self):
-        if self.entries:
-            return heappop(self.entries)
-        else:
-            return None
-
-    def __nonzero__(self):
-        if self.entries:
-            return True
-        else:
-            return False
-
-class Timer:
-    def __init__(self, collector):
-        self.collector = collector
-        self.events = PQueue()
-
-    def schedule(self, deadline, event):
-        self.events.add(deadline, event)
-
-    def tick(self):
-        while self.events:
-            deadline, event = self.events.peek()
-            if time.time() > deadline:
-                self.events.pop()
-                self.collector.put(event.context, event.type)
-            else:
-                return deadline
-        return None
-
-    @property
-    def pending(self):
-        return bool(self.events)
-
-class Events(object):
-    def __init__(self, *handlers):
-        self.collector = Collector()
-        self.timer = Timer(self.collector)
-        self.handlers = handlers
-
-    def connection(self):
-        conn = Connection()
-        conn.collect(self.collector)
-        return conn
-
-    def process(self):
-        result = False
-        while True:
-            ev = self.collector.peek()
-            if ev:
-                self.dispatch(ev)
-                self.collector.pop()
-                result = True
-            else:
-                return result
-
-    def dispatch(self, event):
-        for h in self.handlers:
-            event.dispatch(h)
+            requested = self.queue.get()
+            event.reactor.push_event(requested.context, requested.type)
+        if self._closed:
+            s = event.context
+            s.terminate()
+            event.reactor.update(s)
 
-    @property
-    def empty(self):
-        return self.collector.peek() == None and not self.timer.pending
 
 class Names(object):
     def __init__(self, base=10000):
@@ -530,88 +267,6 @@ class StartEvent(ApplicationEvent):
         super(StartEvent, self).__init__("start")
         self.container = container
 
-def _min(a, b):
-    if a and b: return min(a, b)
-    elif a: return a
-    else: return b
-
-class SelectLoop(object):
-    """
-    An io loop based on select()
-    """
-    def __init__(self, events):
-        self.events = events
-        self.selectables = []
-        self._abort = False
-
-    def abort(self):
-        self._abort = True
-
-    def add(self, selectable):
-        self.selectables.append(selectable)
-
-    def remove(self, selectable):
-        self.selectables.remove(selectable)
-
-    @property
-    def redundant(self):
-        return self.events.empty and not self.selectables
-
-    @property
-    def aborted(self):
-        return self._abort
-
-    def run(self):
-        while not (self._abort or self.redundant):
-            self.do_work()
-
-    def do_work(self, timeout=None):
-        """@return True if some work was done, False if time-out expired"""
-        tick = self.events.timer.tick()
-
-        if self.events.process():
-            tick = self.events.timer.tick()
-            while self.events.process():
-                if self._abort: return
-                tick = self.events.timer.tick()
-            return True # Did work, let caller check their conditions, don't select.
-
-        stable = False
-        while not stable:
-            reading = []
-            writing = []
-            closed = []
-            for s in self.selectables:
-                if s.reading(): reading.append(s)
-                if s.writing(): writing.append(s)
-                if s.closed(): closed.append(s)
-                else: tick = _min(tick, s.tick())
-
-            for s in closed:
-                self.selectables.remove(s)
-                s.removed()
-            stable = len(closed) == 0
-
-        if self.redundant:
-            return False
-
-        if tick:
-            timeout = _min(tick - time.time(), timeout)
-        if timeout and timeout < 0:
-            timeout = 0
-        if reading or writing or timeout:
-            readable, writable, _ = select(reading, writing, [], timeout)
-            for s in self.selectables:
-                s.tick()
-            for s in readable:
-                s.readable()
-            for s in writable:
-                s.writable()
-
-            return bool(readable or writable)
-        else:
-            return False
-
 
 class Transaction(object):
     """
@@ -626,15 +281,6 @@ class Transaction(object):
         self.failed = False
         self._pending = []
         self.settle_before_discharge = settle_before_discharge
-        class InternalTransactionHandler(OutgoingMessageHandler):
-            def __init__(self):
-                super(InternalTransactionHandler, self).__init__(auto_settle=True)
-
-            def on_settled(self, event):
-                if hasattr(event.delivery, "transaction"):
-                    event.transaction = event.delivery.transaction
-                    event.delivery.transaction.handle_outcome(event)
-        self.internal_handler = InternalTransactionHandler()
         self.declare()
 
     def commit(self):
@@ -652,7 +298,6 @@ class Transaction(object):
 
     def _send_ctrl(self, descriptor, value):
         delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
-        delivery.context=self.internal_handler
         delivery.transaction = self
         return delivery
 
@@ -803,58 +448,92 @@ class SessionPerConnection(object):
         event.connection.close()
         self._default_session = None
 
+class GlobalOverrides(object):
+    """
+    Internal handler that triggers the necessary socket connect for an
+    opened connection.
+    """
+    def __init__(self, base):
+        self.base = base
+
+    def on_unhandled(self, name, event):
+        if not self._override(event):
+            event.dispatch(self.base)
+
+    def _override(self, event):
+        conn = event.connection
+        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
+
+from cproton import pn_incref
 class Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
-    def __init__(self, loop, ssl_domain=None):
-        self.loop = loop
-        self.ssl_domain = ssl_domain
-
-    def _get_ssl_domain(self, connection, scheme):
-        if hasattr(connection, 'ssl_domain'):
-            return connection.ssl_domain
-        elif scheme == 'amqps':
-            return self.ssl_domain
-        else:
-            return None
+    def __init__(self, connection):
+        self.connection = connection
+        self.address = None
+        self.heartbeat = None
+        self.reconnect = None
+        self.ssl_domain = None
 
     def _connect(self, connection):
-        url = connection.address.next()
-        logging.info("connecting to %s:%i" % (url.host, url.port))
-        heartbeat = None
-        if hasattr(connection, 'heartbeat'):
-            heartbeat = connection.heartbeat
-        s = AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat)
-        s.connect(url.host, url.port, username=url.username, password=url.password, ssl_domain=self._get_ssl_domain(connection, url.scheme))
-        self.loop.add(s)
-        connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
+        url = self.address.next()
+        # IoHandler uses the hostname to determine where to try to connect to
+        connection.hostname = "%s:%i" % (url.host, url.port)
+        logging.info("connecting to %s..." % connection.hostname)
+
+        self.transport = Transport()
+        # We are customising one stage of the IoHandler connecting
+        # logic here; however the default implementation for this
+        # stage in c creates a transport also, but one that needs
+        # freed on bound. This requires an extra reference to be
+        # forged here, to prevent the python wraooer being left with a
+        # dangling reference by the next stage of the IoHandler logic.
+        pn_incref(self.transport._impl)
+        self.transport.bind(connection)
+        if self.heartbeat:
+            self.transport.idle_timeout = self.heartbeat
+        if url.scheme == 'amqps' and self.ssl_domain:
+            self.ssl = SSL(self.transport, self.ssl_domain)
+            self.ssl.peer_hostname = url.host
+        if url.username:
+            sasl = self.transport.sasl()
+            if url.username == 'anonymous':
+                sasl.mechanisms('ANONYMOUS')
+            else:
+                sasl.plain(url.username, url.password)
 
     def on_connection_local_open(self, event):
-        if hasattr(event.connection, "address"):
-            self._connect(event.connection)
+        self._connect(event.connection)
 
     def on_connection_remote_open(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection.reconnect.reset()
-
-    def on_disconnected(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference
-            delay = event.connection.reconnect.next()
-            if delay == 0:
-                logging.info("Disconnected, reconnecting...")
-                self._connect(event.connection)
+        logging.info("connected to %s" % event.connection.hostname)
+        if self.reconnect:
+            self.reconnect.reset()
+
+    def on_transport_closed(self, event):
+        if self.connection:
+            if self.reconnect:
+                self.transport.unbind()
+                self.transport = None
+                delay = self.reconnect.next()
+                if delay == 0:
+                    logging.info("Disconnected, reconnecting...")
+                    self._connect(self.connection)
+                else:
+                    logging.info("Disconnected will try to reconnect after %s seconds" % delay)
+                    event.reactor.schedule(delay, self)
             else:
-                logging.info("Disconnected will try to reconnect after %s seconds" % delay)
-                self.loop.schedule(time.time() + delay, connection=event.connection, subject=self)
-        else:
-            logging.info("Disconnected")
+                logging.info("Disconnected")
+                self.connection = None
+
+    def on_timer_task(self, event):
+        self._connect(self.connection)
 
-    def on_timer(self, event):
-        if event.subject == self and event.connection:
-            self._connect(event.connection)
+    def on_connection_remote_close(self, event):
+        if event.connection == self.connection:
+            self.connection = None
 
 class Backoff(object):
     """
@@ -904,33 +583,33 @@ class SSLConfig(object):
         self.server.set_trusted_ca_db(certificate_db)
 
 
-class Container(object):
-    def __init__(self, *handlers):
-        self.ssl = SSLConfig()
-        h = [Connector(self, self.ssl.client), ScopedHandler()]
-        h.extend(handlers)
-        self.events = Events(*h)
-        self.loop = SelectLoop(self.events)
-        self.trigger = None
-        self.container_id = str(generate_uuid())
+class Container(Reactor):
+    def __init__(self, *handlers, **kwargs):
+        super(Container, self).__init__(*handlers, **kwargs)
+        if "impl" not in kwargs:
+            self.ssl = SSLConfig()
+            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
+            self.trigger = None
+            self.container_id = str(generate_uuid())
+            Wrapper.__setattr__(self, 'subclass', self.__class__)
 
     def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
-        conn = self.events.connection()
-        conn._pin = conn #circular reference until the open event gets handled
-        if handler:
-            conn.context = handler
+        conn = self.connection(handler)
         conn.container = self.container_id or str(generate_uuid())
-        conn.heartbeat = heartbeat
-        if url: conn.address = Urls([url])
-        elif urls: conn.address = Urls(urls)
-        elif address: conn.address = address
+
+        connector = Connector(conn)
+        conn._overrides = connector
+        if url: connector.address = Urls([url])
+        elif urls: connector.address = Urls(urls)
+        elif address: connector.address = address
         else: raise ValueError("One of url, urls or address required")
+        if heartbeat:
+            connector.heartbeat = heartbeat
         if reconnect:
-            conn.reconnect = reconnect
+            connector.reconnect = reconnect
         elif reconnect is None:
-            conn.reconnect = Backoff()
-        if ssl_domain:
-            conn.ssl_domain = ssl_domain
+            connector.reconnect = Backoff()
+        connector.ssl_domain = ssl_domain or self.ssl.client
         conn._session_policy = SessionPerConnection() #todo: make configurable
         conn.open()
         return conn
@@ -966,7 +645,7 @@ class Container(object):
         if target:
             snd.target.address = target
         if handler:
-            snd.context = handler
+            snd.handler = handler
         if tags:
             snd.tag_generator = tags
         _apply_link_options(options, snd)
@@ -987,46 +666,40 @@ class Container(object):
         if target:
             rcv.target.address = target
         if handler:
-            rcv.context = handler
+            rcv.handler = handler
         _apply_link_options(options, rcv)
         rcv.open()
         return rcv
 
     def declare_transaction(self, context, handler=None, settle_before_discharge=False):
         if not _get_attr(context, '_txn_ctrl'):
-            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl')
+            class InternalTransactionHandler(OutgoingMessageHandler):
+                def __init__(self):
+                    super(InternalTransactionHandler, self).__init__(auto_settle=True)
+
+                def on_settled(self, event):
+                    if hasattr(event.delivery, "transaction"):
+                        event.transaction = event.delivery.transaction
+                        event.delivery.transaction.handle_outcome(event)
+            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
             context._txn_ctrl.target.type = Terminus.COORDINATOR
             context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
         return Transaction(context._txn_ctrl, handler, settle_before_discharge)
 
     def listen(self, url, ssl_domain=None):
-        url = Urls([url]).next()
+        url = Url(url)
         ssl_config = ssl_domain
         if not ssl_config and url.scheme == 'amqps':
             ssl_config = self.ssl_domain
-        return AmqpAcceptor(self.events, self, url.host, url.port, ssl_domain=ssl_config)
-
-    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
-        self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+        return self.acceptor(url.host, url.port)
 
     def get_event_trigger(self):
         if not self.trigger or self.trigger.closed():
-            self.trigger = EventInjector(self.events.collector)
-            self.add(self.trigger)
+            self.trigger = EventInjector()
+            self.selectable(self.trigger)
         return self.trigger
 
-    def add(self, selectable):
-        self.loop.add(selectable)
-
-    def remove(self, selectable):
-        self.loop.remove(selectable)
-
-    def run(self):
-        self.events.dispatch(StartEvent(self))
-        self.loop.run()
-
-    def stop(self):
-        self.loop.abort()
-
     def do_work(self, timeout=None):
-        return self.loop.do_work(timeout)
+        if timeout:
+            self.timeout = timeout
+        return self.process()
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 6ea5b4b6c534de79eb7729304741e183b6459da6..924f3d99c25900c601568b00c603618ceee2fe40 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -19,8 +19,8 @@
 import collections, Queue, socket, time, threading
 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message
 from proton import ProtonException, Timeout, Url
-from proton.reactors import AmqpSocket, Container, Events, SelectLoop
-from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler
+from proton.reactors import Container
+from proton.handlers import MessagingHandler, IncomingMessageHandler
 
 def utf8(s):
     if isinstance(s, unicode):
@@ -190,6 +190,7 @@ class BlockingConnection(Handler):
     def __init__(self, url, timeout=None, container=None, ssl_domain=None):
         self.timeout = timeout
         self.container = container or Container()
+        self.container.start()
         self.url = Url(utf8(url)).defaults()
         self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain)
         self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
@@ -216,7 +217,7 @@ class BlockingConnection(Handler):
 
     def run(self):
         """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
-        self.container.run()
+        while self.container.process(): pass
 
     def wait(self, condition, timeout=False, msg=None):
         """Call do_work until condition() is true"""
@@ -224,11 +225,12 @@ class BlockingConnection(Handler):
             timeout = self.timeout
         if timeout is None:
             while not condition():
-                self.container.do_work()
+                self.container.process()
         else:
             deadline = time.time() + timeout
             while not condition():
-                if not self.container.do_work(deadline - time.time()):
+                self.container.process()
+                if deadline < time.time():
                     txt = "Connection %s timed out" % self.url
                     if msg: txt += ": " + msg
                     raise Timeout(txt)
-- 
1.7.11.7

Reply via email to