Folks,

After an extensive testing interval, I am re-submitting my patches to improve the stability of Ryu in the face of buggy switches and applications. These are extensive changes, so please let me explain them.

1) The event queue has been split into 3 queues.

EventOFPStateChange events could get starved by a large number of Packet-In events. So could other event types. So, to implement a priority queuing mechanism that did not suffer from priority inversion, I created three queues out of the original one. The first queue is dedicated to EventOFPStateChange events. The second queue is dedicated to all events that are *not* EventOFPPacketIn. The third queue is dedicated to all EventOFPPacketIn events. The three queues are serviced in the above-stated order.

EventOFPStateChange is generated internally, and must be serviced quickly. Responses to echo requests, feature replies, port status changes, etc. must be serviced with next highest priority, since they may alter the nature of a response to a Packet-In. Packet-Ins are serviced with lowest priority; they are (often) the most numerous events, and can crowd out servicing of other events. Putting the events in a single queue could result in priority inversions due to queues blocking on being full.

2) Each queue in the event handler and the Datapath send/recv mechanism has an associated semaphore.

Eventlet synchronized queues have broken synchronization, and miss wakeups on put() and get() operations. Eventlet semaphores do not suffer from this problem, so I have ensured that queue operations do not block for any significant amount of time (get() operations are permitted to sleep for a user-configurable duration, if the queue is empty) and that full queues are protected using their associated semaphore.

3) Events must now be serviced within a limited time frame, which is end-user configurable.

From: http://ryu-zhdoc.readthedocs.org/en/latest/ryu_app_api.html
"Because the event handler is called in the context of the event processing thread, it should be careful for blocking. I.e. while an event handler is blocked, no further events for the Ryu application will be processed."

The assumption that Ryu application handlers cannot block, and must complete relatively quickly, has been an implicit part of the design for some time. I have now made this explicit in the event loop: handlers must complete execution within a (user-configurable) time bound, or they will be terminated. (Usually, that is; a handler can put itself into a tight loop, and thereby prevent the execution timeout handler from firing.) Presuming that the application has not been written with a tight loop that prevents termination of a handler, handlers must always make progress, or they will be terminated, and a backtrace will be printed for debugging. Thus, the event loop for an application will always be able to make forward progress.

4) Applications must now inform the core that they have "taken ownership" of a Datapath object.

Previously, Datapaths could "leak" if they did not make it through the OpenFlow handshake. I have addressed this by adding a timer that disconnects any Datapath that is not claimed by an application after a certain interval (currently statically defined at 20 seconds, but it could be made user-configurable).

An application that wishes to ensure that Datapaths are properly cleaned up (and do not disappear out from under it) should:

i) Have a structure keeping track of what Datapaths it "owns."

ii) When handling an EventOFPSwitchFeatures in the CONFIG_DISPATCHER, or any event in the MAIN_DISPATCHER, ensure that the Datapath has been tracked in its internal record-keeping structure, and set the "assured" field of the Datapath object to "True".

iii) When it discovers duplicate records for a given Datapath in its internal structure (during handling of a new Datapath having the same DPID), call the close() method of the old Datapath, after first recording the new Datapath in its internal record-keeping structure.

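As an illustration of steps i) through iii), here is a minimal sketch of the bookkeeping an application might do. It is not code from the patch: `FakeDatapath` and `DatapathRegistry` are hypothetical names, and `FakeDatapath` only stands in for the real Datapath object so that the example runs without Ryu installed. The only parts taken from the description above are the "assured" field and the close() call on the older connection.

```python
class FakeDatapath:
    """Hypothetical stand-in for a Ryu Datapath (assumption, not Ryu code)."""

    def __init__(self, dpid):
        self.id = dpid
        self.assured = False   # the field an application sets to claim ownership
        self.closed = False

    def close(self):
        self.closed = True


class DatapathRegistry:
    """Hypothetical record-keeping structure for the Datapaths an app owns."""

    def __init__(self):
        self.dps = {}  # DPID -> Datapath currently on record (step i)

    def claim(self, dp):
        """Track dp, mark it as owned, and retire any older connection."""
        old = self.dps.get(dp.id)
        # Record the new Datapath first, then mark it as claimed (step ii).
        self.dps[dp.id] = dp
        dp.assured = True
        # Only after recording the new one, close the duplicate (step iii).
        if old is not None and old is not dp:
            old.close()
        return old

    def release(self, dp):
        """Forget dp only if it is still the connection on record."""
        if self.dps.get(dp.id) is dp:
            del self.dps[dp.id]


registry = DatapathRegistry()
first = FakeDatapath(0x1)
second = FakeDatapath(0x1)   # the same DPID connecting again
registry.claim(first)
registry.claim(second)
registry.release(first)      # stale connection going away; record is untouched
```

In a real application, something like claim() would presumably be called from the EventOFPSwitchFeatures handler, and release() when the Datapath enters DEAD_DISPATCHER. The identity check in release() is there so that the late teardown of an old connection does not erase the record of the new one.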
Applications that do not set the “assured” field of a Datapath will find that the Datapath will be closed underneath them by the Ryu core. Applications using ryu.controller.dpset or ryu.topology.switches will find that this has been automatically handled for them. Furthermore, another of the patches in this series ensures that all sample applications have been patched.

I have also included several minor typo fixes and cleanups, throughout each of these files.

Much of the convoluted logic in ryu.controller.controller exists to ensure that sockets are closed correctly; in particular, shutdown() must be called on the Datapath socket as a result of the Datapath close() operation, in order to ensure that recv() does not end up trying to read from a new socket with the same file descriptor number.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..3ae6657 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, get_dependent_services
 from ryu.controller.controller import Datapath
 from ryu.controller import event
 from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
 from ryu.lib import hub
 from ryu.ofproto import ofproto_protocol

@@ -157,12 +158,25 @@ class RyuApp(object):
         self.observers = {}    # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(32)
+        self.sc_events = hub.Queue(32)
+        self.pi_events = hub.Queue(32)
+        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+        self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
+        self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
+        self._event_get_timeout = 5
+
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
             self.logger = logging.getLogger(self.name)
         self.CONF = cfg.CONF
+        self.CONF.register_opts([
+            cfg.FloatOpt('handler-execution-timeout',
+                         default=10.0,
+                         help='Maximum time, in seconds, to permit handlers to run.')
+        ])
+        self.handler_execution_timeout = self.CONF.handler_execution_timeout

         # prevent accidental creation of instances of this class outside RyuApp
         class _EventThreadStop(event.EventBase):
@@ -279,15 +293,65 @@ class RyuApp(object):

     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            # Process events according to priority.
+            # StatusChange highest, PacketIn lowest, all others in between.
+            ev = state = None
+            if self.sc_events.qsize():
+                try:
+                    ev, state = self.sc_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._sc_events_sem.release()
+
+            if (ev is None) and (self.events.qsize()):
+                try:
+                    ev, state = self.events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._events_sem.release()
+
+            if ev is None:
+                try:
+                    ev, state = self.pi_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._pi_events_sem.release()
+
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
-                handler(ev)
+                handler_execution_timeout = hub.Timeout(self.handler_execution_timeout)
+                try:
+                    handler(ev)
+                except hub.TaskExit:
+                    # Normal exit.
+                    # Propagate upwards, so we leave the event loop.
+                    raise
+                except Exception as e:
+                    if isinstance(e, hub.Timeout):
+                        LOG.error('%s: Handler exceeded maximum execution time; terminated.', self.name)
+                    else:
+                        LOG.error('%s: Exception occurred during handler processing.', self.name)
+                    LOG.exception('%s: Backtrace from offending handler '
+                                  '[%s] servicing event [%s] follows.',
+                                  self.name, handler.__name__, ev.__class__.__name__)
+                finally:
+                    handler_execution_timeout.cancel()

     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if isinstance(ev, EventOFPStateChange):
+            self._sc_events_sem.acquire()
+            self.sc_events.put((ev, state), block=False)
+        elif isinstance(ev, EventOFPPacketIn):
+            self._pi_events_sem.acquire()
+            self.pi_events.put((ev, state), block=False)
+        else:
+            self._events_sem.acquire()
+            self.events.put((ev, state), block=False)

     def send_event(self, name, ev, state=None):
         """
@@ -336,7 +400,7 @@ class RyuApp(object):


 class AppManager(object):
-    # singletone
+    # singleton
     _instance = None

     @staticmethod
@@ -519,8 +583,14 @@ class AppManager(object):
         app.stop()
         self._close(app)
         events = app.events
+        sc_events = app.sc_events
+        pi_events = app.pi_events
+        if not sc_events.empty():
+            app.logger.debug('%s state changes remains %d', app.name, sc_events.qsize())
+        if not pi_events.empty():
+            app.logger.debug('%s PacketIn events remains %d', app.name, pi_events.qsize())
         if not events.empty():
-            app.logger.debug('%s events remians %d', app.name, events.qsize())
+            app.logger.debug('%s events remains %d', app.name, events.qsize())

     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..0fd19f6 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
 import warnings

 import ryu.base.app_manager
@@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match

-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
+
+from ryu.exception import RyuException

 from ryu.lib.dpid import dpid_to_str

@@ -58,8 +60,16 @@ CONF.register_cli_opts([
     cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
     cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
     cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
-    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.')
 ])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket operations.')
+])
+
+
+class DatapathShutdown(RyuException):
+    message = 'Datapath shutdown was requested'


 class OpenFlowController(object):
@@ -102,9 +112,22 @@ def _deactivate(method):
     def deactivate(self):
         try:
             method(self)
+        except DatapathShutdown:
+            if self.socket is not None:
+                self.socket.close()
+                self.socket = None
+            if self.recv_thread:
+                self.recv_thread.throw(DatapathShutdown)
+            if self.send_thread:
+                self.send_thread.throw(DatapathShutdown)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            if self.socket is not None:
+                try:
+                    self.socket.shutdown(SHUT_RDWR)
+                except (EOFError, IOError):
+                    pass
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate


@@ -117,19 +140,26 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address

-        self.send_active = True
+        self.send_thread = None
+        self.recv_thread = None
+
         self.close_requested = False

         # The limit is arbitrary. We need to limit queue size to
-        # prevent it from eating memory up
+        # prevent it from eating memory up.
+        # Similarly, the timeouts are also arbitrary.
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+        self._send_q_timeout = 5

+        self._unknown_datapath_timeout = 20
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.assured = False
+        self.set_state(HANDSHAKE_DISPATCHER)

     def _get_ports(self):
         if (self.ofproto_parser is not None and
@@ -137,8 +167,8 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             message = (
                 'Datapath#ports is kept for compatibility with the previous '
                 'openflow versions (< 1.3). '
-                'This not be updated by EventOFPPortStatus message. '
-                'If you want to be updated, you can use '
+                'This is not updated by the EventOFPPortStatus message. '
+                'If you want to be updated, you should use '
                 '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
             )
             warnings.warn(message, stacklevel=2)
@@ -163,64 +193,107 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     # Low level socket handling layer
     @_deactivate
     def _recv_loop(self):
-        buf = bytearray()
-        required_len = ofproto_common.OFP_HEADER_SIZE
-
-        count = 0
-        while True:
-            ret = ""
-
-            try:
-                ret = self.socket.recv(required_len)
-            except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
-
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
-                break
-
-            buf += ret
-            while len(buf) >= required_len:
-                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
-                required_len = msg_len
-                if len(buf) < required_len:
-                    break
-
-                msg = ofproto_parser.msg(
-                    self, version, msg_type, msg_len, xid, buf[:msg_len])
-                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
-                if msg:
-                    ev = ofp_event.ofp_msg_to_ev(msg)
-                    self.ofp_brick.send_event_to_observers(ev, self.state)
-
-                    dispatchers = lambda x: x.callers[ev.__class__].dispatchers
-                    handlers = [handler for handler in
-                                self.ofp_brick.get_handlers(ev) if
-                                self.state in dispatchers(handler)]
-                    for handler in handlers:
-                        handler(ev)
-
-                buf = buf[required_len:]
-                required_len = ofproto_common.OFP_HEADER_SIZE
-
-                # We need to schedule other greenlets. Otherwise, ryu
-                # can't accept new switches or handle the existing
-                # switches. The limit is arbitrary. We need the better
-                # approach in the future.
-                count += 1
-                if count > 2048:
-                    count = 0
-                    hub.sleep(0)
+        unknown_datapath_timeout = hub.Timeout(self._unknown_datapath_timeout, DatapathShutdown)
+        try:
+            buf = bytearray()
+            required_len = ofproto_common.OFP_HEADER_SIZE
+
+            count = 0
+            while True:
+                ret = ""
+
+                try:
+                    ret = self.socket.recv(required_len)
+                except SocketTimeout:
+                    if not self.close_requested:
+                        continue
+                except (AttributeError, EOFError, IOError):
+                    self.close_requested = True
+
+                if (len(ret) == 0) or (self.close_requested):
+                    raise DatapathShutdown
+
+                if self.assured:
+                    unknown_datapath_timeout.cancel()
+
+                buf += ret
+                while len(buf) >= required_len:
+                    (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
+                    required_len = msg_len
+                    if len(buf) < required_len:
+                        break
+
+                    msg = ofproto_parser.msg(
+                        self, version, msg_type, msg_len, xid, buf[:msg_len])
+                    # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+                    if msg:
+                        ev = ofp_event.ofp_msg_to_ev(msg)
+                        exec_timeout = self.ofp_brick.handler_execution_timeout
+                        self.ofp_brick.send_event_to_observers(ev, self.state)
+
+                        dispatchers = lambda x: x.callers[ev.__class__].dispatchers
+                        handlers = [handler for handler in
+                                    self.ofp_brick.get_handlers(ev) if
+                                    self.state in dispatchers(handler)]
+                        for handler in handlers:
+                            # Defensively set a timeout for the handler.
+                            # "This should never happen" are famous last words.
+                            handler_timeout = hub.Timeout(exec_timeout)
+                            try:
+                                handler(ev)
+                            finally:
+                                handler_timeout.cancel()
+
+                    buf = buf[required_len:]
+                    required_len = ofproto_common.OFP_HEADER_SIZE
+
+                    # We need to schedule other greenlets. Otherwise, ryu
+                    # can't accept new switches or handle the existing
+                    # switches. The limit is arbitrary. We need the better
+                    # approach in the future.
+                    count += 1
+                    if count > 2048:
+                        count = 0
+                        hub.sleep(0)
+        except Exception as e:
+            if isinstance(e, DatapathShutdown):
+                # Normal termination sequence.
+                pass
+            else:
+                dpid_str = "UNKNOWN"
+                if self.id is not None:
+                    dpid_str = dpid_to_str(self.id)
+                LOG.exception('Exception occurred in recv_loop() '
+                              'for datapath [%s] connecting from [%s]. '
+                              'Shutting down datapath.',
+                              dpid_str, self.address)
+        finally:
+            # Shutting down; get rid of the recv_thread reference.
+            self.recv_thread = None
+            # Cancel timeout, if we haven't already.
+            unknown_datapath_timeout.cancel()
+            # Although it may have been caught above, raise DatapathShutdown.
+            # This guarantees that close() will be called on the socket.
+            raise DatapathShutdown

     @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
-                buf = self.send_q.get()
-                self.socket.sendall(buf)
+            while self.socket is not None:
+                buf = None
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._send_q_sem.release()
+
+                if buf is None:
+                    continue
+                if self.socket:
+                    self.socket.sendall(buf)
+                else:
+                    raise DatapathShutdown
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
                       self.address, ioe.errno, ioe.strerror)
@@ -228,17 +301,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Next, get rid of the send_thread reference.
+            self.send_thread = None
+            # Drain the send_q, releasing the associated semaphore for each entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass

     def send(self, buf):
+        acquired = False
+        dp_closed = True
+
         if self.send_q:
-            self.send_q.put(buf)
+            acquired = self._send_q_sem.acquire()
+
+        if self.send_q and acquired:
+            dp_closed = False
+            self.send_q.put(buf, block=False)
+        elif acquired:
+            self._send_q_sem.release()
+
+        if dp_closed:
+            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+                      self.address)

     def set_xid(self, msg):
         self.xid += 1
@@ -255,17 +343,15 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.send(msg.buf)

     def serve(self):
-        send_thr = hub.spawn(self._send_loop)
+        # self.recv_thread *MUST* be set before we spawn the send loop!
+        self.recv_thread = hub.getcurrent()
+        self.send_thread = hub.spawn(self._send_loop)

         # send hello message immediately
         hello = self.ofproto_parser.OFPHello(self)
         self.send_msg(hello)

-        try:
-            self._recv_loop()
-        finally:
-            hub.kill(send_thr)
-            hub.joinall([send_thr])
+        self._recv_loop()

     #
     # Utility methods for convenience
diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
index 2682777..a65d05b 100644
--- a/ryu/controller/dpset.py
+++ b/ryu/controller/dpset.py
@@ -56,6 +56,8 @@ class EventDP(EventDPBase):
 class EventDPReconnected(EventDPBase):
     def __init__(self, dp):
         super(EventDPReconnected, self).__init__(dp)
+        # port list, which should not change across reconnects
+        self.ports = []


 class EventPortBase(EventDPBase):
@@ -127,6 +129,7 @@ class DPSet(app_manager.RyuApp):
             self.logger.debug('DPSET: New datapath %s', dp)
             send_dp_reconnected = True
         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             ev = EventDP(dp, True)
@@ -138,6 +141,7 @@ class DPSet(app_manager.RyuApp):
             self.send_event_to_observers(ev)
         if send_dp_reconnected:
             ev = EventDPReconnected(dp)
+            ev.ports = self.port_state.get(dp.id, {}).values()
             self.send_event_to_observers(ev)

     def _unregister(self, dp):
@@ -163,7 +167,6 @@ class DPSet(app_manager.RyuApp):
         """
         This method returns the ryu.controller.controller.Datapath
         instance for the given Datapath ID.
-        Raises KeyError if no such a datapath connected to this controller.
         """
         return self.dps.get(dp_id)

diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..4de0b4a 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet':
     listen = eventlet.listen
     connect = eventlet.connect

+
+    Queue = eventlet.queue.LightQueue
+    QueueEmpty = eventlet.queue.Empty
+    Semaphore = eventlet.semaphore.Semaphore
+    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
+    TaskExit = greenlet.GreenletExit
+
+
     def spawn(*args, **kwargs):
         def _launch(func, *args, **kwargs):
             # mimic gevent's default raise_error=False behaviour
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet':
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet':
                 # greenthread
                 try:
                     t.wait()
-                except greenlet.GreenletExit:
+                except TaskExit:
                     pass

-    Queue = eventlet.queue.Queue
-    QueueEmpty = eventlet.queue.Empty
-    Semaphore = eventlet.semaphore.Semaphore
-    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore

     class StreamServer(object):
         def __init__(self, listen_info, handle=None, backlog=None,
@@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet':
                 sock, addr = self.server.accept()
                 spawn(self.handle, sock, addr)

+
     class LoggingWrapper(object):
         def write(self, message):
             LOG.info(message.rstrip('\n'))

+
     class WSGIServer(StreamServer):
         def serve_forever(self):
             self.logger = LoggingWrapper()
             eventlet.wsgi.server(self.server, self.handle, self.logger)

+
     WebSocketWSGI = websocket.WebSocketWSGI

+
     Timeout = eventlet.timeout.Timeout

+
     class Event(object):
         def __init__(self):
             self._ev = eventlet.event.Event()
@@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet':

         def _broadcast(self):
             self._ev.send()
-            # because eventlet Event doesn't allow mutiple send() on an event,
+            # because eventlet Event doesn't allow multiple send() on an event,
             # re-create the underlying event.
             # note: _ev.reset() is obsolete.
             self._ev = eventlet.event.Event()
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
index c54152b..e7b682c 100644
--- a/ryu/topology/event.py
+++ b/ryu/topology/event.py
@@ -41,6 +41,11 @@ class EventSwitchLeave(EventSwitchBase):
         super(EventSwitchLeave, self).__init__(switch)


+class EventSwitchReconnected(EventSwitchBase):
+    def __init__(self, switch):
+        super(EventSwitchReconnected, self).__init__(switch)
+
+
 class EventPortBase(event.EventBase):
     def __init__(self, port):
         super(EventPortBase, self).__init__()
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
index 9622a90..a06f83b 100644
--- a/ryu/topology/switches.py
+++ b/ryu/topology/switches.py
@@ -495,6 +495,7 @@ class Switches(app_manager.RyuApp):
     OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION,
                     ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION]
     _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
+               event.EventSwitchReconnected,
                event.EventPortAdd, event.EventPortDelete,
                event.EventPortModify,
                event.EventLinkAdd, event.EventLinkDelete,
@@ -540,6 +541,7 @@ class Switches(app_manager.RyuApp):
         assert dp.id is not None

         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             for port in dp.ports.values():
@@ -547,8 +549,9 @@ class Switches(app_manager.RyuApp):

     def _unregister(self, dp):
         if dp.id in self.dps:
-            del self.dps[dp.id]
-            del self.port_state[dp.id]
+            if (self.dps[dp.id] == dp):
+                del self.dps[dp.id]
+                del self.port_state[dp.id]

     def _get_switch(self, dpid):
         if dpid in self.dps:
@@ -602,16 +605,18 @@ class Switches(app_manager.RyuApp):
         if ev.state == MAIN_DISPATCHER:
             dp_multiple_conns = False
             if dp.id in self.dps:
-                LOG.warning('multiple connections from %s', dpid_to_str(dp.id))
+                LOG.warning('Multiple connections from %s', dpid_to_str(dp.id))
                 dp_multiple_conns = True
+                (self.dps[dp.id]).close()

             self._register(dp)
             switch = self._get_switch(dp.id)
             LOG.debug('register %s', switch)

-            # Do not send event while dp has multiple connections.
             if not dp_multiple_conns:
                 self.send_event_to_observers(event.EventSwitchEnter(switch))
+            else:
+                self.send_event_to_observers(event.EventSwitchReconnected(switch))

             if not self.link_discovery:
                 return
@@ -665,19 +670,23 @@ class Switches(app_manager.RyuApp):
             # dp.id is None when datapath dies before handshake
             if dp.id is None:
                 return
+
             switch = self._get_switch(dp.id)
-            self._unregister(dp)
-            LOG.debug('unregister %s', switch)
-            self.send_event_to_observers(event.EventSwitchLeave(switch))
+            if switch:
+                if switch.dp is dp:
+                    self._unregister(dp)
+                    LOG.debug('unregister %s', switch)

-            if not self.link_discovery:
-                return
+                    self.send_event_to_observers(event.EventSwitchLeave(switch))

-            for port in switch.ports:
-                if not port.is_reserved():
-                    self.ports.del_port(port)
-                    self._link_down(port)
-            self.lldp_event.set()
+                    if not self.link_discovery:
+                        return
+
+                    for port in switch.ports:
+                        if not port.is_reserved():
+                            self.ports.del_port(port)
+                            self._link_down(port)
+                    self.lldp_event.set()

     @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
     def port_status_handler(self, ev):
@@ -762,7 +771,7 @@ class Switches(app_manager.RyuApp):
         try:
             src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
         except LLDPPacket.LLDPUnknownFormat as e:
-            # This handler can receive all the packtes which can be
+            # This handler can receive all the packets which can be
             # not-LLDP packet. Ignore it silently
             return


Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu

_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel
