Hopefully, this is the final revision (until we have additional data from long-term stability testing).
I have tweaked the queue sizes, as well as the timeouts for put() and get(). Based on some of the failure modes we have been seeing with 10G switches, it appears that there might be a bug in eventlet's queue.get(). It looks to be a missed wakeup on queue.put() when things get busy enough; either that, or the wakeup never gets serviced if there is enough put() traffic. I am hoping that adding timeouts to the get() will work around the issue, by forcing a wakeup on a timer.

On top of that, a blocking put() to the event queue can completely stall the Datapath greenlets, so we now have a timeout on that as well. Blocking on an event queue put() in the Datapath appears to be worse than blocking on get() from the event queue, so I have done two things to address that:

1) I made the put() timeout the shorter of the two.
2) I allowed the event queue to grow if put() would otherwise block.

In this way, the Datapath greenlets can always make progress. If the switch corresponding to a greenlet disconnects while its put() is blocked, the put() will either succeed or time out, allowing the loop to progress to discovering the dead socket and cleaning it up.

Increasing the size of the event queue is a tradeoff. On one hand, the event queue can potentially grow without bound, given enough event traffic, thereby consuming too much memory and becoming difficult to service. On the other hand, peak put() rates can outstrip the ability of the single greenlet performing get() to service them, which can further lead to a cascade of blocked put() operations flooding the queue as the get() works its way through. That doesn't even begin to take into account the event disaster that occurs when a switch disconnects because echo request events were not serviced in a timely manner, and then re-connects.
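To make the timed-get workaround concrete, here is a minimal sketch of the pattern, using the standard library's queue.Queue as a stand-in for eventlet's hub.Queue (the names EVENT_GET_TIMEOUT, event_loop, and the stop flag are illustrative, not from the patch; the patch's actual values are 2 seconds for put() and 8 for get()):

```python
import queue
import threading

EVENT_GET_TIMEOUT = 0.1  # seconds; chosen small here so the demo runs fast

def event_loop(events, handled, stop):
    # Block for at most EVENT_GET_TIMEOUT on each get(); on timeout, loop
    # around and re-check state instead of sleeping forever on a (possibly)
    # missed wakeup from put().
    while not stop.is_set() or not events.empty():
        try:
            ev = events.get(timeout=EVENT_GET_TIMEOUT)
        except queue.Empty:
            continue  # forced wakeup: nothing queued, re-check and retry
        handled.append(ev)

events = queue.Queue(maxsize=4)
handled = []
stop = threading.Event()
t = threading.Thread(target=event_loop, args=(events, handled, stop))
t.start()
for i in range(3):
    events.put(i)
stop.set()       # loop still drains any queued events before exiting
t.join()
print(handled)   # -> [0, 1, 2]
```

The point of the pattern is that the consumer's liveness no longer depends solely on being woken by put(): even if a wakeup is lost, the timeout bounds how long the loop stays asleep.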
Since the number of event queues is limited by the number of applications loaded, and modern systems *should* have enough memory to deal with a fair amount of queue growth, I decided that allowing the queue to grow dynamically was the better solution for high event-rate environments. Shrinking the queue seemed somewhat pointless: once a high-water mark is reached in a given environment, it seems more than possible to me that the queue will reach that level again.

It *may* make sense, in the future, to implement a configuration option limiting the maximum size to which the queue can dynamically expand. However, given the difficulty of tracing a controller failure caused by a filled event queue, I would rather have the controller log each event queue increase, so that the administrator can see that the network environment is generating more events than the controller can cope with.

Also of note: the initial size of the event queue is now a multiple of the number of iterations through the receive loop. This is intended to give the event queue some head room before it begins growing in response to received events. I have also added a timeout to the send_q get(), in case the apparent missed-wakeup issue ends up affecting it as well.

We are currently running this and evaluating its stability under load. I will follow up with further tweaks if we find additional issues with event queue processing that result in hangs.

Please find the revised patch below.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..14c89fc 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -157,7 +157,7 @@ class RyuApp(object):
         self.observers = {}  # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(4 * Datapath.RECV_LOOP_FENCE)
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
@@ -168,6 +168,8 @@ class RyuApp(object):
         class _EventThreadStop(event.EventBase):
             pass
         self._event_stop = _EventThreadStop()
+        self._event_put_timeout = 2
+        self._event_get_timeout = (4 * self._event_put_timeout)
         self.is_active = True

     def start(self):
@@ -279,15 +281,33 @@ class RyuApp(object):
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            ev = state = None
+            try:
+                ev, state = self.events.get(timeout=self._event_get_timeout)
+            except hub.QueueEmpty:
+                continue
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
                 handler(ev)

     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if self.events.full():
+            old_queue_size = self.events.qsize()
+            new_queue_size = (2 * old_queue_size)
+            self.events.resize(new_queue_size)
+            LOG.debug("Size of event queue for %s increased from %d to %d",
+                      self.name, old_queue_size, new_queue_size)
+        try:
+            self.events.put((ev, state), timeout=self._event_put_timeout)
+        except hub.QueueFull:
+            # It should be impossible for us to reach this, but...
+            #
+            # If the queue is being filled so fast that we get a QueueFull
+            # despite checking and resizing, log and drop the event.
+            LOG.debug("EVENT LOST FOR %s %s",
+                      self.name, ev.__class__.__name__)

     def send_event(self, name, ev, state=None):
         """
@@ -520,7 +540,7 @@ class AppManager(object):
             self._close(app)
             events = app.events
             if not events.empty():
-                app.logger.debug('%s events remians %d', app.name, events.qsize())
+                app.logger.debug('%s events remains %d', app.name, events.qsize())

     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..37b257b 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -109,6 +109,8 @@ def _deactivate(method):

 class Datapath(ofproto_protocol.ProtocolDesc):
+    RECV_LOOP_FENCE = 32
+
     def __init__(self, socket, address):
         super(Datapath, self).__init__()
@@ -123,6 +125,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         # The limit is arbitrary. We need to limit queue size to
         # prevent it from eating memory up
         self.send_q = hub.Queue(16)
+        self._send_q_timeout = 5
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
@@ -211,7 +214,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             # switches. The limit is arbitrary. We need the better
             # approach in the future.
             count += 1
-            if count > 2048:
+            if count > Datapath.RECV_LOOP_FENCE:
                 count = 0
                 hub.sleep(0)
@@ -219,7 +222,10 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     def _send_loop(self):
         try:
             while self.send_active:
-                buf = self.send_q.get()
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    continue
                 self.socket.sendall(buf)
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..954bfc8 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -91,6 +91,7 @@ if HUB_TYPE == 'eventlet':
         pass

     Queue = eventlet.queue.Queue
+    QueueFull = eventlet.queue.Full
     QueueEmpty = eventlet.queue.Empty
     Semaphore = eventlet.semaphore.Semaphore
     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore

Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu

------------------------------------------------------------------------------
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel
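The grow-instead-of-block policy in _send_event above can be sketched in isolation. GrowingQueue below is a hypothetical toy, not the patch's code: the patch relies on eventlet's Queue.resize(), while this sketch just shows the policy (double the capacity when full, so the producer always makes progress) using a plain deque:

```python
from collections import deque

class GrowingQueue:
    """Toy queue that doubles its capacity instead of blocking on put()."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._items = deque()

    def full(self):
        return len(self._items) >= self.maxsize

    def put(self, item):
        # Mirrors _send_event in the patch: if full, grow first (logging
        # the increase), then enqueue; the producer never blocks here.
        if self.full():
            old, self.maxsize = self.maxsize, 2 * self.maxsize
            print("queue grown from %d to %d" % (old, self.maxsize))
        self._items.append(item)

    def get(self):
        return self._items.popleft()

q = GrowingQueue(maxsize=2)
for ev in range(3):   # the third put() would have blocked a fixed-size queue
    q.put(ev)
print(q.maxsize)      # -> 4
```

As in the patch, growth only ever happens at a put() that would otherwise block, so a queue that never fills never pays for the policy.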
