At Wed, 24 Feb 2016 20:28:05 +0000,
Victor Orlikowski wrote:
> 
> A few more minor changes:
> 1) Place an assertion on one of the semaphore acquire() calls.

From the eventlet code, acquire() has no chance of returning anything other
than True.  I think such an assert statement with a side effect should be
avoided in general.  Things will break when someone decides to turn on
optimization and asserts are disabled.  (With Python, that's the "-O"
option.)


> 2) Clean up some naming and structure for greater clarity; one of the name 
> changes is a reversion of an earlier name change in an accepted patch.
> 3) I thought of another couple of corner cases.

This patch does
* place semaphore calls around the event queue
* add an echo request keep-alive timer
* other unrelated cleanups

I think it's optional, but separating these things into different
patches would be good.  Also, please write commit messages that concisely
describe the changes and the problem they solve, rather than a changelog
from older patchsets.


> Signed-off-by: Victor J. Orlikowski <[email protected]>
> 
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 3d5d895..6481a68 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -158,6 +158,7 @@ class RyuApp(object):
>          self.threads = []
>          self.main_thread = None
>          self.events = hub.Queue(128)
> +        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
>          if hasattr(self.__class__, 'LOGGER_NAME'):
>              self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
>          else:
> @@ -280,13 +281,25 @@ class RyuApp(object):
>      def _event_loop(self):
>          while self.is_active or not self.events.empty():
>              ev, state = self.events.get()
> +            self._events_sem.release()
>              if ev == self._event_stop:
>                  continue
>              handlers = self.get_handlers(ev, state)
>              for handler in handlers:
> -                handler(ev)
> +                try:
> +                    handler(ev)
> +                except hub.TaskExit:
> +                    # Normal exit.
> +                    # Propagate upwards, so we leave the event loop.
> +                    raise
> +                except:
> +                    LOG.exception('%s: Exception occurred during handler 
> processing. '
> +                                  'Backtrace from offending handler '
> +                                  '[%s] servicing event [%s] follows.',
> +                                  self.name, handler.__name__, 
> ev.__class__.__name__)
>  
>      def _send_event(self, ev, state):
> +        assert self._events_sem.acquire()
>          self.events.put((ev, state))
>  
>      def send_event(self, name, ev, state=None):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 25b8776..05f24e8 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
>  import traceback
>  import random
>  import ssl
> -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error 
> as SocketError
> +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as 
> SocketTimeout
>  import warnings
>  
>  import ryu.base.app_manager
> @@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol
>  from ryu.ofproto import ofproto_v1_0
>  from ryu.ofproto import nx_match
>  
> -from ryu.controller import handler
>  from ryu.controller import ofp_event
> +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, 
> DEAD_DISPATCHER
>  
>  from ryu.lib.dpid import dpid_to_str
>  
> @@ -57,8 +57,19 @@ CONF.register_cli_opts([
>                 help='openflow ssl listen port'),
>      cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
>      cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
> -    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
> -    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to 
> await completion of socket operations.')
> +    cfg.StrOpt('ca-certs', default=None, help='CA certificates')
> +])
> +CONF.register_opts([
> +    cfg.FloatOpt('socket-timeout',
> +                 default=5.0,
> +                 help='Time, in seconds, to await completion of socket 
> operations.'),
> +    cfg.FloatOpt('echo-request-interval',
> +                 default=15.0,
> +                 help='Time, in seconds, between sending echo requests to a 
> datapath.'),
> +    cfg.IntOpt('maximum-unreplied-echo-requests',
> +               default=0,
> +               min=0,
> +               help='Maximum number of unreplied echo requests before 
> datapath is disconnected.')
>  ])
>  
>  
> @@ -103,8 +114,15 @@ def _deactivate(method):
>          try:
>              method(self)
>          finally:
> -            self.send_active = False
> -            self.set_state(handler.DEAD_DISPATCHER)
> +            try:
> +                self.socket.shutdown(SHUT_RDWR)
> +            except (EOFError, IOError):
> +                pass
> +
> +            if not self.is_active:
> +                self.socket.close()
> +            if self.state is not DEAD_DISPATCHER:
> +                self.set_state(DEAD_DISPATCHER)
>      return deactivate
>  
>  
> @@ -117,19 +135,24 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          self.socket.settimeout(CONF.socket_timeout)
>          self.address = address
>  
> -        self.send_active = True
> +        self.is_active = True
>          self.close_requested = False
>  
>          # The limit is arbitrary. We need to limit queue size to
>          # prevent it from eating memory up
>          self.send_q = hub.Queue(16)
> +        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
> +
> +        self.echo_request_interval = CONF.echo_request_interval
> +        self.max_unreplied_echo_requests = 
> CONF.maximum_unreplied_echo_requests
> +        self.unreplied_echo_requests = []
>  
>          self.xid = random.randint(0, self.ofproto.MAX_XID)
>          self.id = None  # datapath_id is unknown yet
>          self._ports = None
>          self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
>          self.ofp_brick = 
> ryu.base.app_manager.lookup_service_brick('ofp_event')
> -        self.set_state(handler.HANDSHAKE_DISPATCHER)
> +        self.set_state(HANDSHAKE_DISPATCHER)
>  
>      def _get_ports(self):
>          if (self.ofproto_parser is not None and
> @@ -167,19 +190,17 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          required_len = ofproto_common.OFP_HEADER_SIZE
>  
>          count = 0
> -        while True:
> +        while not self.close_requested:
>              ret = ""
>  
>              try:
>                  ret = self.socket.recv(required_len)
>              except SocketTimeout:
> -                if not self.close_requested:
> -                    continue
> -            except SocketError:
> -                self.close_requested = True
> +                continue
> +            except (EOFError, IOError):
> +                break
>  
> -            if (len(ret) == 0) or (self.close_requested):
> -                self.socket.close()
> +            if len(ret) == 0:
>                  break
>  
>              buf += ret
> @@ -215,30 +236,45 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>                      count = 0
>                      hub.sleep(0)
>  
> -    @_deactivate
>      def _send_loop(self):
>          try:
> -            while self.send_active:
> +            while True:
>                  buf = self.send_q.get()
> +                self._send_q_sem.release()
>                  self.socket.sendall(buf)
> +        except SocketTimeout:
> +            LOG.debug("Socket timed out while sending data to switch at 
> address %s",
> +                      self.address)
>          except IOError as ioe:
> -            LOG.debug("Socket error while sending data to switch at address 
> %s: [%d] %s",
> -                      self.address, ioe.errno, ioe.strerror)
> +            # Convert ioe.errno to a string, just in case it was somehow set 
> to None.
> +            errno = "%s" % ioe.errno
> +            LOG.debug("Socket error while sending data to switch at address 
> %s: [%s] %s",
> +                      self.address, errno, ioe.strerror)
>          finally:
>              q = self.send_q
>              # first, clear self.send_q to prevent new references.
>              self.send_q = None
> -            # there might be threads currently blocking in send_q.put().
> -            # unblock them by draining the queue.
> +            # Now, drain the send_q, releasing the associated semaphore for 
> each entry.
> +            # This should release all threads waiting to acquire the 
> semaphore.
>              try:
>                  while q.get(block=False):
> -                    pass
> +                    self._send_q_sem.release()
>              except hub.QueueEmpty:
>                  pass
> +            # Finally, ensure the _recv_loop terminates.
> +            self.close()
>  
>      def send(self, buf):
> -        if self.send_q:
> -            self.send_q.put(buf)
> +        msg_enqueued = False
> +        if self._send_q_sem.acquire():
> +            if self.send_q:
> +                self.send_q.put(buf)
> +                msg_enqueued = True
> +            else:
> +                self._send_q_sem.release()
> +        if not msg_enqueued:
> +            LOG.debug('Datapath in process of terminating; send() to %s 
> discarded.',
> +                      self.address)
>  
>      def set_xid(self, msg):
>          self.xid += 1
> @@ -254,6 +290,23 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          # LOG.debug('send_msg %s', msg)
>          self.send(msg.buf)
>  
> +    def _echo_request_loop(self):
> +        if not self.max_unreplied_echo_requests:
> +            return
> +        while (self.send_q and
> +               (len(self.unreplied_echo_requests) <= 
> self.max_unreplied_echo_requests)):
> +            echo_req = self.ofproto_parser.OFPEchoRequest(self)
> +            self.unreplied_echo_requests.append(self.set_xid(echo_req))
> +            self.send_msg(echo_req)
> +            hub.sleep(self.echo_request_interval)
> +        self.close()
> +
> +    def acknowledge_echo_reply(self, xid):
> +        try:
> +            self.unreplied_echo_requests.remove(xid)
> +        except:
> +            pass
> +
>      def serve(self):
>          send_thr = hub.spawn(self._send_loop)
>  
> @@ -261,11 +314,15 @@ class Datapath(ofproto_protocol.ProtocolDesc):
>          hello = self.ofproto_parser.OFPHello(self)
>          self.send_msg(hello)
>  
> +        echo_thr = hub.spawn(self._echo_request_loop)
> +
>          try:
>              self._recv_loop()
>          finally:
>              hub.kill(send_thr)
> -            hub.joinall([send_thr])
> +            hub.kill(echo_thr)
> +            hub.joinall([send_thr, echo_thr])
> +            self.is_active = False
>  
>      #
>      # Utility methods for convenience
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index b3c63df..3bf19d2 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
>          echo_reply.data = msg.data
>          datapath.send_msg(echo_reply)
>  
> +    @set_ev_handler(ofp_event.EventOFPEchoReply,
> +                    [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, 
> MAIN_DISPATCHER])
> +    def echo_reply_handler(self, ev):
> +        msg = ev.msg
> +        datapath = msg.datapath
> +        datapath.acknowledge_echo_reply(msg.xid)
> +
>      @set_ev_handler(ofp_event.EventOFPErrorMsg,
>                      [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, 
> MAIN_DISPATCHER])
>      def error_msg_handler(self, ev):
> diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
> index 5621147..b77465b 100644
> --- a/ryu/lib/hub.py
> +++ b/ryu/lib/hub.py
> @@ -50,7 +50,7 @@ if HUB_TYPE == 'eventlet':
>              # by not propergating an exception to the joiner.
>              try:
>                  func(*args, **kwargs)
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>              except:
>                  # log uncaught exception.
> @@ -67,7 +67,7 @@ if HUB_TYPE == 'eventlet':
>              # by not propergating an exception to the joiner.
>              try:
>                  func(*args, **kwargs)
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>              except:
>                  # log uncaught exception.
> @@ -87,13 +87,14 @@ if HUB_TYPE == 'eventlet':
>              # greenthread
>              try:
>                  t.wait()
> -            except greenlet.GreenletExit:
> +            except TaskExit:
>                  pass
>  
> -    Queue = eventlet.queue.Queue
> +    Queue = eventlet.queue.LightQueue
>      QueueEmpty = eventlet.queue.Empty
>      Semaphore = eventlet.semaphore.Semaphore
>      BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
> +    TaskExit = greenlet.GreenletExit
>  
>      class StreamServer(object):
>          def __init__(self, listen_info, handle=None, backlog=None,
> 
> 
> Best,
> Victor
> --
> Victor J. Orlikowski <> vjo@[cs.]duke.edu
> 

------------------------------------------------------------------------------
Site24x7 APM Insight: Get Deep Visibility into Application Performance
APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month
Monitor end-to-end web transactions and take corrective actions now
Troubleshoot faster and improve end-user experience. Signup Now!
http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to