And one more. As I thought about the drain rate of the event queue, it became obvious to me that we need to give get() operations preferential treatment.
Please find the revised patch below. Signed-off-by: Victor J. Orlikowski <[email protected]> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py index 3d5d895..490c85f 100644 --- a/ryu/base/app_manager.py +++ b/ryu/base/app_manager.py @@ -157,7 +157,7 @@ class RyuApp(object): self.observers = {} # ev_cls -> observer-name -> states:set self.threads = [] self.main_thread = None - self.events = hub.Queue(128) + self.events = hub.Queue(2 * Datapath.RECV_LOOP_FENCE) if hasattr(self.__class__, 'LOGGER_NAME'): self.logger = logging.getLogger(self.__class__.LOGGER_NAME) else: @@ -168,6 +168,8 @@ class RyuApp(object): class _EventThreadStop(event.EventBase): pass self._event_stop = _EventThreadStop() + self._event_get_timeout = 5 + self._event_put_timeout = (2 * self._event_get_timeout) self.is_active = True def start(self): @@ -279,15 +281,23 @@ class RyuApp(object): def _event_loop(self): while self.is_active or not self.events.empty(): - ev, state = self.events.get() - if ev == self._event_stop: + ev = state = None + try: + ev, state = self.events.get(timeout=self._event_get_timeout) + except hub.QueueEmpty: + continue + if (ev is None) or (ev == self._event_stop): continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev) def _send_event(self, ev, state): - self.events.put((ev, state)) + try: + self.events.put((ev, state), timeout=self._event_put_timeout) + except hub.QueueFull: + LOG.debug("EVENT LOST FOR %s %s", + self.name, ev.__class__.__name__) def send_event(self, name, ev, state=None): """ @@ -520,7 +530,7 @@ class AppManager(object): self._close(app) events = app.events if not events.empty(): - app.logger.debug('%s events remians %d', app.name, events.qsize()) + app.logger.debug('%s events remains %d', app.name, events.qsize()) def close(self): def close_all(close_dict): diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 25b8776..06b30f6 100644 --- a/ryu/controller/controller.py +++ 
b/ryu/controller/controller.py @@ -109,6 +109,8 @@ def _deactivate(method): class Datapath(ofproto_protocol.ProtocolDesc): + RECV_LOOP_FENCE = 128 + def __init__(self, socket, address): super(Datapath, self).__init__() @@ -211,7 +213,7 @@ class Datapath(ofproto_protocol.ProtocolDesc): # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 - if count > 2048: + if count > Datapath.RECV_LOOP_FENCE: count = 0 hub.sleep(0) diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index 5621147..954bfc8 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -91,6 +91,7 @@ if HUB_TYPE == 'eventlet': pass Queue = eventlet.queue.Queue + QueueFull = eventlet.queue.Full QueueEmpty = eventlet.queue.Empty Semaphore = eventlet.semaphore.Semaphore BoundedSemaphore = eventlet.semaphore.BoundedSemaphore Best, Victor -- Victor J. Orlikowski <> vjo@[cs.]duke.edu ------------------------------------------------------------------------------ _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
