Folks,

Sigh. As soon as I submitted it, I thought of a one-line defensive addition to my patch.
Literally, the change is: < + if isinstance(e, hub.Timeout): --- > + if ((isinstance(e, hub.Timeout)) and (e is > handler_execution_timeout)): Since I should be verifying that the Timeout object I’m handling is the *same* as the one I set, per: http://eventlet.net/doc/modules/timeout.html My apologies for the brown paper bag redo. Signed-off-by: Victor J. Orlikowski <[email protected]> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py index 3d5d895..eac2c42 100644 --- a/ryu/base/app_manager.py +++ b/ryu/base/app_manager.py @@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, get_dependent_services from ryu.controller.controller import Datapath from ryu.controller import event from ryu.controller.event import EventRequestBase, EventReplyBase +from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn from ryu.lib import hub from ryu.ofproto import ofproto_protocol @@ -157,12 +158,25 @@ class RyuApp(object): self.observers = {} # ev_cls -> observer-name -> states:set self.threads = [] self.main_thread = None - self.events = hub.Queue(128) + self.events = hub.Queue(32) + self.sc_events = hub.Queue(32) + self.pi_events = hub.Queue(32) + self._events_sem = hub.BoundedSemaphore(self.events.maxsize) + self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize) + self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize) + self._event_get_timeout = 5 + if hasattr(self.__class__, 'LOGGER_NAME'): self.logger = logging.getLogger(self.__class__.LOGGER_NAME) else: self.logger = logging.getLogger(self.name) self.CONF = cfg.CONF + self.CONF.register_opts([ + cfg.FloatOpt('handler-execution-timeout', + default=10.0, + help='Maximum time, in seconds, to permit handlers to run.') + ]) + self.handler_execution_timeout = self.CONF.handler_execution_timeout # prevent accidental creation of instances of this class outside RyuApp class _EventThreadStop(event.EventBase): @@ -279,15 +293,65 @@ class RyuApp(object): def 
_event_loop(self): while self.is_active or not self.events.empty(): - ev, state = self.events.get() - if ev == self._event_stop: + # Process events according to priority. + # StatusChange highest, PacketIn lowest, all others in between. + ev = state = None + if self.sc_events.qsize(): + try: + ev, state = self.sc_events.get(timeout=self._event_get_timeout) + except hub.QueueEmpty: + pass + else: + self._sc_events_sem.release() + + if (ev is None) and (self.events.qsize()): + try: + ev, state = self.events.get(timeout=self._event_get_timeout) + except hub.QueueEmpty: + pass + else: + self._events_sem.release() + + if ev is None: + try: + ev, state = self.pi_events.get(timeout=self._event_get_timeout) + except hub.QueueEmpty: + pass + else: + self._pi_events_sem.release() + + if (ev is None) or (ev == self._event_stop): continue handlers = self.get_handlers(ev, state) for handler in handlers: - handler(ev) + handler_execution_timeout = hub.Timeout(self.handler_execution_timeout) + try: + handler(ev) + except hub.TaskExit: + # Normal exit. + # Propagate upwards, so we leave the event loop. 
+ raise + except Exception as e: + if ((isinstance(e, hub.Timeout)) and (e is handler_execution_timeout)): + LOG.error('%s: Handler exceeded maximum execution time; terminated.', self.name) + else: + LOG.error('%s: Exception occurred during handler processing.', self.name) + LOG.exception('%s: Backtrace from offending handler ' + '[%s] servicing event [%s] follows.', + self.name, handler.__name__, ev.__class__.__name__) + finally: + handler_execution_timeout.cancel() def _send_event(self, ev, state): - self.events.put((ev, state)) + if isinstance(ev, EventOFPStateChange): + self._sc_events_sem.acquire() + self.sc_events.put((ev, state), block=False) + elif isinstance(ev, EventOFPPacketIn): + self._pi_events_sem.acquire() + self.pi_events.put((ev, state), block=False) + else: + self._events_sem.acquire() + self.events.put((ev, state), block=False) def send_event(self, name, ev, state=None): """ @@ -336,7 +400,7 @@ class RyuApp(object): class AppManager(object): - # singletone + # singleton _instance = None @staticmethod @@ -519,8 +583,14 @@ class AppManager(object): app.stop() self._close(app) events = app.events + sc_events = app.sc_events + pi_events = app.pi_events + if not sc_events.empty(): + app.logger.debug('%s state changes remains %d', app.name, sc_events.qsize()) + if not pi_events.empty(): + app.logger.debug('%s PacketIn events remains %d', app.name, pi_events.qsize()) if not events.empty(): - app.logger.debug('%s events remians %d', app.name, events.qsize()) + app.logger.debug('%s events remains %d', app.name, events.qsize()) def close(self): def close_all(close_dict): diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 25b8776..0fd19f6 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer import traceback import random import ssl -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError +from socket import 
IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout import warnings import ryu.base.app_manager @@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol from ryu.ofproto import ofproto_v1_0 from ryu.ofproto import nx_match -from ryu.controller import handler from ryu.controller import ofp_event +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER + +from ryu.exception import RyuException from ryu.lib.dpid import dpid_to_str @@ -58,8 +60,16 @@ CONF.register_cli_opts([ cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), cfg.StrOpt('ca-certs', default=None, help='CA certificates'), - cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.') ]) +CONF.register_opts([ + cfg.FloatOpt('socket-timeout', + default=5.0, + help='Time, in seconds, to await completion of socket operations.') +]) + + +class DatapathShutdown(RyuException): + message = 'Datapath shutdown was requested' class OpenFlowController(object): @@ -102,9 +112,22 @@ def _deactivate(method): def deactivate(self): try: method(self) + except DatapathShutdown: + if self.socket is not None: + self.socket.close() + self.socket = None + if self.recv_thread: + self.recv_thread.throw(DatapathShutdown) + if self.send_thread: + self.send_thread.throw(DatapathShutdown) finally: - self.send_active = False - self.set_state(handler.DEAD_DISPATCHER) + if self.socket is not None: + try: + self.socket.shutdown(SHUT_RDWR) + except (EOFError, IOError): + pass + if self.state is not DEAD_DISPATCHER: + self.set_state(DEAD_DISPATCHER) return deactivate @@ -117,19 +140,26 @@ class Datapath(ofproto_protocol.ProtocolDesc): self.socket.settimeout(CONF.socket_timeout) self.address = address - self.send_active = True + self.send_thread = None + self.recv_thread = None + self.close_requested = False # The limit is arbitrary. 
We need to limit queue size to - # prevent it from eating memory up + # prevent it from eating memory up. + # Similarly, the timeouts are also arbitrary. self.send_q = hub.Queue(16) + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize) + self._send_q_timeout = 5 + self._unknown_datapath_timeout = 20 self.xid = random.randint(0, self.ofproto.MAX_XID) self.id = None # datapath_id is unknown yet self._ports = None self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10 self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event') - self.set_state(handler.HANDSHAKE_DISPATCHER) + self.assured = False + self.set_state(HANDSHAKE_DISPATCHER) def _get_ports(self): if (self.ofproto_parser is not None and @@ -137,8 +167,8 @@ class Datapath(ofproto_protocol.ProtocolDesc): message = ( 'Datapath#ports is kept for compatibility with the previous ' 'openflow versions (< 1.3). ' - 'This not be updated by EventOFPPortStatus message. ' - 'If you want to be updated, you can use ' + 'This is not updated by the EventOFPPortStatus message. ' + 'If you want to be updated, you should use ' '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.' 
) warnings.warn(message, stacklevel=2) @@ -163,64 +193,107 @@ class Datapath(ofproto_protocol.ProtocolDesc): # Low level socket handling layer @_deactivate def _recv_loop(self): - buf = bytearray() - required_len = ofproto_common.OFP_HEADER_SIZE - - count = 0 - while True: - ret = "" - - try: - ret = self.socket.recv(required_len) - except SocketTimeout: - if not self.close_requested: - continue - except SocketError: - self.close_requested = True - - if (len(ret) == 0) or (self.close_requested): - self.socket.close() - break - - buf += ret - while len(buf) >= required_len: - (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) - required_len = msg_len - if len(buf) < required_len: - break - - msg = ofproto_parser.msg( - self, version, msg_type, msg_len, xid, buf[:msg_len]) - # LOG.debug('queue msg %s cls %s', msg, msg.__class__) - if msg: - ev = ofp_event.ofp_msg_to_ev(msg) - self.ofp_brick.send_event_to_observers(ev, self.state) - - dispatchers = lambda x: x.callers[ev.__class__].dispatchers - handlers = [handler for handler in - self.ofp_brick.get_handlers(ev) if - self.state in dispatchers(handler)] - for handler in handlers: - handler(ev) - - buf = buf[required_len:] - required_len = ofproto_common.OFP_HEADER_SIZE - - # We need to schedule other greenlets. Otherwise, ryu - # can't accept new switches or handle the existing - # switches. The limit is arbitrary. We need the better - # approach in the future. 
- count += 1 - if count > 2048: - count = 0 - hub.sleep(0) + unknown_datapath_timeout = hub.Timeout(self._unknown_datapath_timeout, DatapathShutdown) + try: + buf = bytearray() + required_len = ofproto_common.OFP_HEADER_SIZE + + count = 0 + while True: + ret = "" + + try: + ret = self.socket.recv(required_len) + except SocketTimeout: + if not self.close_requested: + continue + except (AttributeError, EOFError, IOError): + self.close_requested = True + + if (len(ret) == 0) or (self.close_requested): + raise DatapathShutdown + + if self.assured: + unknown_datapath_timeout.cancel() + + buf += ret + while len(buf) >= required_len: + (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) + required_len = msg_len + if len(buf) < required_len: + break + + msg = ofproto_parser.msg( + self, version, msg_type, msg_len, xid, buf[:msg_len]) + # LOG.debug('queue msg %s cls %s', msg, msg.__class__) + if msg: + ev = ofp_event.ofp_msg_to_ev(msg) + exec_timeout = self.ofp_brick.handler_execution_timeout + self.ofp_brick.send_event_to_observers(ev, self.state) + + dispatchers = lambda x: x.callers[ev.__class__].dispatchers + handlers = [handler for handler in + self.ofp_brick.get_handlers(ev) if + self.state in dispatchers(handler)] + for handler in handlers: + # Defensively set a timeout for the handler. + # "This should never happen" are famous last words. + handler_timeout = hub.Timeout(exec_timeout) + try: + handler(ev) + finally: + handler_timeout.cancel() + + buf = buf[required_len:] + required_len = ofproto_common.OFP_HEADER_SIZE + + # We need to schedule other greenlets. Otherwise, ryu + # can't accept new switches or handle the existing + # switches. The limit is arbitrary. We need the better + # approach in the future. + count += 1 + if count > 2048: + count = 0 + hub.sleep(0) + except Exception as e: + if isinstance(e, DatapathShutdown): + # Normal termination sequence. 
+ pass + else: + dpid_str = "UNKNOWN" + if self.id is not None: + dpid_str = dpid_to_str(self.id) + LOG.exception('Exception occurred in recv_loop() ' + 'for datapath [%s] connecting from [%s]. ' + 'Shutting down datapath.', + dpid_str, self.address) + finally: + # Shutting down; get rid of the recv_thread reference. + self.recv_thread = None + # Cancel timeout, if we haven't already. + unknown_datapath_timeout.cancel() + # Although it may have been caught above, raise DatapathShutdown. + # This guarantees that close() will be called on the socket. + raise DatapathShutdown @_deactivate def _send_loop(self): try: - while self.send_active: - buf = self.send_q.get() - self.socket.sendall(buf) + while self.socket is not None: + buf = None + try: + buf = self.send_q.get(timeout=self._send_q_timeout) + except hub.QueueEmpty: + pass + else: + self._send_q_sem.release() + + if buf is None: + continue + if self.socket: + self.socket.sendall(buf) + else: + raise DatapathShutdown except IOError as ioe: LOG.debug("Socket error while sending data to switch at address %s: [%d] %s", self.address, ioe.errno, ioe.strerror) @@ -228,17 +301,32 @@ class Datapath(ofproto_protocol.ProtocolDesc): q = self.send_q # first, clear self.send_q to prevent new references. self.send_q = None - # there might be threads currently blocking in send_q.put(). - # unblock them by draining the queue. + # Next, get rid of the send_thread reference. + self.send_thread = None + # Drain the send_q, releasing the associated semaphore for each entry. + # This should release all threads waiting to acquire the semaphore. 
try: while q.get(block=False): - pass + self._send_q_sem.release() except hub.QueueEmpty: pass def send(self, buf): + acquired = False + dp_closed = True + if self.send_q: - self.send_q.put(buf) + acquired = self._send_q_sem.acquire() + + if self.send_q and acquired: + dp_closed = False + self.send_q.put(buf, block=False) + elif acquired: + self._send_q_sem.release() + + if dp_closed: + LOG.debug('Datapath in process of terminating; send() to %s discarded.', + self.address) def set_xid(self, msg): self.xid += 1 @@ -255,17 +343,15 @@ class Datapath(ofproto_protocol.ProtocolDesc): self.send(msg.buf) def serve(self): - send_thr = hub.spawn(self._send_loop) + # self.recv_thread *MUST* be set before we spawn the send loop! + self.recv_thread = hub.getcurrent() + self.send_thread = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) self.send_msg(hello) - try: - self._recv_loop() - finally: - hub.kill(send_thr) - hub.joinall([send_thr]) + self._recv_loop() # # Utility methods for convenience diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py index 2682777..a65d05b 100644 --- a/ryu/controller/dpset.py +++ b/ryu/controller/dpset.py @@ -56,6 +56,8 @@ class EventDP(EventDPBase): class EventDPReconnected(EventDPBase): def __init__(self, dp): super(EventDPReconnected, self).__init__(dp) + # port list, which should not change across reconnects + self.ports = [] class EventPortBase(EventDPBase): @@ -127,6 +129,7 @@ class DPSet(app_manager.RyuApp): self.logger.debug('DPSET: New datapath %s', dp) send_dp_reconnected = True self.dps[dp.id] = dp + dp.assured = True if dp.id not in self.port_state: self.port_state[dp.id] = PortState() ev = EventDP(dp, True) @@ -138,6 +141,7 @@ class DPSet(app_manager.RyuApp): self.send_event_to_observers(ev) if send_dp_reconnected: ev = EventDPReconnected(dp) + ev.ports = self.port_state.get(dp.id, {}).values() self.send_event_to_observers(ev) def _unregister(self, dp): @@ -163,7 +167,6 
@@ class DPSet(app_manager.RyuApp): """ This method returns the ryu.controller.controller.Datapath instance for the given Datapath ID. - Raises KeyError if no such a datapath connected to this controller. """ return self.dps.get(dp_id) diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index 5621147..4de0b4a 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet': listen = eventlet.listen connect = eventlet.connect + + Queue = eventlet.queue.LightQueue + QueueEmpty = eventlet.queue.Empty + Semaphore = eventlet.semaphore.Semaphore + BoundedSemaphore = eventlet.semaphore.BoundedSemaphore + TaskExit = greenlet.GreenletExit + + def spawn(*args, **kwargs): def _launch(func, *args, **kwargs): # mimic gevent's default raise_error=False behaviour # by not propergating an exception to the joiner. try: func(*args, **kwargs) - except greenlet.GreenletExit: + except TaskExit: pass except: # log uncaught exception. @@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet': # by not propergating an exception to the joiner. try: func(*args, **kwargs) - except greenlet.GreenletExit: + except TaskExit: pass except: # log uncaught exception. 
@@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet': # greenthread try: t.wait() - except greenlet.GreenletExit: + except TaskExit: pass - Queue = eventlet.queue.Queue - QueueEmpty = eventlet.queue.Empty - Semaphore = eventlet.semaphore.Semaphore - BoundedSemaphore = eventlet.semaphore.BoundedSemaphore class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, @@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet': sock, addr = self.server.accept() spawn(self.handle, sock, addr) + class LoggingWrapper(object): def write(self, message): LOG.info(message.rstrip('\n')) + class WSGIServer(StreamServer): def serve_forever(self): self.logger = LoggingWrapper() eventlet.wsgi.server(self.server, self.handle, self.logger) + WebSocketWSGI = websocket.WebSocketWSGI + Timeout = eventlet.timeout.Timeout + class Event(object): def __init__(self): self._ev = eventlet.event.Event() @@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet': def _broadcast(self): self._ev.send() - # because eventlet Event doesn't allow mutiple send() on an event, + # because eventlet Event doesn't allow multiple send() on an event, # re-create the underlying event. # note: _ev.reset() is obsolete. 
self._ev = eventlet.event.Event() diff --git a/ryu/topology/event.py b/ryu/topology/event.py index c54152b..e7b682c 100644 --- a/ryu/topology/event.py +++ b/ryu/topology/event.py @@ -41,6 +41,11 @@ class EventSwitchLeave(EventSwitchBase): super(EventSwitchLeave, self).__init__(switch) +class EventSwitchReconnected(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchReconnected, self).__init__(switch) + + class EventPortBase(event.EventBase): def __init__(self, port): super(EventPortBase, self).__init__() diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py index 9622a90..a06f83b 100644 --- a/ryu/topology/switches.py +++ b/ryu/topology/switches.py @@ -495,6 +495,7 @@ class Switches(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION] _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave, + event.EventSwitchReconnected, event.EventPortAdd, event.EventPortDelete, event.EventPortModify, event.EventLinkAdd, event.EventLinkDelete, @@ -540,6 +541,7 @@ class Switches(app_manager.RyuApp): assert dp.id is not None self.dps[dp.id] = dp + dp.assured = True if dp.id not in self.port_state: self.port_state[dp.id] = PortState() for port in dp.ports.values(): @@ -547,8 +549,9 @@ class Switches(app_manager.RyuApp): def _unregister(self, dp): if dp.id in self.dps: - del self.dps[dp.id] - del self.port_state[dp.id] + if (self.dps[dp.id] == dp): + del self.dps[dp.id] + del self.port_state[dp.id] def _get_switch(self, dpid): if dpid in self.dps: @@ -602,16 +605,18 @@ class Switches(app_manager.RyuApp): if ev.state == MAIN_DISPATCHER: dp_multiple_conns = False if dp.id in self.dps: - LOG.warning('multiple connections from %s', dpid_to_str(dp.id)) + LOG.warning('Multiple connections from %s', dpid_to_str(dp.id)) dp_multiple_conns = True + (self.dps[dp.id]).close() self._register(dp) switch = self._get_switch(dp.id) LOG.debug('register %s', switch) - # Do not send event 
while dp has multiple connections. if not dp_multiple_conns: self.send_event_to_observers(event.EventSwitchEnter(switch)) + else: + self.send_event_to_observers(event.EventSwitchReconnected(switch)) if not self.link_discovery: return @@ -665,19 +670,23 @@ class Switches(app_manager.RyuApp): # dp.id is None when datapath dies before handshake if dp.id is None: return + switch = self._get_switch(dp.id) - self._unregister(dp) - LOG.debug('unregister %s', switch) - self.send_event_to_observers(event.EventSwitchLeave(switch)) + if switch: + if switch.dp is dp: + self._unregister(dp) + LOG.debug('unregister %s', switch) - if not self.link_discovery: - return + self.send_event_to_observers(event.EventSwitchLeave(switch)) - for port in switch.ports: - if not port.is_reserved(): - self.ports.del_port(port) - self._link_down(port) - self.lldp_event.set() + if not self.link_discovery: + return + + for port in switch.ports: + if not port.is_reserved(): + self.ports.del_port(port) + self._link_down(port) + self.lldp_event.set() @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) def port_status_handler(self, ev): @@ -762,7 +771,7 @@ class Switches(app_manager.RyuApp): try: src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data) except LLDPPacket.LLDPUnknownFormat as e: - # This handler can receive all the packtes which can be + # This handler can receive all the packets which can be # not-LLDP packet. Ignore it silently return Best, Victor -- Victor J. Orlikowski <> vjo@[cs.]duke.edu ------------------------------------------------------------------------------ Site24x7 APM Insight: Get Deep Visibility into Application Performance APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor end-to-end web transactions and take corrective actions now Troubleshoot faster and improve end-user experience. Signup Now! 
http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140 _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
