After extensive testing in the lab, I have established that the timeouts on the queue get() operations are not strictly necessary. Therefore, per Iwamoto-san's request, I am removing them.
The semaphore protecting put(), however, *is* required. I have also reverted from a PriorityQueue to a simple queue. The PriorityQueue was intended to protect the controller against a single switch abusing it with a PacketIn flood. That said, a PacketIn flood usually indicates either a buggy switch or a buggy controller application; in either case the bug should be fixed, and compensating for it in the controller may be counterproductive. I have also split out all typo corrections; these will follow in a subsequent patch. Any remaining changes to comments reflect changes in the logic that needed to be documented correctly. I have left in place the change that turns the "socket-timeout" CLI option into a config-file option, pending a final judgment call on the matter. Signed-off-by: Victor J. Orlikowski <[email protected]> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py index 3d5d895..ed60857 100644 --- a/ryu/base/app_manager.py +++ b/ryu/base/app_manager.py @@ -158,6 +158,8 @@ class RyuApp(object): self.threads = [] self.main_thread = None self.events = hub.Queue(128) + self._events_sem = hub.BoundedSemaphore(self.events.maxsize) + if hasattr(self.__class__, 'LOGGER_NAME'): self.logger = logging.getLogger(self.__class__.LOGGER_NAME) else: @@ -280,13 +282,25 @@ class RyuApp(object): def _event_loop(self): while self.is_active or not self.events.empty(): ev, state = self.events.get() + self._events_sem.release() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: - handler(ev) + try: + handler(ev) + except hub.TaskExit: + # Normal exit. + # Propagate upwards, so we leave the event loop. + raise + except Exception as e: + LOG.exception('%s: Exception occurred during handler processing. 
' + 'Backtrace from offending handler ' + '[%s] servicing event [%s] follows.', + self.name, handler.__name__, ev.__class__.__name__) def _send_event(self, ev, state): + self._events_sem.acquire() self.events.put((ev, state)) def send_event(self, name, ev, state=None): diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 25b8776..0a9019f 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer import traceback import random import ssl -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout import warnings import ryu.base.app_manager @@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol from ryu.ofproto import ofproto_v1_0 from ryu.ofproto import nx_match -from ryu.controller import handler from ryu.controller import ofp_event +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.lib.dpid import dpid_to_str @@ -57,8 +57,19 @@ CONF.register_cli_opts([ help='openflow ssl listen port'), cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), - cfg.StrOpt('ca-certs', default=None, help='CA certificates'), - cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.') + cfg.StrOpt('ca-certs', default=None, help='CA certificates') +]) +CONF.register_opts([ + cfg.FloatOpt('socket-timeout', + default=5.0, + help='Time, in seconds, to await completion of socket operations.'), + cfg.FloatOpt('echo-request-interval', + default=15.0, + help='Time, in seconds, between sending echo requests to a datapath.'), + cfg.IntOpt('maximum-unreplied-echo-requests', + default=0, + min=0, + help='Maximum number of unreplied echo requests before datapath is disconnected.') ]) @@ -103,8 +114,16 
@@ def _deactivate(method): try: method(self) finally: - self.send_active = False - self.set_state(handler.DEAD_DISPATCHER) + try: + self.socket.shutdown(SHUT_RDWR) + except (EOFError, IOError): + pass + + if self.recv_thread is None: + self.socket.close() + + if self.state is not DEAD_DISPATCHER: + self.set_state(DEAD_DISPATCHER) return deactivate @@ -117,19 +136,25 @@ class Datapath(ofproto_protocol.ProtocolDesc): self.socket.settimeout(CONF.socket_timeout) self.address = address - self.send_active = True + self.recv_thread = None + self.close_requested = False # The limit is arbitrary. We need to limit queue size to # prevent it from eating memory up self.send_q = hub.Queue(16) + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize) + + self.echo_request_interval = CONF.echo_request_interval + self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests + self.unreplied_echo_requests = [] self.xid = random.randint(0, self.ofproto.MAX_XID) self.id = None # datapath_id is unknown yet self._ports = None self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10 self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event') - self.set_state(handler.HANDSHAKE_DISPATCHER) + self.set_state(HANDSHAKE_DISPATCHER) def _get_ports(self): if (self.ofproto_parser is not None and @@ -167,19 +192,17 @@ class Datapath(ofproto_protocol.ProtocolDesc): required_len = ofproto_common.OFP_HEADER_SIZE count = 0 - while True: + while not self.close_requested: ret = "" try: ret = self.socket.recv(required_len) except SocketTimeout: - if not self.close_requested: - continue - except SocketError: - self.close_requested = True + continue + except (EOFError, IOError): + break - if (len(ret) == 0) or (self.close_requested): - self.socket.close() + if len(ret) == 0: break buf += ret @@ -218,27 +241,46 @@ class Datapath(ofproto_protocol.ProtocolDesc): @_deactivate def _send_loop(self): try: - while self.send_active: + while True: buf = self.send_q.get() + 
self._send_q_sem.release() self.socket.sendall(buf) + except SocketTimeout: + LOG.debug("Socket timed out while sending data to switch at address %s", + self.address) except IOError as ioe: - LOG.debug("Socket error while sending data to switch at address %s: [%d] %s", - self.address, ioe.errno, ioe.strerror) + # Convert ioe.errno to a string, just in case it was somehow set to None. + errno = "%s" % ioe.errno + LOG.debug("Socket error while sending data to switch at address %s: [%s] %s", + self.address, errno, ioe.strerror) finally: q = self.send_q # first, clear self.send_q to prevent new references. self.send_q = None - # there might be threads currently blocking in send_q.put(). - # unblock them by draining the queue. + # Now, drain the send_q, releasing the associated semaphore for each entry. + # This should release all threads waiting to acquire the semaphore. try: while q.get(block=False): - pass + self._send_q_sem.release() except hub.QueueEmpty: pass def send(self, buf): + acquired = False + dp_closed = True + if self.send_q: + acquired = self._send_q_sem.acquire() + + if self.send_q and acquired: + dp_closed = False self.send_q.put(buf) + elif acquired: + self._send_q_sem.release() + + if dp_closed: + LOG.debug('Datapath in process of terminating; send() to %s discarded.', + self.address) def set_xid(self, msg): self.xid += 1 @@ -254,18 +296,43 @@ class Datapath(ofproto_protocol.ProtocolDesc): # LOG.debug('send_msg %s', msg) self.send(msg.buf) + def _echo_request_loop(self): + if not self.max_unreplied_echo_requests: + return + + while (self.send_q and + (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)): + echo_req = self.ofproto_parser.OFPEchoRequest(self) + self.unreplied_echo_requests.append(self.set_xid(echo_req)) + self.send_msg(echo_req) + hub.sleep(self.echo_request_interval) + + if self.state is not DEAD_DISPATCHER: + self.close() + + def acknowledge_echo_reply(self, xid): + try: + self.unreplied_echo_requests.remove(xid) + 
except: + pass + def serve(self): + self.recv_thread = hub.getcurrent() send_thr = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) self.send_msg(hello) + echo_thr = hub.spawn(self._echo_request_loop) + try: self._recv_loop() finally: hub.kill(send_thr) - hub.joinall([send_thr]) + hub.kill(echo_thr) + hub.joinall([send_thr, echo_thr]) + self.recv_thread = None # # Utility methods for convenience diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py index b3c63df..3bf19d2 100644 --- a/ryu/controller/ofp_handler.py +++ b/ryu/controller/ofp_handler.py @@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp): echo_reply.data = msg.data datapath.send_msg(echo_reply) + @set_ev_handler(ofp_event.EventOFPEchoReply, + [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER]) + def echo_reply_handler(self, ev): + msg = ev.msg + datapath = msg.datapath + datapath.acknowledge_echo_reply(msg.xid) + @set_ev_handler(ofp_event.EventOFPErrorMsg, [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER]) def error_msg_handler(self, ev): diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index 5621147..16904b1 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -50,7 +50,7 @@ if HUB_TYPE == 'eventlet': # by not propergating an exception to the joiner. try: func(*args, **kwargs) - except greenlet.GreenletExit: + except TaskExit: pass except: # log uncaught exception. @@ -67,7 +67,7 @@ if HUB_TYPE == 'eventlet': # by not propergating an exception to the joiner. try: func(*args, **kwargs) - except greenlet.GreenletExit: + except TaskExit: pass except: # log uncaught exception. 
@@ -87,13 +87,15 @@ if HUB_TYPE == 'eventlet': # greenthread try: t.wait() - except greenlet.GreenletExit: + except TaskExit: pass - Queue = eventlet.queue.Queue + Queue = eventlet.queue.LightQueue + PriorityQueue = eventlet.queue.PriorityQueue QueueEmpty = eventlet.queue.Empty Semaphore = eventlet.semaphore.Semaphore BoundedSemaphore = eventlet.semaphore.BoundedSemaphore + TaskExit = greenlet.GreenletExit class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, Best, Victor -- Victor J. Orlikowski <> vjo@[cs.]duke.edu
stability.patch
Description: stability.patch
------------------------------------------------------------------------------ Site24x7 APM Insight: Get Deep Visibility into Application Performance APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor end-to-end web transactions and take corrective actions now Troubleshoot faster and improve end-user experience. Signup Now! http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140
_______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
