Hi, on an unrelated note, have you considered going for true multi-threading (Python's native threading for example) as eventlet cannot make use of multiple CPUs?
At Mon, 22 Feb 2016 21:44:26 +0000, Victor Orlikowski wrote: > > After re-examining the code again and re-testing, I have simplified the patch > still further. > > Signed-off-by: Victor J. Orlikowski <[email protected]> > > diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py > index 3d5d895..f162cdf 100644 > --- a/ryu/base/app_manager.py > +++ b/ryu/base/app_manager.py > @@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, > get_dependent_services > from ryu.controller.controller import Datapath > from ryu.controller import event > from ryu.controller.event import EventRequestBase, EventReplyBase > +from ryu.controller.ofp_event import EventOFPPacketIn > from ryu.lib import hub > from ryu.ofproto import ofproto_protocol > > @@ -157,7 +158,18 @@ class RyuApp(object): > self.observers = {} # ev_cls -> observer-name -> states:set > self.threads = [] > self.main_thread = None > - self.events = hub.Queue(128) > + > + self.events = hub.PriorityQueue(128) > + self.event_counter = itertools.count() > + self._event_get_timeout = 5 > + # The sum of the following semaphores must equal the number of > + # entries in the events queue. > + self._events_sem = hub.BoundedSemaphore(64) > + self._pi_events_sem = hub.BoundedSemaphore(64) > + # Internal priorities for events. > + self._PRIORITY_EV = 0 > + self._PRIORITY_PI = 1 > + > if hasattr(self.__class__, 'LOGGER_NAME'): > self.logger = logging.getLogger(self.__class__.LOGGER_NAME) > else: > @@ -279,15 +291,42 @@ class RyuApp(object): > > def _event_loop(self): > while self.is_active or not self.events.empty(): > - ev, state = self.events.get() > - if ev == self._event_stop: > + ev = state = count = priority = None > + > + try: > + priority, count, (ev, state) = > self.events.get(timeout=self._event_get_timeout) As this timeout does nothing useful in the code, it needs to be eliminated unless proved to have some positive effect. 
> + except hub.QueueEmpty: > + pass > + else: > + if priority == self._PRIORITY_PI: > + self._pi_events_sem.release() > + else: > + self._events_sem.release() > + > + if (ev is None) or (ev == self._event_stop): > continue > handlers = self.get_handlers(ev, state) > for handler in handlers: > - handler(ev) > + try: > + handler(ev) > + except hub.TaskExit: > + # Normal exit. > + # Propagate upwards, so we leave the event loop. > + raise > + except Exception as e: > + LOG.exception('%s: Exception occurred during handler > processing. ' > + 'Backtrace from offending handler ' > + '[%s] servicing event [%s] follows.', > + self.name, handler.__name__, > ev.__class__.__name__) > > def _send_event(self, ev, state): > - self.events.put((ev, state)) > + count = next(self.event_counter) > + if isinstance(ev, EventOFPPacketIn): > + self._pi_events_sem.acquire() > + self.events.put((self._PRIORITY_PI, count, (ev, state)), > block=False) > + else: > + self._events_sem.acquire() > + self.events.put((self._PRIORITY_EV, count, (ev, state)), > block=False) > > def send_event(self, name, ev, state=None): > """ > @@ -336,7 +375,7 @@ class RyuApp(object): > > > class AppManager(object): > - # singletone > + # singleton > _instance = None > > @staticmethod > @@ -520,7 +559,7 @@ class AppManager(object): > self._close(app) > events = app.events > if not events.empty(): > - app.logger.debug('%s events remians %d', app.name, > events.qsize()) > + app.logger.debug('%s events remains %d', app.name, > events.qsize()) Could you make a separate patch for typo fixes? 
> > def close(self): > def close_all(close_dict): > diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py > index 25b8776..78b8593 100644 > --- a/ryu/controller/controller.py > +++ b/ryu/controller/controller.py > @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer > import traceback > import random > import ssl > -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error > as SocketError > +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as > SocketTimeout > import warnings > > import ryu.base.app_manager > @@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol > from ryu.ofproto import ofproto_v1_0 > from ryu.ofproto import nx_match > > -from ryu.controller import handler > from ryu.controller import ofp_event > +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, > DEAD_DISPATCHER > > from ryu.lib.dpid import dpid_to_str > > @@ -57,8 +57,19 @@ CONF.register_cli_opts([ > help='openflow ssl listen port'), > cfg.StrOpt('ctl-privkey', default=None, help='controller private key'), > cfg.StrOpt('ctl-cert', default=None, help='controller certificate'), > - cfg.StrOpt('ca-certs', default=None, help='CA certificates'), > - cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to > await completion of socket operations.') > + cfg.StrOpt('ca-certs', default=None, help='CA certificates') > +]) > +CONF.register_opts([ > + cfg.FloatOpt('socket-timeout', > + default=5.0, > + help='Time, in seconds, to await completion of socket > operations.'), Moving this config opt from register_cli_opts may break existing applications. 
> + cfg.FloatOpt('echo-request-interval', > + default=15.0, > + help='Time, in seconds, between sending echo requests to a > datapath.'), > + cfg.IntOpt('maximum-unreplied-echo-requests', > + default=0, > + min=0, > + help='Maximum number of unreplied echo requests before > datapath is disconnected.') > ]) > > > @@ -103,8 +114,17 @@ def _deactivate(method): > try: > method(self) > finally: > - self.send_active = False > - self.set_state(handler.DEAD_DISPATCHER) > + if self.socket: > + try: > + self.socket.shutdown(SHUT_RDWR) > + except (EOFError, IOError): > + pass > + > + if self.recv_thread is None: > + self.socket.close() > + self.socket = None > + if self.state is not DEAD_DISPATCHER: > + self.set_state(DEAD_DISPATCHER) > return deactivate > > > @@ -117,28 +137,36 @@ class Datapath(ofproto_protocol.ProtocolDesc): > self.socket.settimeout(CONF.socket_timeout) > self.address = address > > - self.send_active = True > + self.recv_thread = None > + > self.close_requested = False > > # The limit is arbitrary. We need to limit queue size to > - # prevent it from eating memory up > + # prevent it from eating memory up. > + # Similarly, the timeout is also arbitrary. 
> self.send_q = hub.Queue(16) > + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize) > + self._send_q_timeout = 5 > + > + self.echo_request_interval = CONF.echo_request_interval > + self.max_unreplied_echo_requests = > CONF.maximum_unreplied_echo_requests > + self.unreplied_echo_requests = [] > > self.xid = random.randint(0, self.ofproto.MAX_XID) > self.id = None # datapath_id is unknown yet > self._ports = None > self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10 > self.ofp_brick = > ryu.base.app_manager.lookup_service_brick('ofp_event') > - self.set_state(handler.HANDSHAKE_DISPATCHER) > + self.set_state(HANDSHAKE_DISPATCHER) > > def _get_ports(self): > - if (self.ofproto_parser is not None and > - self.ofproto_parser.ofproto.OFP_VERSION >= 0x04): > + if (self.ofproto_parser and > + (self.ofproto_parser.ofproto.OFP_VERSION >= 0x04)): Do you need this change? > message = ( > 'Datapath#ports is kept for compatibility with the previous ' > 'openflow versions (< 1.3). ' > - 'This not be updated by EventOFPPortStatus message. ' > - 'If you want to be updated, you can use ' > + 'This is not updated by the EventOFPPortStatus message. ' > + 'If you want to be updated, you should use ' > '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.' > ) > warnings.warn(message, stacklevel=2) > @@ -167,19 +195,17 @@ class Datapath(ofproto_protocol.ProtocolDesc): > required_len = ofproto_common.OFP_HEADER_SIZE > > count = 0 > - while True: > + while not self.close_requested: > ret = "" > > try: > ret = self.socket.recv(required_len) > except SocketTimeout: > - if not self.close_requested: > - continue > - except SocketError: > - self.close_requested = True > + continue > + except (AttributeError, EOFError, IOError): > + break > > - if (len(ret) == 0) or (self.close_requested): > - self.socket.close() Why do you want to remove this close() call? 
> + if len(ret) == 0: > break > > buf += ret > @@ -218,27 +244,57 @@ class Datapath(ofproto_protocol.ProtocolDesc): > @_deactivate > def _send_loop(self): > try: > - while self.send_active: > - buf = self.send_q.get() > - self.socket.sendall(buf) > + while self.socket: > + buf = None > + try: > + buf = self.send_q.get(timeout=self._send_q_timeout) Just like above, this timeout doesn't seem to be necessary. > + except hub.QueueEmpty: > + pass > + else: > + self._send_q_sem.release() > + > + if buf is None: > + continue > + if self.socket: > + self.socket.sendall(buf) > + else: > + break > + except SocketTimeout: > + LOG.debug("Socket timed out while sending data to switch at > address %s", > + self.address) > except IOError as ioe: > - LOG.debug("Socket error while sending data to switch at address > %s: [%d] %s", > - self.address, ioe.errno, ioe.strerror) > + # Convert ioe.errno to a string, just in case it was somehow set > to None. > + errno = "%s" % ioe.errno > + LOG.debug("Socket error while sending data to switch at address > %s: [%s] %s", > + self.address, errno, ioe.strerror) > finally: > q = self.send_q > - # first, clear self.send_q to prevent new references. > + # First, clear self.send_q to prevent new references. > self.send_q = None > - # there might be threads currently blocking in send_q.put(). > - # unblock them by draining the queue. > + # Now, drain the send_q, releasing the associated semaphore for > each entry. > + # This should release all threads waiting to acquire the > semaphore. 
> try: > while q.get(block=False): > - pass > + self._send_q_sem.release() > except hub.QueueEmpty: > pass > > def send(self, buf): > + acquired = False > + dp_closed = True > + > if self.send_q: > - self.send_q.put(buf) > + acquired = self._send_q_sem.acquire() > + > + if self.send_q and acquired: > + dp_closed = False > + self.send_q.put(buf, block=False) > + elif acquired: > + self._send_q_sem.release() > + > + if dp_closed: > + LOG.debug('Datapath in process of terminating; send() to %s > discarded.', > + self.address) > > def set_xid(self, msg): > self.xid += 1 > @@ -254,18 +310,43 @@ class Datapath(ofproto_protocol.ProtocolDesc): > # LOG.debug('send_msg %s', msg) > self.send(msg.buf) > > + def _echo_request_loop(self): > + if not self.max_unreplied_echo_requests: > + return > + > + while (self.send_q and > + (len(self.unreplied_echo_requests) <= > self.max_unreplied_echo_requests)): > + echo_req = self.ofproto_parser.OFPEchoRequest(self) > + self.unreplied_echo_requests.append(self.set_xid(echo_req)) > + self.send_msg(echo_req) > + hub.sleep(self.echo_request_interval) > + > + if self.state is not DEAD_DISPATCHER: > + self.close() > + > + def acknowledge_echo_reply(self, xid): > + try: > + self.unreplied_echo_requests.remove(xid) > + except: > + pass > + > def serve(self): > + self.recv_thread = hub.getcurrent() > send_thr = hub.spawn(self._send_loop) > > # send hello message immediately > hello = self.ofproto_parser.OFPHello(self) > self.send_msg(hello) > > + echo_thr = hub.spawn(self._echo_request_loop) > + > try: > self._recv_loop() > finally: > hub.kill(send_thr) > - hub.joinall([send_thr]) > + hub.kill(echo_thr) > + hub.joinall([send_thr, echo_thr]) > + self.recv_thread = None > > # > # Utility methods for convenience > diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py > index b3c63df..3bf19d2 100644 > --- a/ryu/controller/ofp_handler.py > +++ b/ryu/controller/ofp_handler.py > @@ -238,6 +238,13 @@ class 
OFPHandler(ryu.base.app_manager.RyuApp): > echo_reply.data = msg.data > datapath.send_msg(echo_reply) > > + @set_ev_handler(ofp_event.EventOFPEchoReply, > + [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, > MAIN_DISPATCHER]) > + def echo_reply_handler(self, ev): > + msg = ev.msg > + datapath = msg.datapath > + datapath.acknowledge_echo_reply(msg.xid) > + > @set_ev_handler(ofp_event.EventOFPErrorMsg, > [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, > MAIN_DISPATCHER]) > def error_msg_handler(self, ev): > diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py > index 5621147..f59abd9 100644 > --- a/ryu/lib/hub.py > +++ b/ryu/lib/hub.py > @@ -50,7 +50,7 @@ if HUB_TYPE == 'eventlet': > # by not propergating an exception to the joiner. > try: > func(*args, **kwargs) > - except greenlet.GreenletExit: > + except TaskExit: > pass > except: > # log uncaught exception. > @@ -67,7 +67,7 @@ if HUB_TYPE == 'eventlet': > # by not propergating an exception to the joiner. > try: > func(*args, **kwargs) > - except greenlet.GreenletExit: > + except TaskExit: > pass > except: > # log uncaught exception. > @@ -87,13 +87,15 @@ if HUB_TYPE == 'eventlet': > # greenthread > try: > t.wait() > - except greenlet.GreenletExit: > + except TaskExit: > pass > > - Queue = eventlet.queue.Queue > + Queue = eventlet.queue.LightQueue > + PriorityQueue = eventlet.queue.PriorityQueue > QueueEmpty = eventlet.queue.Empty > Semaphore = eventlet.semaphore.Semaphore > BoundedSemaphore = eventlet.semaphore.BoundedSemaphore > + TaskExit = greenlet.GreenletExit > > class StreamServer(object): > def __init__(self, listen_info, handle=None, backlog=None, > @@ -144,7 +146,7 @@ if HUB_TYPE == 'eventlet': > > def _broadcast(self): > self._ev.send() > - # because eventlet Event doesn't allow mutiple send() on an > event, > + # because eventlet Event doesn't allow multiple send() on an > event, > # re-create the underlying event. > # note: _ev.reset() is obsolete. > self._ev = eventlet.event.Event() > > > Best, > Victor > -- > Victor J. 
Orlikowski <> vjo@[cs.]duke.edu > ------------------------------------------------------------------------------ Site24x7 APM Insight: Get Deep Visibility into Application Performance APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor end-to-end web transactions and take corrective actions now Troubleshoot faster and improve end-user experience. Signup Now! http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140 _______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
