Hopefully, this is the final revision (until we have additional data from 
long-term stability testing).

I have tweaked the queue sizes and the timeouts for put() and get().

Based on some of the failure modes we have been seeing with 10G switches, it 
appears that there might be a bug in eventlet's queue.get(): when things get 
busy enough, the wakeup triggered by queue.put() is either missed outright or 
never serviced under heavy put() traffic.

I am hoping that adding timeouts to the get() will work around the issue by 
forcing a wakeup on a timer.
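The shape of that workaround can be sketched with the stdlib queue module 
standing in for eventlet's (the helper name and timeout value here are 
illustrative, not part of the patch):

```python
import queue

def drain_with_timeout(events, stop, get_timeout=0.01):
    """Consume events until stop() is True and the queue is empty.

    The timeout forces a periodic wakeup, so a missed wakeup from
    put() cannot leave the consumer blocked in get() forever.
    """
    handled = []
    while not stop() or not events.empty():
        try:
            ev = events.get(timeout=get_timeout)
        except queue.Empty:
            continue  # timer-forced wakeup; re-check the loop condition
        handled.append(ev)
    return handled
```

On a timeout we simply loop back and re-check state, which is exactly what 
the patched _event_loop does on hub.QueueEmpty.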

On top of that, a blocking put() to the event queue can completely stall the 
Datapath greenlets, so we now have a timeout on that as well.

Blocking an event queue put() in the Datapath appears to be worse than 
blocking a get() from the event queue, so I've done two things to address that:
1) I made the put() timeout the shorter of the two.
2) I allowed the event queue to grow, if put() would block.

In this way, the Datapath greenlets can always make progress.
If the switch corresponding to a greenlet disconnects while the put() is 
blocked, the put() will either succeed or time out, allowing the loop to 
progress to discovering the dead socket and cleaning it up.
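The grow-rather-than-block strategy can be sketched as follows, again with 
the stdlib queue module standing in for eventlet's (eventlet's Queue has a 
real resize(); mutating maxsize under the mutex here is a stand-in for 
illustration, and the helper name is hypothetical):

```python
import queue

def put_with_growth(events, item, put_timeout=0.01):
    """Enqueue without ever blocking the producer indefinitely.

    If the queue is full, double its capacity before the put(); if the
    timed put() still fails, drop the item rather than block forever.
    """
    if events.full():
        with events.mutex:
            # Stand-in for eventlet's Queue.resize(2 * qsize).
            events.maxsize = 2 * events.maxsize
    try:
        events.put(item, timeout=put_timeout)
        return True
    except queue.Full:
        # Should be unreachable after the resize; the patch logs and
        # drops the event here instead of stalling the greenlet.
        return False
```

Because the producer either succeeds, grows the queue, or drops after a short 
timeout, it can always make progress back to its main loop.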

Increasing the size of the event queue is a tradeoff.
On one hand, the event queue can potentially increase without bound, given 
enough event traffic (and thereby consume too much memory/become difficult to 
service).
On the other hand, peak put() rates can outstrip the ability of the single 
greenlet performing get() to service them, which can lead to a cascade of 
blocked put() operations flooding the queue as the get() works its way 
through.
That doesn’t even begin to take into account the event disaster that occurs 
when the switch disconnects due to echo request events not being serviced in a 
timely manner, and then re-connects.

Since the number of event queues is limited by the number of applications 
loaded, 
and modern systems *should* have enough memory to deal with a fair amount of 
queue growth, I decided that allowing the queue to grow dynamically was the 
better solution in high-event rate environments.

Shrinking the queue seemed somewhat pointless: once a high-water mark is 
reached in a given environment, it seems more than possible to me that the 
queue might reach that level again.

It *may* make sense, in the future, to implement a configuration option that 
limits the maximum size to which the queue can dynamically expand. But given 
the difficulty of tracing the controller's failure when the event queue fills, 
I'd rather have the controller log each increase in queue size, so that the 
administrator can see that the network environment is generating more events 
than the controller can cope with.

Also of note: the initial size of the event queue is now a multiple of the 
number of iterations the Datapath receive loop makes before yielding.
This is intended to give the event queue some head room before it begins 
growing under the load of received events.
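Concretely, with the values in the patch below, the initial size works out to 
the same 128 entries as the old hard-coded hub.Queue(128), but is now derived 
from the receive-loop fence:

```python
# Values taken from the patch below.
RECV_LOOP_FENCE = 32                      # recv-loop iterations before yielding
INITIAL_QUEUE_SIZE = 4 * RECV_LOOP_FENCE  # replaces the fixed hub.Queue(128)
assert INITIAL_QUEUE_SIZE == 128
```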

I’ve also added a timeout to the send_q get(), in case the apparent 
missed-wakeup issue ends up affecting it as well.

We’re currently running this and evaluating the stability under load.
I will follow up with future tweaks, if we find additional issues with event 
queue processing that result in hangs.

Please find the revised patch below.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..14c89fc 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -157,7 +157,7 @@ class RyuApp(object):
         self.observers = {}     # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(4 * Datapath.RECV_LOOP_FENCE)
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
@@ -168,6 +168,8 @@ class RyuApp(object):
         class _EventThreadStop(event.EventBase):
             pass
         self._event_stop = _EventThreadStop()
+        self._event_put_timeout = 2
+        self._event_get_timeout = (4 * self._event_put_timeout)
         self.is_active = True
 
     def start(self):
@@ -279,15 +281,33 @@ class RyuApp(object):
 
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            ev = state = None
+            try:
+                ev, state = self.events.get(timeout=self._event_get_timeout)
+            except hub.QueueEmpty:
+                continue
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
                 handler(ev)
 
     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if self.events.full():
+            old_queue_size = self.events.qsize()
+            new_queue_size = (2 * old_queue_size)
+            self.events.resize(new_queue_size)
+            LOG.debug("Size of event queue for %s increased from %d to %d",
+                      self.name, old_queue_size, new_queue_size)
+        try:
+            self.events.put((ev, state), timeout=self._event_put_timeout)
+        except hub.QueueFull:
+            # It should be impossible for us to reach this, but...
+            #
+            # If the queue is being filled so fast that we get a QueueFull
+            # despite checking and resizing, log and drop the event.
+            LOG.debug("EVENT LOST FOR %s %s",
+                      self.name, ev.__class__.__name__)
 
     def send_event(self, name, ev, state=None):
         """
@@ -520,7 +540,7 @@ class AppManager(object):
         self._close(app)
         events = app.events
         if not events.empty():
-            app.logger.debug('%s events remians %d', app.name, events.qsize())
+            app.logger.debug('%s events remains %d', app.name, events.qsize())
 
     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..37b257b 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -109,6 +109,8 @@ def _deactivate(method):
 
 
 class Datapath(ofproto_protocol.ProtocolDesc):
+    RECV_LOOP_FENCE = 32
+
     def __init__(self, socket, address):
         super(Datapath, self).__init__()
 
@@ -123,6 +125,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         # The limit is arbitrary. We need to limit queue size to
         # prevent it from eating memory up
         self.send_q = hub.Queue(16)
+        self._send_q_timeout = 5
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
@@ -211,7 +214,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
                 # switches. The limit is arbitrary. We need the better
                 # approach in the future.
                 count += 1
-                if count > 2048:
+                if count > Datapath.RECV_LOOP_FENCE:
                     count = 0
                     hub.sleep(0)
 
@@ -219,7 +222,10 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     def _send_loop(self):
         try:
             while self.send_active:
-                buf = self.send_q.get()
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    continue
                 self.socket.sendall(buf)
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..954bfc8 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -91,6 +91,7 @@ if HUB_TYPE == 'eventlet':
                 pass
 
     Queue = eventlet.queue.Queue
+    QueueFull = eventlet.queue.Full
     QueueEmpty = eventlet.queue.Empty
     Semaphore = eventlet.semaphore.Semaphore
     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore


Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu
