If event is dispatched directly, event handler may block. Handlers from multiple source may have race condition. In order to maximize parallelism and minimize blocking and avoid race condition, it is useful to create another event queue and create dedicated thread to dispatch those events. Events from multiple sources will be serialized and blocking is mitigated.
Signed-off-by: Isaku Yamahata <[email protected]> --- ryu/controller/dispatcher.py | 86 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 6 deletions(-) diff --git a/ryu/controller/dispatcher.py b/ryu/controller/dispatcher.py index 01cdcf4..4fbd1fc 100644 --- a/ryu/controller/dispatcher.py +++ b/ryu/controller/dispatcher.py @@ -14,18 +14,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABCMeta, abstractmethod import copy +import gevent +import gevent.coros import logging +import traceback from gevent.queue import Queue +from ryu.controller import event +from ryu.lib import synchronized from ryu.lib.track_instances import TrackInstances from . import event + LOG = logging.getLogger('ryu.controller.dispatcher') -class EventQueue(TrackInstances): +class EventQueueCommon(TrackInstances): + __metaclass__ = ABCMeta _ev_q = None def set_ev_q(self): @@ -43,13 +51,13 @@ class EventQueue(TrackInstances): ev_q.queue(ev) def __init__(self, name, dispatcher, aux=None): - super(EventQueue, self).__init__() + super(EventQueueCommon, self).__init__() self.name = name self._dispatcher = dispatcher.clone() - self.is_dispatching = False self.ev_q = Queue() self.aux = aux # for EventQueueCreate event + def _init_done(self): self._queue_q_ev(EventQueueCreate(self, True)) def __del__(self): @@ -87,6 +95,17 @@ class EventQueue(TrackInstances): def queue_raw(self, ev): self.ev_q.put(ev) + @abstractmethod + def queue(self, ev): + pass + + +class EventQueueDirect(EventQueueCommon): + def __init__(self, name, dispatcher, aux=None): + super(EventQueueDirect, self).__init__(name, dispatcher, aux) + self.is_dispatching = False + self._init_done() + class _EventQueueGuard(object): def __init__(self, ev_q): self.ev_q = ev_q @@ -94,7 +113,7 @@ class EventQueue(TrackInstances): def __enter__(self): self.ev_q.is_dispatching = True - def __exit__(self, type_, value, traceback): + def __exit__(self, type_, value, traceback_): self.ev_q.is_dispatching = False return False @@ -106,10 +125,65 @@ class EventQueue(TrackInstances): with self._EventQueueGuard(self): assert self.ev_q.empty() + try: + self._dispatcher(ev) + while not self.ev_q.empty(): + ev = self.ev_q.get() + self._dispatcher(ev) + except: + traceback.print_exc() + raise + + +EventQueue = EventQueueDirect + + +class EventQueueThread(EventQueueCommon): + _DISPATCHER_LOCK = '_dispatcher_lock' + + class _EventRequestClose(object): + pass + + def __init__(self, name, dispatcher, aux=None): + super(EventQueueThread, self).__init__(name, dispatcher, aux) + self.is_active = True + self._dispatcher_lock = gevent.coros.Semaphore() + self.serve_thread = gevent.spawn(self._serve) + self._init_done() + + def close(self): + self.is_active = False + self.queue_raw(self._EventRequestClose) + self.serve_thread.join() + super(EventQueueThread, self).close() + + def _get_and_dispatch(self): + ev = self.ev_q.get() + if ev == self._EventRequestClose: + assert not self.is_active + return + with self._dispatcher_lock: self._dispatcher(ev) + + def _serve(self): + try: + while self.is_active: + self._get_and_dispatch() + except: + traceback.print_exc() # for debug + raise + finally: + self.is_active = False + # drain remianing events while not self.ev_q.empty(): - ev = self.ev_q.get() - self._dispatcher(ev) + self._get_and_dispatch() + + @synchronized.synchronized(_DISPATCHER_LOCK) + def set_dispatcher(self, dispatcher): + super(EventQueueThread, self).set_dipatcher(dispatcher) + + def queue(self, ev): + self.queue_raw(ev) class EventDispatcher(TrackInstances): -- 1.7.10.4 ------------------------------------------------------------------------------ Keep yourself connected to Go Parallel: TUNE You got it built. Now make it sing. Tune shows you how. http://goparallel.sourceforge.net _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
