If event is dispatched directly, event handler may block.
Handlers from multiple source may have race condition.
In order to maximize parallelism and minimize blocking and avoid race
condition, it is useful to create another event queue and create dedicated
thread to dispatch those events.
Events from multiple sources will be serialized and blocking is mitigated.

Signed-off-by: Isaku Yamahata <[email protected]>
---
 ryu/controller/dispatcher.py |   86 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 80 insertions(+), 6 deletions(-)

diff --git a/ryu/controller/dispatcher.py b/ryu/controller/dispatcher.py
index 01cdcf4..4fbd1fc 100644
--- a/ryu/controller/dispatcher.py
+++ b/ryu/controller/dispatcher.py
@@ -14,18 +14,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from abc import ABCMeta, abstractmethod
 import copy
+import gevent
+import gevent.coros
 import logging
+import traceback
 
 from gevent.queue import Queue
 
+from ryu.controller import event
+from ryu.lib import synchronized
 from ryu.lib.track_instances import TrackInstances
 from . import event
 
+
 LOG = logging.getLogger('ryu.controller.dispatcher')
 
 
-class EventQueue(TrackInstances):
+class EventQueueCommon(TrackInstances):
+    __metaclass__ = ABCMeta
     _ev_q = None
 
     def set_ev_q(self):
@@ -43,13 +51,13 @@ class EventQueue(TrackInstances):
             ev_q.queue(ev)
 
     def __init__(self, name, dispatcher, aux=None):
-        super(EventQueue, self).__init__()
+        super(EventQueueCommon, self).__init__()
         self.name = name
         self._dispatcher = dispatcher.clone()
-        self.is_dispatching = False
         self.ev_q = Queue()
         self.aux = aux  # for EventQueueCreate event
 
+    def _init_done(self):
         self._queue_q_ev(EventQueueCreate(self, True))
 
     def __del__(self):
@@ -87,6 +95,17 @@ class EventQueue(TrackInstances):
     def queue_raw(self, ev):
         self.ev_q.put(ev)
 
+    @abstractmethod
+    def queue(self, ev):
+        pass
+
+
+class EventQueueDirect(EventQueueCommon):
+    def __init__(self, name, dispatcher, aux=None):
+        super(EventQueueDirect, self).__init__(name, dispatcher, aux)
+        self.is_dispatching = False
+        self._init_done()
+
     class _EventQueueGuard(object):
         def __init__(self, ev_q):
             self.ev_q = ev_q
@@ -94,7 +113,7 @@ class EventQueue(TrackInstances):
         def __enter__(self):
             self.ev_q.is_dispatching = True
 
-        def __exit__(self, type_, value, traceback):
+        def __exit__(self, type_, value, traceback_):
             self.ev_q.is_dispatching = False
             return False
 
@@ -106,10 +125,65 @@ class EventQueue(TrackInstances):
         with self._EventQueueGuard(self):
             assert self.ev_q.empty()
 
+            try:
+                self._dispatcher(ev)
+                while not self.ev_q.empty():
+                    ev = self.ev_q.get()
+                    self._dispatcher(ev)
+            except:
+                traceback.print_exc()
+                raise
+
+
+EventQueue = EventQueueDirect
+
+
+class EventQueueThread(EventQueueCommon):
+    _DISPATCHER_LOCK = '_dispatcher_lock'
+
+    class _EventRequestClose(object):
+        pass
+
+    def __init__(self, name, dispatcher, aux=None):
+        super(EventQueueThread, self).__init__(name, dispatcher, aux)
+        self.is_active = True
+        self._dispatcher_lock = gevent.coros.Semaphore()
+        self.serve_thread = gevent.spawn(self._serve)
+        self._init_done()
+
+    def close(self):
+        self.is_active = False
+        self.queue_raw(self._EventRequestClose)
+        self.serve_thread.join()
+        super(EventQueueThread, self).close()
+
+    def _get_and_dispatch(self):
+        ev = self.ev_q.get()
+        if ev == self._EventRequestClose:
+            assert not self.is_active
+            return
+        with self._dispatcher_lock:
             self._dispatcher(ev)
+
+    def _serve(self):
+        try:
+            while self.is_active:
+                self._get_and_dispatch()
+        except:
+            traceback.print_exc()  # for debug
+            raise
+        finally:
+            self.is_active = False
+            # drain remianing events
             while not self.ev_q.empty():
-                ev = self.ev_q.get()
-                self._dispatcher(ev)
+                self._get_and_dispatch()
+
+    @synchronized.synchronized(_DISPATCHER_LOCK)
+    def set_dispatcher(self, dispatcher):
+        super(EventQueueThread, self).set_dipatcher(dispatcher)
+
+    def queue(self, ev):
+        self.queue_raw(ev)
 
 
 class EventDispatcher(TrackInstances):
-- 
1.7.10.4


------------------------------------------------------------------------------
Keep yourself connected to Go Parallel: 
TUNE You got it built. Now make it sing. Tune shows you how.
http://goparallel.sourceforge.net
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to