At Thu, 11 Feb 2016 16:49:46 +0000,
Victor Orlikowski wrote:
> 
> Address PEP8 violations, reported here:
> https://travis-ci.org/fujita/ryu/jobs/108267416
> 
> Signed-off-by: Victor J. Orlikowski <[email protected]>
> 
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 3d5d895..b08c6c5 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, 
> get_dependent_services
>  from ryu.controller.controller import Datapath
>  from ryu.controller import event
>  from ryu.controller.event import EventRequestBase, EventReplyBase
> +from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
>  from ryu.lib import hub
>  from ryu.ofproto import ofproto_protocol
>  
> @@ -157,12 +158,25 @@ class RyuApp(object):
>          self.observers = {}     # ev_cls -> observer-name -> states:set
>          self.threads = []
>          self.main_thread = None
> -        self.events = hub.Queue(128)
> +        self.events = hub.Queue(32)
> +        self.sc_events = hub.Queue(32)
> +        self.pi_events = hub.Queue(32)

I think you can use eventlet.queue.PriorityQueue instead of these 3
queues.  (see below)

Let us have:

       self.events = hub.PriorityQueue(128)

> +        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
> +        self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
> +        self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
> +        self._event_get_timeout = 5
> +

And a single semaphore would do, if needed at all.

>          if hasattr(self.__class__, 'LOGGER_NAME'):
>              self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
>          else:
>              self.logger = logging.getLogger(self.name)
>          self.CONF = cfg.CONF
> +        self.CONF.register_opts([
> +            cfg.FloatOpt('handler-execution-timeout',
> +                         default=10.0,
> +                         help='Maximum time, in seconds, to permit handlers 
> to run.')
> +        ])
> +        self.handler_execution_timeout = self.CONF.handler_execution_timeout
>  
>          # prevent accidental creation of instances of this class outside 
> RyuApp
>          class _EventThreadStop(event.EventBase):
> @@ -278,16 +292,68 @@ class RyuApp(object):
>          return req.reply_q.get()
>  
>      def _event_loop(self):
> -        while self.is_active or not self.events.empty():
> -            ev, state = self.events.get()

This would be:

           priority, (ev, state) = self.events.get()

> -            if ev == self._event_stop:
> +        while (self.is_active or (not (self.events.empty() and
> +                                       self.sc_events.empty() and
> +                                       self.pi_events.empty()))):
> +            # Process events according to priority.
> +            # StatusChange highest, PacketIn lowest, all others in between.
> +            ev = state = None
> +            if not self.sc_events.empty():
> +                try:
> +                    ev, state = 
> self.sc_events.get(timeout=self._event_get_timeout)
> +                except hub.QueueEmpty:
> +                    pass
> +                else:
> +                    self._sc_events_sem.release()
> +
> +            if (ev is None) and (not self.events.empty()):
> +                try:
> +                    ev, state = 
> self.events.get(timeout=self._event_get_timeout)
> +                except hub.QueueEmpty:
> +                    pass
> +                else:
> +                    self._events_sem.release()
> +
> +            if ev is None:
> +                try:
> +                    ev, state = 
> self.pi_events.get(timeout=self._event_get_timeout)


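This will harm responsiveness: when all three queues are empty, the loop
blocks in this pi_events.get() for up to _event_get_timeout (5 seconds), so
a state-change or other event that arrives during that wait is not handled
until a PacketIn shows up or the timeout expires.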

> +                except hub.QueueEmpty:
> +                    pass
> +                else:
> +                    self._pi_events_sem.release()
> +
> +            if (ev is None) or (ev == self._event_stop):
>                  continue
>              handlers = self.get_handlers(ev, state)
>              for handler in handlers:
> -                handler(ev)
> +                handler_execution_timeout = 
> hub.Timeout(self.handler_execution_timeout)
> +                try:
> +                    handler(ev)
> +                except hub.TaskExit:
> +                    # Normal exit.
> +                    # Propagate upwards, so we leave the event loop.
> +                    raise
> +                except Exception as e:
> +                    if ((isinstance(e, hub.Timeout)) and (e is 
> handler_execution_timeout)):
> +                        LOG.error('%s: Handler exceeded maximum execution 
> time; terminated.', self.name)
> +                    else:
> +                        LOG.error('%s: Exception occurred during handler 
> processing.', self.name)
> +                    LOG.exception('%s: Backtrace from offending handler '
> +                                  '[%s] servicing event [%s] follows.',
> +                                  self.name, handler.__name__, 
> ev.__class__.__name__)
> +                finally:
> +                    handler_execution_timeout.cancel()
>  
>      def _send_event(self, ev, state):
> -        self.events.put((ev, state))
> +        if isinstance(ev, EventOFPStateChange):
> +            self._sc_events_sem.acquire()
> +            self.sc_events.put((ev, state), block=False)
> +        elif isinstance(ev, EventOFPPacketIn):
> +            self._pi_events_sem.acquire()
> +            self.pi_events.put((ev, state), block=False)
> +        else:
> +            self._events_sem.acquire()
> +            self.events.put((ev, state), block=False)


And replace _send_event with something like the following.

# Priorities for entries on the single event queue.  Lower value = handled
# first: state changes, then all other events, then PacketIn last.
PRIORITY_SC = 0
PRIORITY_OTHER = 1
PRIORITY_PI = 2

    def _send_event(self, ev, state):
        """Queue ``(ev, state)`` on the single priority queue ``self.events``.

        State-change events are queued with PRIORITY_SC, PacketIn events
        with PRIORITY_PI, and every other event with PRIORITY_OTHER.  Each
        entry is ``(priority, (ev, state))``, matching the
        ``priority, (ev, state) = self.events.get()`` unpacking in the
        event loop.
        """
        if isinstance(ev, EventOFPStateChange):
            priority = PRIORITY_SC
        elif isinstance(ev, EventOFPPacketIn):
            priority = PRIORITY_PI
        else:
            priority = PRIORITY_OTHER
        # NOTE(review): entries with equal priority are ordered by comparing
        # the (ev, state) payload, not by arrival order -- confirm whether
        # FIFO within one priority level matters here.
        self.events.put((priority, (ev, state)))


>      def send_event(self, name, ev, state=None):
>          """
> @@ -336,7 +402,7 @@ class RyuApp(object):
>  
>  
>  class AppManager(object):
> -    # singletone
> +    # singleton
>      _instance = None
>  
>      @staticmethod
> @@ -519,8 +585,14 @@ class AppManager(object):
>          app.stop()
>          self._close(app)
>          events = app.events
> +        sc_events = app.sc_events
> +        pi_events = app.pi_events
> +        if not sc_events.empty():
> +            app.logger.debug('%s state changes remains %d', app.name, 
> sc_events.qsize())
> +        if not pi_events.empty():
> +            app.logger.debug('%s PacketIn events remains %d', app.name, 
> pi_events.qsize())
>          if not events.empty():
> -            app.logger.debug('%s events remians %d', app.name, 
> events.qsize())
> +            app.logger.debug('%s events remains %d', app.name, 
> events.qsize())
>  
>      def close(self):
>          def close_all(close_dict):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 25b8776..566f22b 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
>  import traceback
>  import random
>  import ssl
> -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error 
> as SocketError
> +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as 
> SocketTimeout
>  import warnings
>  
>  import ryu.base.app_manager
> @@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
>  from ryu.ofproto import ofproto_v1_0
>  from ryu.ofproto import nx_match
>  
> -from ryu.controller import handler
>  from ryu.controller import ofp_event
> +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, 
> DEAD_DISPATCHER
> +
> +from ryu.exception import RyuException
>  
>  from ryu.lib.dpid import dpid_to_str
>  
> @@ -57,11 +59,26 @@ CONF.register_cli_opts([
>                 help='openflow ssl listen port'),
>      cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
>      cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
> -    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
> -    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to 
> await completion of socket operations.')
> +    cfg.StrOpt('ca-certs', default=None, help='CA certificates')
> +])
> +CONF.register_opts([
> +    cfg.FloatOpt('socket-timeout',
> +                 default=5.0,
> +                 help='Time, in seconds, to await completion of socket 
> operations.'),
> +    cfg.FloatOpt('echo-request-interval',
> +                 default=15.0,
> +                 help='Time, in seconds, between sending echo requests to a 
> datapath.'),
> +    cfg.IntOpt('maximum-unreplied-echo-requests',
> +               default=0,
> +               min=0,
> +               help='Maximum number of unreplied echo requests before 
> datapath is disconnected.')
>  ])
>  
>  
> +class DatapathShutdown(RyuException):
> +    message = 'Datapath shutdown was requested'
> +
> +
>  class OpenFlowController(object):
>      def __init__(self):
>          super(OpenFlowController, self).__init__()
> @@ -102,9 +119,22 @@ def _deactivate(method):
>      def deactivate(self):
>          try:
>              method(self)
> +        except DatapathShutdown:
> +            if self.socket:
> +                self.socket.close()
> +                self.socket = None
> +            if self.recv_thread:
> +                self.recv_thread.throw(DatapathShutdown)
> +            if self.send_thread:
> +                self.send_thread.throw(DatapathShutdown)
>          finally:
> -            self.send_active = False
> -            self.set_state(handler.DEAD_DISPATCHER)
> +            if self.socket:
> +                try:
> +                    self.socket.shutdown(SHUT_RDWR)
> +                except (EOFError, IOError):
> +                    pass
> +            if self.state is not DEAD_DISPATCHER:
> +                self.set_state(DEAD_DISPATCHER)
>      return deactivate
>  
>  
> @@ -117,28 +147,37 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          self.socket.settimeout(CONF.socket_timeout)
>          self.address = address
>  
> -        self.send_active = True
> +        self.send_thread = None
> +        self.recv_thread = None
> +
>          self.close_requested = False
>  
>          # The limit is arbitrary. We need to limit queue size to
> -        # prevent it from eating memory up
> +        # prevent it from eating memory up.
> +        # Similarly, the timeout is also arbitrary.
>          self.send_q = hub.Queue(16)
> +        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
> +        self._send_q_timeout = 5
> +
> +        self.echo_request_interval = CONF.echo_request_interval
> +        self.max_unreplied_echo_requests = 
> CONF.maximum_unreplied_echo_requests
> +        self.unreplied_echo_requests = []
>  
>          self.xid = random.randint(0, self.ofproto.MAX_XID)
>          self.id = None  # datapath_id is unknown yet
>          self._ports = None
>          self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
>          self.ofp_brick = 
> ryu.base.app_manager.lookup_service_brick('ofp_event')
> -        self.set_state(handler.HANDSHAKE_DISPATCHER)
> +        self.set_state(HANDSHAKE_DISPATCHER)
>  
>      def _get_ports(self):
> -        if (self.ofproto_parser is not None and
> -                self.ofproto_parser.ofproto.OFP_VERSION >= 0x04):
> +        if (self.ofproto_parser and
> +           (self.ofproto_parser.ofproto.OFP_VERSION >= 0x04)):
>              message = (
>                  'Datapath#ports is kept for compatibility with the previous '
>                  'openflow versions (< 1.3). '
> -                'This not be updated by EventOFPPortStatus message. '
> -                'If you want to be updated, you can use '
> +                'This is not updated by the EventOFPPortStatus message. '
> +                'If you want to be updated, you should use '
>                  '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
>              )
>              warnings.warn(message, stacklevel=2)
> @@ -163,64 +202,101 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>      # Low level socket handling layer
>      @_deactivate
>      def _recv_loop(self):
> -        buf = bytearray()
> -        required_len = ofproto_common.OFP_HEADER_SIZE
> -
> -        count = 0
> -        while True:
> -            ret = ""
> -
> -            try:
> -                ret = self.socket.recv(required_len)
> -            except SocketTimeout:
> -                if not self.close_requested:
> -                    continue
> -            except SocketError:
> -                self.close_requested = True
> -
> -            if (len(ret) == 0) or (self.close_requested):
> -                self.socket.close()
> -                break
> -
> -            buf += ret
> -            while len(buf) >= required_len:
> -                (version, msg_type, msg_len, xid) = 
> ofproto_parser.header(buf)
> -                required_len = msg_len
> -                if len(buf) < required_len:
> -                    break
> -
> -                msg = ofproto_parser.msg(
> -                    self, version, msg_type, msg_len, xid, buf[:msg_len])
> -                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> -                if msg:
> -                    ev = ofp_event.ofp_msg_to_ev(msg)
> -                    self.ofp_brick.send_event_to_observers(ev, self.state)
> -
> -                    dispatchers = lambda x: 
> x.callers[ev.__class__].dispatchers
> -                    handlers = [handler for handler in
> -                                self.ofp_brick.get_handlers(ev) if
> -                                self.state in dispatchers(handler)]
> -                    for handler in handlers:
> -                        handler(ev)
> -
> -                buf = buf[required_len:]
> -                required_len = ofproto_common.OFP_HEADER_SIZE
> -
> -                # We need to schedule other greenlets. Otherwise, ryu
> -                # can't accept new switches or handle the existing
> -                # switches. The limit is arbitrary. We need the better
> -                # approach in the future.
> -                count += 1
> -                if count > 2048:
> -                    count = 0
> -                    hub.sleep(0)
> +        try:
> +            buf = bytearray()
> +            required_len = ofproto_common.OFP_HEADER_SIZE
> +
> +            count = 0
> +            while True:
> +                ret = ""
> +
> +                try:
> +                    ret = self.socket.recv(required_len)
> +                except SocketTimeout:
> +                    if not self.close_requested:
> +                        continue
> +                except (AttributeError, EOFError, IOError):
> +                    self.close_requested = True
> +
> +                if (len(ret) == 0) or (self.close_requested):
> +                    raise DatapathShutdown
> +
> +                buf += ret
> +                while len(buf) >= required_len:
> +                    (version, msg_type, msg_len, xid) = 
> ofproto_parser.header(buf)
> +                    required_len = msg_len
> +                    if len(buf) < required_len:
> +                        break
> +
> +                    msg = ofproto_parser.msg(
> +                        self, version, msg_type, msg_len, xid, buf[:msg_len])
> +                    # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> +                    if msg:
> +                        ev = ofp_event.ofp_msg_to_ev(msg)
> +                        exec_timeout = CONF.handler_execution_timeout
> +                        self.ofp_brick.send_event_to_observers(ev, 
> self.state)
> +
> +                        dispatchers = lambda x: 
> x.callers[ev.__class__].dispatchers
> +                        handlers = [handler for handler in
> +                                    self.ofp_brick.get_handlers(ev) if
> +                                    self.state in dispatchers(handler)]
> +                        for handler in handlers:
> +                            # Defensively set a timeout for the handler.
> +                            # "This should never happen" are famous last 
> words.
> +                            handler_timeout = hub.Timeout(exec_timeout)
> +                            try:
> +                                handler(ev)
> +                            finally:
> +                                handler_timeout.cancel()
> +
> +                    buf = buf[required_len:]
> +                    required_len = ofproto_common.OFP_HEADER_SIZE
> +
> +                    # We need to schedule other greenlets. Otherwise, ryu
> +                    # can't accept new switches or handle the existing
> +                    # switches. The limit is arbitrary. We need the better
> +                    # approach in the future.
> +                    count += 1
> +                    if count > 2048:
> +                        count = 0
> +                        hub.sleep(0)
> +        except Exception as e:
> +            if isinstance(e, DatapathShutdown):
> +                # Normal termination sequence.
> +                pass
> +            else:
> +                dpid_str = "UNKNOWN"
> +                if self.id:
> +                    dpid_str = dpid_to_str(self.id)
> +                LOG.exception('Exception occurred in recv_loop() '
> +                              'for datapath [%s] connecting from [%s]. '
> +                              'Shutting down datapath.',
> +                              dpid_str, self.address)
> +        finally:
> +            # Shutting down; get rid of the recv_thread reference.
> +            self.recv_thread = None
> +            # Although it may have been caught above, raise DatapathShutdown.
> +            # This guarantees that close() will be called on the socket.
> +            raise DatapathShutdown
>  
>      @_deactivate
>      def _send_loop(self):
>          try:
> -            while self.send_active:
> -                buf = self.send_q.get()
> -                self.socket.sendall(buf)
> +            while self.socket:
> +                buf = None
> +                try:
> +                    buf = self.send_q.get(timeout=self._send_q_timeout)
> +                except hub.QueueEmpty:
> +                    pass
> +                else:
> +                    self._send_q_sem.release()
> +
> +                if buf is None:
> +                    continue
> +                if self.socket:
> +                    self.socket.sendall(buf)
> +                else:
> +                    raise DatapathShutdown
>          except IOError as ioe:
>              LOG.debug("Socket error while sending data to switch at address 
> %s: [%d] %s",
>                        self.address, ioe.errno, ioe.strerror)
> @@ -228,17 +304,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>              q = self.send_q
>              # first, clear self.send_q to prevent new references.
>              self.send_q = None
> -            # there might be threads currently blocking in send_q.put().
> -            # unblock them by draining the queue.
> +            # Next, get rid of the send_thread reference.
> +            self.send_thread = None
> +            # Drain the send_q, releasing the associated semaphore for each 
> entry.
> +            # This should release all threads waiting to acquire the 
> semaphore.
>              try:
>                  while q.get(block=False):
> -                    pass
> +                    self._send_q_sem.release()
>              except hub.QueueEmpty:
>                  pass
>  
>      def send(self, buf):
> +        acquired = False
> +        dp_closed = True
> +
>          if self.send_q:
> -            self.send_q.put(buf)
> +            acquired = self._send_q_sem.acquire()
> +
> +        if self.send_q and acquired:
> +            dp_closed = False
> +            self.send_q.put(buf, block=False)
> +        elif acquired:
> +            self._send_q_sem.release()
> +
> +        if dp_closed:
> +            LOG.debug('Datapath in process of terminating; send() to %s 
> discarded.',
> +                      self.address)
>  
>      def set_xid(self, msg):
>          self.xid += 1
> @@ -254,18 +345,41 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          # LOG.debug('send_msg %s', msg)
>          self.send(msg.buf)
>  
> +    def _echo_request_loop(self):
> +        if not self.max_unreplied_echo_requests:
> +            return
> +
> +        while (self.send_q and
> +               (len(self.unreplied_echo_requests) <= 
> self.max_unreplied_echo_requests)):
> +            echo_req = self.ofproto_parser.OFPEchoRequest(self)
> +            self.unreplied_echo_requests.append(self.set_xid(echo_req))
> +            self.send_msg(echo_req)
> +            hub.sleep(self.echo_request_interval)
> +
> +        if self.state is not DEAD_DISPATCHER:
> +            self.close()
> +
> +    def acknowledge_echo_reply(self, xid):
> +        try:
> +            self.unreplied_echo_requests.remove(xid)
> +        except:
> +            pass
> +
>      def serve(self):
> -        send_thr = hub.spawn(self._send_loop)
> +        # self.recv_thread *MUST* be set before we spawn the send loop!
> +        self.recv_thread = hub.getcurrent()
> +        self.send_thread = hub.spawn(self._send_loop)
>  
>          # send hello message immediately
>          hello = self.ofproto_parser.OFPHello(self)
>          self.send_msg(hello)
>  
> -        try:
> -            self._recv_loop()
> -        finally:
> -            hub.kill(send_thr)
> -            hub.joinall([send_thr])
> +        # Keeping track of the echo request loop is not really required.
> +        # It will exit when the send loop exits, or when the number of
> +        # unreplied echo requests exceeds the threshold.
> +        echo_thread = hub.spawn(self._echo_request_loop)
> +
> +        self._recv_loop()
>  
>      #
>      # Utility methods for convenience
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index b3c63df..3bf19d2 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
>          echo_reply.data = msg.data
>          datapath.send_msg(echo_reply)
>  
> +    @set_ev_handler(ofp_event.EventOFPEchoReply,
> +                    [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, 
> MAIN_DISPATCHER])
> +    def echo_reply_handler(self, ev):
> +        msg = ev.msg
> +        datapath = msg.datapath
> +        datapath.acknowledge_echo_reply(msg.xid)
> +
>      @set_ev_handler(ofp_event.EventOFPErrorMsg,
>                      [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, 
> MAIN_DISPATCHER])
>      def error_msg_handler(self, ev):
> diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
> index 5621147..d214705 100644
> --- a/ryu/lib/hub.py
> +++ b/ryu/lib/hub.py
> @@ -44,13 +44,19 @@ if HUB_TYPE == 'eventlet':
>      listen = eventlet.listen
>      connect = eventlet.connect
>  
> +    Queue = eventlet.queue.LightQueue
> +    QueueEmpty = eventlet.queue.Empty
> +    Semaphore = eventlet.semaphore.Semaphore
> +    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
> +    TaskExit = greenlet.GreenletExit
> +
>      def spawn(*args, **kwargs):
>          def _launch(func, *args, **kwargs):
>              # mimic gevent's default raise_error=False behaviour
>              # by not propergating an exception to the joiner.
>              try:
>                  func(*args, **kwargs)
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>              except:
>                  # log uncaught exception.
> @@ -67,7 +73,7 @@ if HUB_TYPE == 'eventlet':
>              # by not propergating an exception to the joiner.
>              try:
>                  func(*args, **kwargs)
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>              except:
>                  # log uncaught exception.
> @@ -87,14 +93,9 @@ if HUB_TYPE == 'eventlet':
>              # greenthread
>              try:
>                  t.wait()
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>  
> -    Queue = eventlet.queue.Queue
> -    QueueEmpty = eventlet.queue.Empty
> -    Semaphore = eventlet.semaphore.Semaphore
> -    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
> -
>      class StreamServer(object):
>          def __init__(self, listen_info, handle=None, backlog=None,
>                       spawn='default', **ssl_args):
> @@ -144,7 +145,7 @@ if HUB_TYPE == 'eventlet':
>  
>          def _broadcast(self):
>              self._ev.send()
> -            # because eventlet Event doesn't allow mutiple send() on an 
> event,
> +            # because eventlet Event doesn't allow multiple send() on an 
> event,
>              # re-create the underlying event.
>              # note: _ev.reset() is obsolete.
>              self._ev = eventlet.event.Event()
> 
> 
> Best,
> Victor
> --
> Victor J. Orlikowski <> vjo@[cs.]duke.edu
> 

------------------------------------------------------------------------------
Site24x7 APM Insight: Get Deep Visibility into Application Performance
APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month
Monitor end-to-end web transactions and take corrective actions now
Troubleshoot faster and improve end-user experience. Signup Now!
http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to