At Wed, 24 Feb 2016 20:28:05 +0000, Victor Orlikowski wrote: > > A few more minor changes: > 1) Place an assertion on one of the semaphore acquire() calls.
From the eventlet code, acquire() can never return anything other than True. More importantly, I think an assert statement with a side effect should be avoided in general: things will break when someone turns on optimization and asserts are disabled, because the acquire() call itself is then skipped while the matching release() in _event_loop still runs. (In Python, that is the "-O" option.) > 2) Clean up some naming and structure for greater clarity; one of the name > changes is a reversion of an earlier name change in an accepted patch. > 3) I thought of another couple of corner cases. This patch does the following: * places semaphore calls around the event queue * adds an echo request keep-alive timer * makes other unrelated cleanups It's optional, but I think separating these changes into different patches would be better. Also, please write commit messages that concisely describe the changes and the problem they solve, not a changelog of older patchsets. > Signed-off-by: Victor J. Orlikowski <[email protected]> > > diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py > index 3d5d895..6481a68 100644 > --- a/ryu/base/app_manager.py > +++ b/ryu/base/app_manager.py > @@ -158,6 +158,7 @@ class RyuApp(object): > self.threads = [] > self.main_thread = None > self.events = hub.Queue(128) > + self._events_sem = hub.BoundedSemaphore(self.events.maxsize) > if hasattr(self.__class__, 'LOGGER_NAME'): > self.logger = logging.getLogger(self.__class__.LOGGER_NAME) > else: > @@ -280,13 +281,25 @@ class RyuApp(object): > def _event_loop(self): > while self.is_active or not self.events.empty(): > ev, state = self.events.get() > + self._events_sem.release() > if ev == self._event_stop: > continue > handlers = self.get_handlers(ev, state) > for handler in handlers: > - handler(ev) > + try: > + handler(ev) > + except hub.TaskExit: > + # Normal exit. > + # Propagate upwards, so we leave the event loop. > + raise > + except: > + LOG.exception('%s: Exception occurred during handler > processing. 
' > + 'Backtrace from offending handler ' > + '[%s] servicing event [%s] follows.', > + self.name, handler.__name__, > ev.__class__.__name__) > > def _send_event(self, ev, state): > + assert self._events_sem.acquire() > self.events.put((ev, state)) > > def send_event(self, name, ev, state=None): > diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py > index 25b8776..05f24e8 100644 > --- a/ryu/controller/controller.py > +++ b/ryu/controller/controller.py > @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer > import traceback > import random > import ssl > -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error > as SocketError > +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as > SocketTimeout > import warnings > > import ryu.base.app_manager > @@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol > from ryu.ofproto import ofproto_v1_0 > from ryu.ofproto import nx_match > > -from ryu.controller import handler > from ryu.controller import ofp_event > +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, > DEAD_DISPATCHER > > from ryu.lib.dpid import dpid_to_str > > @@ -57,8 +57,19 @@ CONF.register_cli_opts([ > help='openflow ssl listen port'), > cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), > cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), > - cfg.StrOpt('ca-certs', default=None, help='CA certificates'), > - cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to > await completion of socket operations.') > + cfg.StrOpt('ca-certs', default=None, help='CA certificates') > +]) > +CONF.register_opts([ > + cfg.FloatOpt('socket-timeout', > + default=5.0, > + help='Time, in seconds, to await completion of socket > operations.'), > + cfg.FloatOpt('echo-request-interval', > + default=15.0, > + help='Time, in seconds, between sending echo requests to a > datapath.'), > + cfg.IntOpt('maximum-unreplied-echo-requests', > + 
default=0, > + min=0, > + help='Maximum number of unreplied echo requests before > datapath is disconnected.') > ]) > > > @@ -103,8 +114,15 @@ def _deactivate(method): > try: > method(self) > finally: > - self.send_active = False > - self.set_state(handler.DEAD_DISPATCHER) > + try: > + self.socket.shutdown(SHUT_RDWR) > + except (EOFError, IOError): > + pass > + > + if not self.is_active: > + self.socket.close() > + if self.state is not DEAD_DISPATCHER: > + self.set_state(DEAD_DISPATCHER) > return deactivate > > > @@ -117,19 +135,24 @@ class Datapath(ofproto_protocol.ProtocolDesc): > self.socket.settimeout(CONF.socket_timeout) > self.address = address > > - self.send_active = True > + self.is_active = True > self.close_requested = False > > # The limit is arbitrary. We need to limit queue size to > # prevent it from eating memory up > self.send_q = hub.Queue(16) > + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize) > + > + self.echo_request_interval = CONF.echo_request_interval > + self.max_unreplied_echo_requests = > CONF.maximum_unreplied_echo_requests > + self.unreplied_echo_requests = [] > > self.xid = random.randint(0, self.ofproto.MAX_XID) > self.id = None # datapath_id is unknown yet > self._ports = None > self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10 > self.ofp_brick = > ryu.base.app_manager.lookup_service_brick('ofp_event') > - self.set_state(handler.HANDSHAKE_DISPATCHER) > + self.set_state(HANDSHAKE_DISPATCHER) > > def _get_ports(self): > if (self.ofproto_parser is not None and > @@ -167,19 +190,17 @@ class Datapath(ofproto_protocol.ProtocolDesc): > required_len = ofproto_common.OFP_HEADER_SIZE > > count = 0 > - while True: > + while not self.close_requested: > ret = "" > > try: > ret = self.socket.recv(required_len) > except SocketTimeout: > - if not self.close_requested: > - continue > - except SocketError: > - self.close_requested = True > + continue > + except (EOFError, IOError): > + break > > - if (len(ret) == 0) or 
(self.close_requested): > - self.socket.close() > + if len(ret) == 0: > break > > buf += ret > @@ -215,30 +236,45 @@ class Datapath(ofproto_protocol.ProtocolDesc): > count = 0 > hub.sleep(0) > > - @_deactivate > def _send_loop(self): > try: > - while self.send_active: > + while True: > buf = self.send_q.get() > + self._send_q_sem.release() > self.socket.sendall(buf) > + except SocketTimeout: > + LOG.debug("Socket timed out while sending data to switch at > address %s", > + self.address) > except IOError as ioe: > - LOG.debug("Socket error while sending data to switch at address > %s: [%d] %s", > - self.address, ioe.errno, ioe.strerror) > + # Convert ioe.errno to a string, just in case it was somehow set > to None. > + errno = "%s" % ioe.errno > + LOG.debug("Socket error while sending data to switch at address > %s: [%s] %s", > + self.address, errno, ioe.strerror) > finally: > q = self.send_q > # first, clear self.send_q to prevent new references. > self.send_q = None > - # there might be threads currently blocking in send_q.put(). > - # unblock them by draining the queue. > + # Now, drain the send_q, releasing the associated semaphore for > each entry. > + # This should release all threads waiting to acquire the > semaphore. > try: > while q.get(block=False): > - pass > + self._send_q_sem.release() > except hub.QueueEmpty: > pass > + # Finally, ensure the _recv_loop terminates. 
> + self.close() > > def send(self, buf): > - if self.send_q: > - self.send_q.put(buf) > + msg_enqueued = False > + if self._send_q_sem.acquire(): > + if self.send_q: > + self.send_q.put(buf) > + msg_enqueued = True > + else: > + self._send_q_sem.release() > + if not msg_enqueued: > + LOG.debug('Datapath in process of terminating; send() to %s > discarded.', > + self.address) > > def set_xid(self, msg): > self.xid += 1 > @@ -254,6 +290,23 @@ class Datapath(ofproto_protocol.ProtocolDesc): > # LOG.debug('send_msg %s', msg) > self.send(msg.buf) > > + def _echo_request_loop(self): > + if not self.max_unreplied_echo_requests: > + return > + while (self.send_q and > + (len(self.unreplied_echo_requests) <= > self.max_unreplied_echo_requests)): > + echo_req = self.ofproto_parser.OFPEchoRequest(self) > + self.unreplied_echo_requests.append(self.set_xid(echo_req)) > + self.send_msg(echo_req) > + hub.sleep(self.echo_request_interval) > + self.close() > + > + def acknowledge_echo_reply(self, xid): > + try: > + self.unreplied_echo_requests.remove(xid) > + except: > + pass > + > def serve(self): > send_thr = hub.spawn(self._send_loop) > > @@ -261,11 +314,15 @@ class Datapath(ofproto_protocol.ProtocolDesc): > hello = self.ofproto_parser.OFPHello(self) > self.send_msg(hello) > > + echo_thr = hub.spawn(self._echo_request_loop) > + > try: > self._recv_loop() > finally: > hub.kill(send_thr) > - hub.joinall([send_thr]) > + hub.kill(echo_thr) > + hub.joinall([send_thr, echo_thr]) > + self.is_active = False > > # > # Utility methods for convenience > diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py > index b3c63df..3bf19d2 100644 > --- a/ryu/controller/ofp_handler.py > +++ b/ryu/controller/ofp_handler.py > @@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp): > echo_reply.data = msg.data > datapath.send_msg(echo_reply) > > + @set_ev_handler(ofp_event.EventOFPEchoReply, > + [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, > MAIN_DISPATCHER]) > + def 
echo_reply_handler(self, ev): > + msg = ev.msg > + datapath = msg.datapath > + datapath.acknowledge_echo_reply(msg.xid) > + > @set_ev_handler(ofp_event.EventOFPErrorMsg, > [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, > MAIN_DISPATCHER]) > def error_msg_handler(self, ev): > diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py > index 5621147..b77465b 100644 > --- a/ryu/lib/hub.py > +++ b/ryu/lib/hub.py > @@ -50,7 +50,7 @@ if HUB_TYPE == 'eventlet': > # by not propergating an exception to the joiner. > try: > func(*args, **kwargs) > - except greenlet.GreenletExit: > + except TaskExit: > pass > except: > # log uncaught exception. > @@ -67,7 +67,7 @@ if HUB_TYPE == 'eventlet': > # by not propergating an exception to the joiner. > try: > func(*args, **kwargs) > - except greenlet.GreenletExit: > + except TaskExit: > pass > except: > # log uncaught exception. > @@ -87,13 +87,14 @@ if HUB_TYPE == 'eventlet': > # greenthread > try: > t.wait() > - except greenlet.GreenletExit: > + except TaskExit: > pass > > - Queue = eventlet.queue.Queue > + Queue = eventlet.queue.LightQueue > QueueEmpty = eventlet.queue.Empty > Semaphore = eventlet.semaphore.Semaphore > BoundedSemaphore = eventlet.semaphore.BoundedSemaphore > + TaskExit = greenlet.GreenletExit > > class StreamServer(object): > def __init__(self, listen_info, handle=None, backlog=None, > > > Best, > Victor > -- > Victor J. Orlikowski <> vjo@[cs.]duke.edu > ------------------------------------------------------------------------------ Site24x7 APM Insight: Get Deep Visibility into Application Performance APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor end-to-end web transactions and take corrective actions now Troubleshoot faster and improve end-user experience. Signup Now! http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140 _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
