This provides a gevent-like API using eventlet.

Signed-off-by: YAMAMOTO Takashi <yamam...@valinux.co.jp>
---
 ryu/lib/hub.py                 | 118 ++++++++++++++++++++++++
 ryu/tests/unit/lib/test_hub.py | 201 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 319 insertions(+)
 create mode 100644 ryu/lib/hub.py
 create mode 100644 ryu/tests/unit/lib/test_hub.py

diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
new file mode 100644
index 0000000..02dae4d
--- /dev/null
+++ b/ryu/lib/hub.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+
+# we don't bother to use cfg.py because monkey patch needs to be
+# called very early.  instead, we use an environment variable to
+# select the type of hub.
+HUB_TYPE = os.getenv('RYU_HUB_TYPE', 'eventlet')
+
+LOG = logging.getLogger('ryu.lib.hub')
+
+if HUB_TYPE == 'eventlet':
+    import eventlet
+    import eventlet.event
+    import eventlet.queue
+    import eventlet.timeout
+    import eventlet.wsgi
+    import ssl
+    import traceback
+
+    getcurrent = eventlet.getcurrent
+    patch = eventlet.monkey_patch
+    sleep = eventlet.sleep
+
+    def spawn(*args, **kwargs):
+        def _launch(func, *args, **kwargs):
+            # mimic gevent's default raise_error=False behaviour
+            # by not propagating an exception to the joiner.
+            try:
+                func(*args, **kwargs)
+            except:
+                LOG.error('hub: uncaught exception: %s',
+                          traceback.format_exc())
+
+        return eventlet.spawn(_launch, *args, **kwargs)
+
+    def kill(thread):
+        thread.kill()
+
+    def joinall(threads):
+        for t in threads:
+            t.wait()
+
+    Queue = eventlet.queue.Queue
+    QueueEmpty = eventlet.queue.Empty
+
+    class StreamServer(object):
+        def __init__(self, listen_info, handle=None, backlog=None,
+                     spawn='default', **ssl_args):
+            assert backlog is None
+            assert spawn == 'default'
+            self.server = eventlet.listen(listen_info)
+            if ssl_args:
+                def wrap_and_handle(sock, addr):
+                    ssl_args.setdefault('server_side', True)
+                    handle(ssl.wrap_socket(sock, **ssl_args), addr)
+
+                self.handle = wrap_and_handle
+            else:
+                self.handle = handle
+
+        def serve_forever(self):
+            while True:
+                sock, addr = self.server.accept()
+                spawn(self.handle, sock, addr)
+
+    class WSGIServer(StreamServer):
+        def serve_forever(self):
+            eventlet.wsgi.server(self.server, self.handle)
+
+    Timeout = eventlet.timeout.Timeout
+
+    class Event(object):
+        """gevent-like Event (set/clear/wait) on top of eventlet's Event."""
+        def __init__(self):
+            self._ev = eventlet.event.Event()
+            self._cond = False
+
+        def _wait(self, timeout=None):
+            while not self._cond:
+                self._ev.wait()
+
+        def _broadcast(self):
+            self._ev.send()
+            # eventlet's Event doesn't allow multiple send() calls, so use a
+            # fresh underlying event afterwards (_ev.reset() is obsolete).
+            self._ev = eventlet.event.Event()
+
+        def set(self):
+            self._cond = True
+            self._broadcast()
+
+        def clear(self):
+            self._cond = False
+
+        def wait(self, timeout=None):
+            if timeout is None:
+                return self._wait()  # no deadline; skip the Timeout below
+            with Timeout(timeout):
+                self._wait()
diff --git a/ryu/tests/unit/lib/test_hub.py b/ryu/tests/unit/lib/test_hub.py
new file mode 100644
index 0000000..1b8166c
--- /dev/null
+++ b/ryu/tests/unit/lib/test_hub.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+from nose.tools import raises
+
+from ryu.lib import hub
+hub.patch()
+
+
+class MyException(BaseException):
+    pass
+
+
+class Test_hub(unittest.TestCase):
+    """ Test case for ryu.lib.hub
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    # we want to test timeout first because the rest of tests rely on it.
+    # thus test_0_ prefix.
+
+    @raises(hub.Timeout)
+    def test_0_timeout1(self):
+        with hub.Timeout(0.1):
+            hub.sleep(1)
+
+    @raises(MyException)
+    def test_0_timeout2(self):
+        with hub.Timeout(0.1, MyException):
+            hub.sleep(1)
+
+    def test_0_timeout3(self):
+        with hub.Timeout(1):
+            hub.sleep(0.1)
+        # sleep some more to ensure timer cancelation
+        hub.sleep(2)
+
+    def test_spawn_event1(self):
+        def _child(ev, result):
+            hub.sleep(1)
+            result.append(1)
+            ev.set()
+
+        ev = hub.Event()
+        result = []
+        with hub.Timeout(2):
+            hub.spawn(_child, ev, result)
+            ev.wait()
+        assert len(result) == 1
+
+    def test_spawn_event2(self):
+        def _child(ev, result):
+            hub.sleep(1)
+            result.append(1)
+            ev.set()
+
+        ev = hub.Event()
+        result = []
+        with hub.Timeout(2):
+            hub.spawn(_child, ev, result)
+            try:
+                ev.wait(timeout=0.5)
+                raise BaseException("should timed out")
+            except hub.Timeout:
+                pass
+        assert len(result) == 0
+
+    def test_spawn_event3(self):
+        def _child(ev, ev2, result):
+            ev2.wait()
+            hub.sleep(0.5)
+            result.append(1)
+            ev.set()
+
+        ev = hub.Event()
+        ev2 = hub.Event()
+        result = []
+        with hub.Timeout(2):
+            hub.spawn(_child, ev, ev2, result)
+            hub.spawn(_child, ev, ev2, result)
+            hub.sleep(0.5)
+            ev2.set()  # this should wake up the above created two threads
+            ev.wait(timeout=1)
+        assert len(result) == 2
+
+    def test_spawn_select1(self):
+        import select
+        import socket
+
+        def _child(s1):
+            hub.sleep(0.5)
+            s1.send("hoge")
+
+        s1, s2 = socket.socketpair()
+        with hub.Timeout(1):
+            hub.spawn(_child, s1)
+            select.select([s2.fileno()], [], [])
+            select.select([s2.fileno()], [], [])  # return immediately
+
+    @raises(MyException)
+    def test_select1(self):
+        import select
+        import socket
+
+        s1, s2 = socket.socketpair()
+        with hub.Timeout(1, MyException):
+            select.select([s2.fileno()], [], [])
+
+    def test_select2(self):
+        import select
+
+        with hub.Timeout(1, MyException):
+            select.select([], [], [], 0)  # timeout immediately
+
+    def test_select3(self):
+        import select
+        import socket
+
+        s1, s2 = socket.socketpair()
+        with hub.Timeout(1, MyException):
+            fds = [s1.fileno(), s2.fileno()]
+            rlist, wlist, xlist = select.select(fds, fds, fds)
+            assert s1.fileno() not in rlist
+            assert s2.fileno() not in rlist
+            # eventlet-patched select returns at most one file, so asserting
+            # both fds individually would fail for one of them:
+            #       assert s1.fileno() in wlist
+            #       assert s2.fileno() in wlist
+            # hence only require that at least one fd is reported writable.
+            assert (s1.fileno() in wlist) or (s2.fileno() in wlist)
+            assert s1.fileno() not in xlist
+            assert s2.fileno() not in xlist
+
+    def test_spawn_joinall(self):
+        def _child(ev2, result):
+            ev2.wait()
+            hub.sleep(0.5)
+            result.append(1)
+            raise BaseException("this exception should not be propagated")
+
+        ev2 = hub.Event()
+        threads = []
+        result = []
+        with hub.Timeout(2):
+            threads.append(hub.spawn(_child, ev2, result))
+            threads.append(hub.spawn(_child, ev2, result))
+            hub.sleep(0.5)
+            ev2.set()  # this should wake up the above created two threads
+            hub.joinall(threads)
+        assert len(result) == 2
+
+    def test_spawn_kill_joinall(self):
+        def _child(ev2, result):
+            ev2.wait()
+            result.append(1)
+
+        ev2 = hub.Event()
+        threads = []
+        result = []
+        with hub.Timeout(2):
+            threads.append(hub.spawn(_child, ev2, result))
+            threads.append(hub.spawn(_child, ev2, result))
+            hub.sleep(0.5)
+            for t in threads:
+                hub.kill(t)
+            hub.joinall(threads)
+        assert len(result) == 0
+
+    def test_event1(self):
+        ev = hub.Event()
+        ev.set()
+        with hub.Timeout(1):
+            ev.wait()  # should return immediately
+
+    def test_event2(self):
+        ev = hub.Event()
+        # allow multiple sets unlike eventlet Event
+        ev.set()
+        ev.set()
-- 
1.8.0.1


------------------------------------------------------------------------------
Precog is a next-generation analytics platform capable of advanced
analytics on semi-structured data. The platform includes APIs for building
apps and a phenomenal toolset for data science. Developers can use
our toolset for easy data analysis & visualization. Get a free account!
http://www2.precog.com/precogplatform/slashdotnewsletter
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to