And one more.
As I thought about the drain rate from event queue, it became obvious to me the 
we need to give get() operations preferential treatment.

Please find the revised patch below.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..490c85f 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -157,7 +157,7 @@ class RyuApp(object):
         self.observers = {}     # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(2 * Datapath.RECV_LOOP_FENCE)
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
@@ -168,6 +168,8 @@ class RyuApp(object):
         class _EventThreadStop(event.EventBase):
             pass
         self._event_stop = _EventThreadStop()
+        self._event_get_timeout = 5
+        self._event_put_timeout = (2 * self._event_get_timeout)
         self.is_active = True
 
     def start(self):
@@ -279,15 +281,23 @@ class RyuApp(object):
 
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            ev = state = None
+            try:
+                ev, state = self.events.get(timeout=self._event_get_timeout)
+            except hub.QueueEmpty:
+                continue
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
                 handler(ev)
 
     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        try:
+            self.events.put((ev, state), timeout=self._event_put_timeout)
+        except hub.QueueFull:
+            LOG.debug("EVENT LOST FOR %s %s",
+                      self.name, ev.__class__.__name__)
 
     def send_event(self, name, ev, state=None):
         """
@@ -520,7 +530,7 @@ class AppManager(object):
         self._close(app)
         events = app.events
         if not events.empty():
-            app.logger.debug('%s events remians %d', app.name, events.qsize())
+            app.logger.debug('%s events remains %d', app.name, events.qsize())
 
     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..06b30f6 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -109,6 +109,8 @@ def _deactivate(method):
 
 
 class Datapath(ofproto_protocol.ProtocolDesc):
+    RECV_LOOP_FENCE = 128
+
     def __init__(self, socket, address):
         super(Datapath, self).__init__()

@@ -211,7 +213,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
                 # switches. The limit is arbitrary. We need the better
                 # approach in the future.
                 count += 1
-                if count > 2048:
+                if count > Datapath.RECV_LOOP_FENCE:
                     count = 0
                     hub.sleep(0)
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..954bfc8 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -91,6 +91,7 @@ if HUB_TYPE == 'eventlet':
                 pass
 
     Queue = eventlet.queue.Queue
+    QueueFull = eventlet.queue.Full
     QueueEmpty = eventlet.queue.Empty
     Semaphore = eventlet.semaphore.Semaphore
     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore


Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu


------------------------------------------------------------------------------
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to