Per Fujita-san's request, the echo request loop is off by default. It can be enabled by setting the "maximum-unreplied-echo-requests" configuration option to a non-zero value.
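For example, assuming ryu-manager is started with an oslo.config-style configuration file (the file name below is hypothetical), the loop could be enabled like so:

    # ryu.conf -- start with: ryu-manager --config-file ryu.conf <your apps>
    [DEFAULT]
    # Disconnect a datapath after this many consecutive unreplied echo
    # requests; 0 (the default) leaves the echo request loop disabled.
    maximum-unreplied-echo-requests = 3
    # Seconds between echo requests sent to each datapath.
    echo-request-interval = 15.0

Depending on the oslo.config version in use, the keys may need their underscored spellings (maximum_unreplied_echo_requests, echo_request_interval).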
Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..eac2c42 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, get_dependent_services
 from ryu.controller.controller import Datapath
 from ryu.controller import event
 from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
 from ryu.lib import hub
 from ryu.ofproto import ofproto_protocol
 
@@ -157,12 +158,25 @@ class RyuApp(object):
         self.observers = {}     # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(32)
+        self.sc_events = hub.Queue(32)
+        self.pi_events = hub.Queue(32)
+        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+        self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
+        self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
+        self._event_get_timeout = 5
+
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
             self.logger = logging.getLogger(self.name)
         self.CONF = cfg.CONF
+        self.CONF.register_opts([
+            cfg.FloatOpt('handler-execution-timeout',
+                         default=10.0,
+                         help='Maximum time, in seconds, to permit handlers to run.')
+        ])
+        self.handler_execution_timeout = self.CONF.handler_execution_timeout
 
     # prevent accidental creation of instances of this class outside RyuApp
     class _EventThreadStop(event.EventBase):
@@ -279,15 +293,65 @@ class RyuApp(object):
 
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            # Process events according to priority.
+            # StateChange highest, PacketIn lowest, all others in between.
+            ev = state = None
+            if self.sc_events.qsize():
+                try:
+                    ev, state = self.sc_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._sc_events_sem.release()
+
+            if (ev is None) and (self.events.qsize()):
+                try:
+                    ev, state = self.events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._events_sem.release()
+
+            if ev is None:
+                try:
+                    ev, state = self.pi_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._pi_events_sem.release()
+
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
-                handler(ev)
+                handler_execution_timeout = hub.Timeout(self.handler_execution_timeout)
+                try:
+                    handler(ev)
+                except hub.TaskExit:
+                    # Normal exit.
+                    # Propagate upwards, so we leave the event loop.
+                    raise
+                except Exception as e:
+                    if ((isinstance(e, hub.Timeout)) and
+                            (e is handler_execution_timeout)):
+                        LOG.error('%s: Handler exceeded maximum execution time; '
+                                  'terminated.', self.name)
+                    else:
+                        LOG.error('%s: Exception occurred during handler processing.',
+                                  self.name)
+                    LOG.exception('%s: Backtrace from offending handler '
+                                  '[%s] servicing event [%s] follows.',
+                                  self.name, handler.__name__, ev.__class__.__name__)
+                finally:
+                    handler_execution_timeout.cancel()
 
     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if isinstance(ev, EventOFPStateChange):
+            self._sc_events_sem.acquire()
+            self.sc_events.put((ev, state), block=False)
+        elif isinstance(ev, EventOFPPacketIn):
+            self._pi_events_sem.acquire()
+            self.pi_events.put((ev, state), block=False)
+        else:
+            self._events_sem.acquire()
+            self.events.put((ev, state), block=False)
 
     def send_event(self, name, ev, state=None):
         """
@@ -336,7 +400,7 @@ class RyuApp(object):
 
 
 class AppManager(object):
-    # singletone
+    # singleton
     _instance = None
 
     @staticmethod
@@ -519,8 +583,14 @@ class AppManager(object):
             app.stop()
             self._close(app)
             events = app.events
+            sc_events = app.sc_events
+            pi_events = app.pi_events
+            if not sc_events.empty():
+                app.logger.debug('%s state changes remains %d',
+                                 app.name, sc_events.qsize())
+            if not pi_events.empty():
+                app.logger.debug('%s PacketIn events remains %d',
+                                 app.name, pi_events.qsize())
             if not events.empty():
-                app.logger.debug('%s events remians %d', app.name, events.qsize())
+                app.logger.debug('%s events remains %d', app.name, events.qsize())
 
     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..2ba399b 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
 import warnings
 
 import ryu.base.app_manager
@@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match
 
-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
+
+from ryu.exception import RyuException
 
 from ryu.lib.dpid import dpid_to_str
 
@@ -57,11 +59,26 @@ CONF.register_cli_opts([
         help='openflow ssl listen port'),
     cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
     cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
-    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
-    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.')
+    cfg.StrOpt('ca-certs', default=None, help='CA certificates')
+])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket operations.'),
+    cfg.FloatOpt('echo-request-interval',
+                 default=15.0,
+                 help='Time, in seconds, between sending echo requests to a datapath.'),
+    cfg.IntOpt('maximum-unreplied-echo-requests',
+               default=0,
+               min=0,
+               help='Maximum number of unreplied echo requests before datapath is disconnected.')
 ])
 
 
+class DatapathShutdown(RyuException):
+    message = 'Datapath shutdown was requested'
+
+
 class OpenFlowController(object):
     def __init__(self):
         super(OpenFlowController, self).__init__()
@@ -102,9 +119,22 @@ def _deactivate(method):
     def deactivate(self):
         try:
             method(self)
+        except DatapathShutdown:
+            if self.socket:
+                self.socket.close()
+                self.socket = None
+            if self.recv_thread:
+                self.recv_thread.throw(DatapathShutdown)
+            if self.send_thread:
+                self.send_thread.throw(DatapathShutdown)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            if self.socket:
+                try:
+                    self.socket.shutdown(SHUT_RDWR)
+                except (EOFError, IOError):
+                    pass
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate
 
@@ -117,28 +147,37 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address
 
-        self.send_active = True
+        self.send_thread = None
+        self.recv_thread = None
+        self.close_requested = False
 
         # The limit is arbitrary. We need to limit queue size to
-        # prevent it from eating memory up
+        # prevent it from eating memory up.
+        # Similarly, the timeout is also arbitrary.
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+        self._send_q_timeout = 5
+
+        self.echo_request_interval = CONF.echo_request_interval
+        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
+        self.unreplied_echo_requests = []
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.set_state(HANDSHAKE_DISPATCHER)
 
     def _get_ports(self):
-        if (self.ofproto_parser is not None and
-                self.ofproto_parser.ofproto.OFP_VERSION >= 0x04):
+        if (self.ofproto_parser and
+                (self.ofproto_parser.ofproto.OFP_VERSION >= 0x04)):
             message = (
                 'Datapath#ports is kept for compatibility with the previous '
                 'openflow versions (< 1.3). '
-                'This not be updated by EventOFPPortStatus message. '
-                'If you want to be updated, you can use '
+                'This is not updated by the EventOFPPortStatus message. '
+                'If you want to be updated, you should use '
                 '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
             )
             warnings.warn(message, stacklevel=2)
@@ -163,64 +202,101 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     # Low level socket handling layer
     @_deactivate
     def _recv_loop(self):
-        buf = bytearray()
-        required_len = ofproto_common.OFP_HEADER_SIZE
-
-        count = 0
-        while True:
-            ret = ""
-
-            try:
-                ret = self.socket.recv(required_len)
-            except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
-
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
-                break
-
-            buf += ret
-            while len(buf) >= required_len:
-                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
-                required_len = msg_len
-                if len(buf) < required_len:
-                    break
-
-                msg = ofproto_parser.msg(
-                    self, version, msg_type, msg_len, xid, buf[:msg_len])
-                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
-                if msg:
-                    ev = ofp_event.ofp_msg_to_ev(msg)
-                    self.ofp_brick.send_event_to_observers(ev, self.state)
-
-                    dispatchers = lambda x: x.callers[ev.__class__].dispatchers
-                    handlers = [handler for handler in
-                                self.ofp_brick.get_handlers(ev) if
-                                self.state in dispatchers(handler)]
-                    for handler in handlers:
-                        handler(ev)
-
-                buf = buf[required_len:]
-                required_len = ofproto_common.OFP_HEADER_SIZE
-
-            # We need to schedule other greenlets. Otherwise, ryu
-            # can't accept new switches or handle the existing
-            # switches. The limit is arbitrary. We need the better
-            # approach in the future.
-            count += 1
-            if count > 2048:
-                count = 0
-                hub.sleep(0)
+        try:
+            buf = bytearray()
+            required_len = ofproto_common.OFP_HEADER_SIZE
+
+            count = 0
+            while True:
+                ret = ""
+
+                try:
+                    ret = self.socket.recv(required_len)
+                except SocketTimeout:
+                    if not self.close_requested:
+                        continue
+                except (AttributeError, EOFError, IOError):
+                    self.close_requested = True
+
+                if (len(ret) == 0) or (self.close_requested):
+                    raise DatapathShutdown
+
+                buf += ret
+                while len(buf) >= required_len:
+                    (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
+                    required_len = msg_len
+                    if len(buf) < required_len:
+                        break
+
+                    msg = ofproto_parser.msg(
+                        self, version, msg_type, msg_len, xid, buf[:msg_len])
+                    # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+                    if msg:
+                        ev = ofp_event.ofp_msg_to_ev(msg)
+                        exec_timeout = self.ofp_brick.handler_execution_timeout
+                        self.ofp_brick.send_event_to_observers(ev, self.state)
+
+                        dispatchers = lambda x: x.callers[ev.__class__].dispatchers
+                        handlers = [handler for handler in
+                                    self.ofp_brick.get_handlers(ev) if
+                                    self.state in dispatchers(handler)]
+                        for handler in handlers:
+                            # Defensively set a timeout for the handler.
+                            # "This should never happen" are famous last words.
+                            handler_timeout = hub.Timeout(exec_timeout)
+                            try:
+                                handler(ev)
+                            finally:
+                                handler_timeout.cancel()
+
+                    buf = buf[required_len:]
+                    required_len = ofproto_common.OFP_HEADER_SIZE
+
+                # We need to schedule other greenlets. Otherwise, ryu
+                # can't accept new switches or handle the existing
+                # switches. The limit is arbitrary. We need the better
+                # approach in the future.
+                count += 1
+                if count > 2048:
+                    count = 0
+                    hub.sleep(0)
+        except Exception as e:
+            if isinstance(e, DatapathShutdown):
+                # Normal termination sequence.
+                pass
+            else:
+                dpid_str = "UNKNOWN"
+                if self.id:
+                    dpid_str = dpid_to_str(self.id)
+                LOG.exception('Exception occurred in recv_loop() '
+                              'for datapath [%s] connecting from [%s]. '
+                              'Shutting down datapath.',
+                              dpid_str, self.address)
+        finally:
+            # Shutting down; get rid of the recv_thread reference.
+            self.recv_thread = None
+            # Although it may have been caught above, raise DatapathShutdown.
+            # This guarantees that close() will be called on the socket.
+            raise DatapathShutdown
 
     @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
-                buf = self.send_q.get()
-                self.socket.sendall(buf)
+            while self.socket:
+                buf = None
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._send_q_sem.release()
+
+                if buf is None:
+                    continue
+                if self.socket:
+                    self.socket.sendall(buf)
+                else:
+                    raise DatapathShutdown
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
                       self.address, ioe.errno, ioe.strerror)
@@ -228,17 +304,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Next, get rid of the send_thread reference.
+            self.send_thread = None
+            # Drain the send_q, releasing the associated semaphore for each entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass
 
     def send(self, buf):
+        acquired = False
+        dp_closed = True
+
         if self.send_q:
-            self.send_q.put(buf)
+            acquired = self._send_q_sem.acquire()
+
+        if self.send_q and acquired:
+            dp_closed = False
+            self.send_q.put(buf, block=False)
+        elif acquired:
+            self._send_q_sem.release()
+
+        if dp_closed:
+            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+                      self.address)
 
     def set_xid(self, msg):
         self.xid += 1
@@ -254,18 +345,41 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         # LOG.debug('send_msg %s', msg)
         self.send(msg.buf)
 
+    def _echo_request_loop(self):
+        if not self.max_unreplied_echo_requests:
+            return
+
+        while (self.send_q and
+               (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
+            echo_req = self.ofproto_parser.OFPEchoRequest(self)
+            self.unreplied_echo_requests.append(self.set_xid(echo_req))
+            self.send_msg(echo_req)
+            hub.sleep(self.echo_request_interval)
+
+        if self.state is not DEAD_DISPATCHER:
+            self.close()
+
+    def acknowledge_echo_reply(self, xid):
+        try:
+            self.unreplied_echo_requests.remove(xid)
+        except ValueError:
+            pass
+
     def serve(self):
-        send_thr = hub.spawn(self._send_loop)
+        # self.recv_thread *MUST* be set before we spawn the send loop!
+        self.recv_thread = hub.getcurrent()
+        self.send_thread = hub.spawn(self._send_loop)
 
         # send hello message immediately
         hello = self.ofproto_parser.OFPHello(self)
         self.send_msg(hello)
 
-        try:
-            self._recv_loop()
-        finally:
-            hub.kill(send_thr)
-            hub.joinall([send_thr])
+        # Keeping track of the echo request loop is not really required.
+        # It will exit when the send loop exits, or when the number of
+        # unreplied echo requests exceeds the threshold.
+        echo_thread = hub.spawn(self._echo_request_loop)
+
+        self._recv_loop()
 
     #
     # Utility methods for convenience
diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
index b3c63df..3bf19d2 100644
--- a/ryu/controller/ofp_handler.py
+++ b/ryu/controller/ofp_handler.py
@@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
         echo_reply.data = msg.data
         datapath.send_msg(echo_reply)
 
+    @set_ev_handler(ofp_event.EventOFPEchoReply,
+                    [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
+    def echo_reply_handler(self, ev):
+        msg = ev.msg
+        datapath = msg.datapath
+        datapath.acknowledge_echo_reply(msg.xid)
+
     @set_ev_handler(ofp_event.EventOFPErrorMsg,
                     [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
     def error_msg_handler(self, ev):
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..4de0b4a 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet':
     listen = eventlet.listen
     connect = eventlet.connect
 
+    Queue = eventlet.queue.LightQueue
+    QueueEmpty = eventlet.queue.Empty
+    Semaphore = eventlet.semaphore.Semaphore
+    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
+    TaskExit = greenlet.GreenletExit
+
+
     def spawn(*args, **kwargs):
         def _launch(func, *args, **kwargs):
             # mimic gevent's default raise_error=False behaviour
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet':
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet':
             # greenthread
             try:
                 t.wait()
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
 
-    Queue = eventlet.queue.Queue
-    QueueEmpty = eventlet.queue.Empty
-    Semaphore = eventlet.semaphore.Semaphore
-    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
 
     class StreamServer(object):
         def __init__(self, listen_info, handle=None, backlog=None,
@@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet':
                 sock, addr = self.server.accept()
                 spawn(self.handle, sock, addr)
 
+
     class LoggingWrapper(object):
         def write(self, message):
             LOG.info(message.rstrip('\n'))
 
+
     class WSGIServer(StreamServer):
         def serve_forever(self):
             self.logger = LoggingWrapper()
             eventlet.wsgi.server(self.server, self.handle, self.logger)
 
+
     WebSocketWSGI = websocket.WebSocketWSGI
 
+    Timeout = eventlet.timeout.Timeout
+
+
     class Event(object):
         def __init__(self):
             self._ev = eventlet.event.Event()
@@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet':
 
         def _broadcast(self):
             self._ev.send()
-            # because eventlet Event doesn't allow mutiple send() on an event,
+            # because eventlet Event doesn't allow multiple send() on an event,
             # re-create the underlying event.
             # note: _ev.reset() is obsolete.
             self._ev = eventlet.event.Event()
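A note on the queueing pattern above: each bounded queue is paired with a BoundedSemaphore sized to the queue, so producers block on the semaphore (acquire before a non-blocking put) and consumers release it only after a successful get. Below is a minimal, standalone sketch of that backpressure pattern with hypothetical names; it illustrates the idea and is not code from the patch:

    import eventlet
    from eventlet import queue, semaphore

    # Pair a bounded queue with a semaphore sized to it, as the patch does
    # for send_q and the per-app event queues.
    events = queue.LightQueue(4)
    events_sem = semaphore.BoundedSemaphore(events.maxsize)

    def produce(item):
        # Producers block here when the queue is full, mirroring _send_event().
        events_sem.acquire()
        events.put(item, block=False)

    def consume():
        while True:
            try:
                item = events.get(timeout=1)
            except queue.Empty:
                continue
            # Release only once an item has actually been dequeued,
            # mirroring _event_loop() and _send_loop().
            events_sem.release()
            if item is None:
                return
            print('handled %s' % item)

    consumer = eventlet.spawn(consume)
    for i in range(10):
        produce(i)
    produce(None)  # sentinel: stop the consumer
    consumer.wait()

One plausible reading of the design: routing producers through the semaphore means a producer woken during teardown can observe that the queue reference has been cleared and discard its buffer, which is what send() does above, while _close() drains the queue and releases the semaphore once per entry to unblock every waiting producer.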
Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu
