Signed-off-by: ISHIDA Wataru <[email protected]>
---
 ryu/lib/hub.py                             |   22 +++++++++++
 ryu/services/protocols/bgp/api/core.py     |    6 +--
 ryu/services/protocols/bgp/base.py         |   20 +++++-----
 ryu/services/protocols/bgp/utils/evtlet.py |   59 ++++------------------------
 4 files changed, 42 insertions(+), 65 deletions(-)

diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index e9726c0..95199e7 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -39,6 +39,8 @@ if HUB_TYPE == 'eventlet':
     getcurrent = eventlet.getcurrent
     patch = eventlet.monkey_patch
     sleep = eventlet.sleep
+    listen = eventlet.listen
+    connect = eventlet.connect
 
     def spawn(*args, **kwargs):
         def _launch(func, *args, **kwargs):
@@ -57,6 +59,23 @@ if HUB_TYPE == 'eventlet':
 
         return eventlet.spawn(_launch, *args, **kwargs)
 
+    def spawn_after(seconds, *args, **kwargs):
+        def _launch(func, *args, **kwargs):
+            # mimic gevent's default raise_error=False behaviour
+            # by not propagating an exception to the joiner.
+            try:
+                func(*args, **kwargs)
+            except greenlet.GreenletExit:
+                pass
+            except:
+                # log uncaught exception.
+                # note: this is an intentional divergence from gevent
+                # behaviour.  gevent silently ignores such exceptions.
+                LOG.error('hub: uncaught exception: %s',
+                          traceback.format_exc())
+
+        return eventlet.spawn_after(seconds, _launch, *args, **kwargs)
+
     def kill(thread):
         thread.kill()
 
@@ -119,6 +138,9 @@ if HUB_TYPE == 'eventlet':
             # note: _ev.reset() is obsolete.
             self._ev = eventlet.event.Event()
 
+        def is_set(self):
+            return self._cond
+
         def set(self):
             self._cond = True
             self._broadcast()
diff --git a/ryu/services/protocols/bgp/api/core.py b/ryu/services/protocols/bgp/api/core.py
index d8ee144..e272eef 100644
--- a/ryu/services/protocols/bgp/api/core.py
+++ b/ryu/services/protocols/bgp/api/core.py
@@ -16,7 +16,7 @@
 """
  Defines APIs related to Core/CoreManager.
 """
-import eventlet
+from ryu.lib import hub
 
 from ryu.services.protocols.bgp.api.base import register
 from ryu.services.protocols.bgp.core_manager import CORE_MANAGER
@@ -39,7 +39,7 @@ def start(**kwargs):
 
     waiter = kwargs.pop('waiter')
     common_config = CommonConf(**kwargs)
-    eventlet.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
+    hub.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
                                                'waiter': waiter})
     return True
 
@@ -70,7 +70,7 @@ def reset_neighor(ip_address):
         # Disable neighbor to close existing session.
         neigh_conf.enabled = False
         # Yield here so that we give chance for neighbor to be disabled.
-        eventlet.sleep(NEIGHBOR_RESET_WAIT_TIME)
+        hub.sleep(NEIGHBOR_RESET_WAIT_TIME)
         # Enable neighbor, so that we have a new session with it.
         neigh_conf.enabled = True
     else:
diff --git a/ryu/services/protocols/bgp/base.py b/ryu/services/protocols/bgp/base.py
index 85795c5..24bb289 100644
--- a/ryu/services/protocols/bgp/base.py
+++ b/ryu/services/protocols/bgp/base.py
@@ -16,13 +16,13 @@
   Defines some base class related to managing green threads.
 """
 import abc
-import eventlet
 import logging
 import time
 import traceback
 import weakref
 
-from eventlet.timeout import Timeout
+from ryu.lib import hub
+from ryu.lib.hub import Timeout
 from ryu.lib.packet.bgp import RF_IPv4_UC
 from ryu.lib.packet.bgp import RF_IPv6_UC
 from ryu.lib.packet.bgp import RF_IPv4_VPN
@@ -171,7 +171,7 @@ class Activity(object):
         self._validate_activity(activity)
 
         # Spawn a new greenthread for given activity
-        greenthread = eventlet.spawn(activity.start, *args, **kwargs)
+        greenthread = hub.spawn(activity.start, *args, **kwargs)
         self._child_thread_map[activity.name] = greenthread
         self._child_activity_map[activity.name] = activity
         return greenthread
@@ -180,7 +180,7 @@ class Activity(object):
         self._validate_activity(activity)
 
         # Schedule to spawn a new greenthread after requested delay
-        greenthread = eventlet.spawn_after(seconds, activity.start, *args,
+        greenthread = hub.spawn_after(seconds, activity.start, *args,
                                            **kwargs)
         self._child_thread_map[activity.name] = greenthread
         self._child_activity_map[activity.name] = activity
@@ -200,13 +200,13 @@ class Activity(object):
 
     def _spawn(self, name, callable_, *args, **kwargs):
         self._validate_callable(callable_)
-        greenthread = eventlet.spawn(callable_, *args, **kwargs)
+        greenthread = hub.spawn(callable_, *args, **kwargs)
         self._child_thread_map[name] = greenthread
         return greenthread
 
     def _spawn_after(self, name, seconds, callable_, *args, **kwargs):
         self._validate_callable(callable_)
-        greenthread = eventlet.spawn_after(seconds, callable_, *args, **kwargs)
+        greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs)
         self._child_thread_map[name] = greenthread
         return greenthread
 
@@ -244,12 +244,12 @@ class Activity(object):
                 self.stop()
 
     def pause(self, seconds=0):
-        """Relinquishes eventlet hub for given number of seconds.
+        """Relinquishes hub for given number of seconds.
 
         In other words is puts to sleep to give other greeenthread a chance to
         run.
         """
-        eventlet.sleep(seconds)
+        hub.sleep(seconds)
 
     def _stop_child_activities(self):
         """Stop all child activities spawn by this activity.
@@ -317,7 +317,7 @@ class Activity(object):
 
         For each connection `server_factory` starts a new protocol.
         """
-        server = eventlet.listen(loc_addr)
+        server = hub.listen(loc_addr)
         server_name = self.name + '_server@' + str(loc_addr)
         self._asso_socket_map[server_name] = server
 
@@ -341,7 +341,7 @@ class Activity(object):
         LOG.debug('Connect TCP called for %s:%s' % (peer_addr[0],
                                                     peer_addr[1]))
         with Timeout(time_out, False):
-            sock = eventlet.connect(peer_addr, bind=bind_address)
+            sock = hub.connect(peer_addr, bind=bind_address)
             if sock:
                 # Connection name for pro-active connection is made up
                 # of local end address + remote end address
diff --git a/ryu/services/protocols/bgp/utils/evtlet.py b/ryu/services/protocols/bgp/utils/evtlet.py
index 4dc8a94..4c2aa44 100644
--- a/ryu/services/protocols/bgp/utils/evtlet.py
+++ b/ryu/services/protocols/bgp/utils/evtlet.py
@@ -16,8 +16,7 @@
 """
  Concurrent networking library - Eventlet, based utilities classes.
 """
-import eventlet
-from eventlet import event
+from ryu.lib import hub
 import logging
 
 LOG = logging.getLogger('utils.evtlet')
@@ -28,58 +27,14 @@ class EventletIOFactory(object):
     @staticmethod
     def create_custom_event():
         LOG.debug('Create CustomEvent called')
-        return CustomEvent()
+        return hub.Event()
 
     @staticmethod
     def create_looping_call(funct, *args, **kwargs):
         LOG.debug('create_looping_call called')
         return LoopingCall(funct, *args, **kwargs)
 
-
-class CustomEvent(object):
-    """Encapsulates eventlet event to provide a event which can recur.
-
-    It has the same interface as threading.Event but works for eventlet.
-    """
-    def __init__(self,):
-        self._event = event.Event()
-        self._is_set = False
-
-    def is_set(self):
-        """Return true if and only if the internal flag is true."""
-        return self._is_set
-
-    def set(self):
-        """Set the internal flag to true.
-
-         All threads waiting for it to become true are awakened.
-         Threads that call wait() once the flag is true will not block at all.
-        """
-        if self._event and not self._event.ready():
-            self._event.send()
-        self._is_set = True
-
-    def clear(self):
-        """Reset the internal flag to false.
-
-        Subsequently, threads calling wait() will block until set() is called
-        to set the internal flag to true again.
-        """
-        if self._is_set:
-            self._is_set = False
-            self._event = event.Event()
-
-    def wait(self):
-        """Block until the internal flag is true.
-
-        If the internal flag is true on entry, return immediately. Otherwise,
-        block until another thread calls set() to set the flag to true, or
-        until the optional timeout occurs.
-        """
-        if not self._is_set:
-            self._event.wait()
-
-
+# TODO: improve Timer service and move it into framework
 class LoopingCall(object):
     """Call a function repeatedly.
     """
@@ -102,7 +57,7 @@ class LoopingCall(object):
     def __call__(self):
         if self._running:
             # Schedule next iteration of the call.
-            self._self_thread = eventlet.spawn_after(self._interval, self)
+            self._self_thread = hub.spawn_after(self._interval, self)
         self._funct(*self._args, **self._kwargs)
 
     def start(self, interval, now=True):
@@ -117,9 +72,9 @@ class LoopingCall(object):
         self._running = True
         self._interval = interval
         if now:
-            self._self_thread = eventlet.spawn_after(0, self)
+            self._self_thread = hub.spawn_after(0, self)
         else:
-            self._self_thread = eventlet.spawn_after(self._interval, self)
+            self._self_thread = hub.spawn_after(self._interval, self)
 
     def stop(self):
         """Stop running scheduled function.
@@ -137,4 +92,4 @@ class LoopingCall(object):
             self._self_thread.cancel()
             self._self_thread = None
         # Schedule a new call
-        self._self_thread = eventlet.spawn_after(self._interval, self)
+        self._self_thread = hub.spawn_after(self._interval, self)
-- 
1.7.9.5


------------------------------------------------------------------------------
Start Your Social Network Today - Download eXo Platform
Build your Enterprise Intranet with eXo Platform Software
Java Based Open Source Intranet - Social, Extensible, Cloud Ready
Get Started Now And Turn Your Intranet Into A Collaboration Platform
http://p.sf.net/sfu/ExoPlatform
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to