At Wed, 10 Feb 2016 06:02:05 +0000,
Victor Orlikowski wrote:
>
> Re-sending...
>
> Per Fujita-San's request, the echo request loop is off by default.
> It is easily enabled by setting the "maximum-unreplied-echo-requests"
> configuration option to a non-zero value.
>
> Signed-off-by: Victor J. Orlikowski <[email protected]>
>
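For anyone who wants to try the liveness check, here is a minimal (and
untested) sketch of enabling it from a config file. The file name and the
[DEFAULT] group are my assumptions; I believe oslo.config accepts the dashed
option names as registered, but the underscored spellings may be needed
instead:

    # ryu.conf (hypothetical example)
    [DEFAULT]
    maximum-unreplied-echo-requests = 2
    echo-request-interval = 15.0

    $ ryu-manager --config-file ryu.conf your_app.py
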
> diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
> index 3d5d895..eac2c42 100644
> --- a/ryu/base/app_manager.py
> +++ b/ryu/base/app_manager.py
> @@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, get_dependent_services
> from ryu.controller.controller import Datapath
> from ryu.controller import event
> from ryu.controller.event import EventRequestBase, EventReplyBase
> +from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
> from ryu.lib import hub
> from ryu.ofproto import ofproto_protocol
>
> @@ -157,12 +158,25 @@ class RyuApp(object):
> self.observers = {} # ev_cls -> observer-name -> states:set
> self.threads = []
> self.main_thread = None
> - self.events = hub.Queue(128)
> + self.events = hub.Queue(32)
> + self.sc_events = hub.Queue(32)
> + self.pi_events = hub.Queue(32)
> + self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
> + self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
> + self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
Do we really need these semaphores?
eventlet.queue.Queue.put() should block when the queue is full.
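That is, the queue itself should provide the back-pressure. A small,
untested demo of the blocking behaviour (plain eventlet, nothing
ryu-specific):

    import eventlet
    import eventlet.queue

    q = eventlet.queue.Queue(2)          # bounded, like hub.Queue(32)

    def producer():
        for i in range(4):
            q.put(i)                     # blocks while the queue is full
            print('queued %d' % i)

    def consumer():
        while True:
            eventlet.sleep(0.1)
            print('got %d' % q.get())

    eventlet.spawn(consumer)
    eventlet.spawn(producer).wait()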
> + self._event_get_timeout = 5
> +
> if hasattr(self.__class__, 'LOGGER_NAME'):
> self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
> else:
> self.logger = logging.getLogger(self.name)
> self.CONF = cfg.CONF
> +        self.CONF.register_opts([
> +            cfg.FloatOpt('handler-execution-timeout',
> +                         default=10.0,
> +                         help='Maximum time, in seconds, to permit handlers to run.')
> +            ])
> +        self.handler_execution_timeout = self.CONF.handler_execution_timeout
>
>     # prevent accidental creation of instances of this class outside RyuApp
> class _EventThreadStop(event.EventBase):
> @@ -279,15 +293,65 @@ class RyuApp(object):
>
> def _event_loop(self):
> while self.is_active or not self.events.empty():
If pi_events (or sc_events) isn't empty but events is empty,
pi_events doesn't seem to be processed. Correct?
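A loop condition along these lines would drain all three queues before
exiting (untested fragment):

    while (self.is_active or not self.events.empty() or
           not self.sc_events.empty() or not self.pi_events.empty()):
        ...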
> - ev, state = self.events.get()
> - if ev == self._event_stop:
> + # Process events according to priority.
> + # StatusChange highest, PacketIn lowest, all others in between.
> + ev = state = None
> + if self.sc_events.qsize():
Just a nit, but it seems better to use empty() instead of qsize() here,
for consistency, e.g.:
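    # instead of: if self.sc_events.qsize():
    if not self.sc_events.empty():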
> + try:
> +                    ev, state = self.sc_events.get(timeout=self._event_get_timeout)
> + except hub.QueueEmpty:
> + pass
> + else:
> + self._sc_events_sem.release()
> +
> + if (ev is None) and (self.events.qsize()):
> + try:
> +                    ev, state = self.events.get(timeout=self._event_get_timeout)
> + except hub.QueueEmpty:
> + pass
> + else:
> + self._events_sem.release()
> +
> + if ev is None:
> + try:
> +                    ev, state = self.pi_events.get(timeout=self._event_get_timeout)
> + except hub.QueueEmpty:
> + pass
> + else:
> + self._pi_events_sem.release()
> +
> + if (ev is None) or (ev == self._event_stop):
> continue
> handlers = self.get_handlers(ev, state)
> for handler in handlers:
> - handler(ev)
> +                handler_execution_timeout = hub.Timeout(self.handler_execution_timeout)
> + try:
> + handler(ev)
> + except hub.TaskExit:
> + # Normal exit.
> + # Propagate upwards, so we leave the event loop.
> + raise
> + except Exception as e:
> +                    if ((isinstance(e, hub.Timeout)) and (e is handler_execution_timeout)):
> +                        LOG.error('%s: Handler exceeded maximum execution time; terminated.', self.name)
> + else:
> +                        LOG.error('%s: Exception occurred during handler processing.', self.name)
> +                    LOG.exception('%s: Backtrace from offending handler '
> +                                  '[%s] servicing event [%s] follows.',
> +                                  self.name, handler.__name__, ev.__class__.__name__)
> + finally:
> + handler_execution_timeout.cancel()
>
> def _send_event(self, ev, state):
> - self.events.put((ev, state))
> + if isinstance(ev, EventOFPStateChange):
> + self._sc_events_sem.acquire()
> + self.sc_events.put((ev, state), block=False)
> + elif isinstance(ev, EventOFPPacketIn):
> + self._pi_events_sem.acquire()
> + self.pi_events.put((ev, state), block=False)
> + else:
> + self._events_sem.acquire()
> + self.events.put((ev, state), block=False)
>
> def send_event(self, name, ev, state=None):
> """
> @@ -336,7 +400,7 @@ class RyuApp(object):
>
>
> class AppManager(object):
> - # singletone
> + # singleton
> _instance = None
>
> @staticmethod
> @@ -519,8 +583,14 @@ class AppManager(object):
> app.stop()
> self._close(app)
> events = app.events
> + sc_events = app.sc_events
> + pi_events = app.pi_events
> + if not sc_events.empty():
> +            app.logger.debug('%s state changes remains %d', app.name, sc_events.qsize())
> +        if not pi_events.empty():
> +            app.logger.debug('%s PacketIn events remains %d', app.name, pi_events.qsize())
>         if not events.empty():
> -            app.logger.debug('%s events remians %d', app.name, events.qsize())
> +            app.logger.debug('%s events remains %d', app.name, events.qsize())
>
> def close(self):
> def close_all(close_dict):
> diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
> index 25b8776..2ba399b 100644
> --- a/ryu/controller/controller.py
> +++ b/ryu/controller/controller.py
> @@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
> import traceback
> import random
> import ssl
> -from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
> +from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
> import warnings
>
> import ryu.base.app_manager
> @@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
> from ryu.ofproto import ofproto_v1_0
> from ryu.ofproto import nx_match
>
> -from ryu.controller import handler
> from ryu.controller import ofp_event
> +from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
> +
> +from ryu.exception import RyuException
>
> from ryu.lib.dpid import dpid_to_str
>
> @@ -57,11 +59,26 @@ CONF.register_cli_opts([
> help='openflow ssl listen port'),
> cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
> cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
> - cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
> -    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.')
> + cfg.StrOpt('ca-certs', default=None, help='CA certificates')
> +])
> +CONF.register_opts([
> + cfg.FloatOpt('socket-timeout',
> + default=5.0,
> +                 help='Time, in seconds, to await completion of socket operations.'),
> + cfg.FloatOpt('echo-request-interval',
> + default=15.0,
> +                 help='Time, in seconds, between sending echo requests to a datapath.'),
> + cfg.IntOpt('maximum-unreplied-echo-requests',
> + default=0,
> + min=0,
> +               help='Maximum number of unreplied echo requests before datapath is disconnected.')
> ])
>
>
> +class DatapathShutdown(RyuException):
> + message = 'Datapath shutdown was requested'
> +
> +
> class OpenFlowController(object):
> def __init__(self):
> super(OpenFlowController, self).__init__()
> @@ -102,9 +119,22 @@ def _deactivate(method):
> def deactivate(self):
> try:
> method(self)
> + except DatapathShutdown:
> + if self.socket:
> + self.socket.close()
> + self.socket = None
> + if self.recv_thread:
> + self.recv_thread.throw(DatapathShutdown)
> + if self.send_thread:
> + self.send_thread.throw(DatapathShutdown)
> finally:
> - self.send_active = False
> - self.set_state(handler.DEAD_DISPATCHER)
> + if self.socket:
> + try:
> + self.socket.shutdown(SHUT_RDWR)
> + except (EOFError, IOError):
> + pass
> + if self.state is not DEAD_DISPATCHER:
> + self.set_state(DEAD_DISPATCHER)
> return deactivate
>
>
> @@ -117,28 +147,37 @@ class Datapath(ofproto_protocol.ProtocolDesc):
> self.socket.settimeout(CONF.socket_timeout)
> self.address = address
>
> - self.send_active = True
> + self.send_thread = None
> + self.recv_thread = None
> +
> self.close_requested = False
>
> # The limit is arbitrary. We need to limit queue size to
> - # prevent it from eating memory up
> + # prevent it from eating memory up.
> + # Similarly, the timeout is also arbitrary.
> self.send_q = hub.Queue(16)
> + self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
> + self._send_q_timeout = 5
> +
> + self.echo_request_interval = CONF.echo_request_interval
> +        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
> + self.unreplied_echo_requests = []
>
> self.xid = random.randint(0, self.ofproto.MAX_XID)
> self.id = None # datapath_id is unknown yet
> self._ports = None
> self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
>         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
> - self.set_state(handler.HANDSHAKE_DISPATCHER)
> + self.set_state(HANDSHAKE_DISPATCHER)
>
> def _get_ports(self):
> - if (self.ofproto_parser is not None and
> - self.ofproto_parser.ofproto.OFP_VERSION >= 0x04):
> + if (self.ofproto_parser and
> + (self.ofproto_parser.ofproto.OFP_VERSION >= 0x04)):
> message = (
> 'Datapath#ports is kept for compatibility with the previous '
> 'openflow versions (< 1.3). '
> - 'This not be updated by EventOFPPortStatus message. '
> - 'If you want to be updated, you can use '
> + 'This is not updated by the EventOFPPortStatus message. '
> + 'If you want to be updated, you should use '
> '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
> )
> warnings.warn(message, stacklevel=2)
> @@ -163,64 +202,101 @@ class Datapath(ofproto_protocol.ProtocolDesc):
> # Low level socket handling layer
> @_deactivate
> def _recv_loop(self):
> - buf = bytearray()
> - required_len = ofproto_common.OFP_HEADER_SIZE
> -
> - count = 0
> - while True:
> - ret = ""
> -
> - try:
> - ret = self.socket.recv(required_len)
> - except SocketTimeout:
> - if not self.close_requested:
> - continue
> - except SocketError:
> - self.close_requested = True
> -
> - if (len(ret) == 0) or (self.close_requested):
> - self.socket.close()
> - break
> -
> - buf += ret
> - while len(buf) >= required_len:
> -            (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
> - required_len = msg_len
> - if len(buf) < required_len:
> - break
> -
> - msg = ofproto_parser.msg(
> - self, version, msg_type, msg_len, xid, buf[:msg_len])
> - # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> - if msg:
> - ev = ofp_event.ofp_msg_to_ev(msg)
> - self.ofp_brick.send_event_to_observers(ev, self.state)
> -
> -            dispatchers = lambda x: x.callers[ev.__class__].dispatchers
> - handlers = [handler for handler in
> - self.ofp_brick.get_handlers(ev) if
> - self.state in dispatchers(handler)]
> - for handler in handlers:
> - handler(ev)
> -
> - buf = buf[required_len:]
> - required_len = ofproto_common.OFP_HEADER_SIZE
> -
> - # We need to schedule other greenlets. Otherwise, ryu
> - # can't accept new switches or handle the existing
> - # switches. The limit is arbitrary. We need the better
> - # approach in the future.
> - count += 1
> - if count > 2048:
> - count = 0
> - hub.sleep(0)
> + try:
> + buf = bytearray()
> + required_len = ofproto_common.OFP_HEADER_SIZE
> +
> + count = 0
> + while True:
> + ret = ""
> +
> + try:
> + ret = self.socket.recv(required_len)
> + except SocketTimeout:
> + if not self.close_requested:
> + continue
> + except (AttributeError, EOFError, IOError):
> + self.close_requested = True
> +
> + if (len(ret) == 0) or (self.close_requested):
> + raise DatapathShutdown
> +
> + buf += ret
> + while len(buf) >= required_len:
> +                    (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
> + required_len = msg_len
> + if len(buf) < required_len:
> + break
> +
> + msg = ofproto_parser.msg(
> + self, version, msg_type, msg_len, xid, buf[:msg_len])
> + # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
> + if msg:
> + ev = ofp_event.ofp_msg_to_ev(msg)
> +                        exec_timeout = self.ofp_brick.handler_execution_timeout
> +                        self.ofp_brick.send_event_to_observers(ev, self.state)
> +
> +                        dispatchers = lambda x: x.callers[ev.__class__].dispatchers
> + handlers = [handler for handler in
> + self.ofp_brick.get_handlers(ev) if
> + self.state in dispatchers(handler)]
> + for handler in handlers:
> + # Defensively set a timeout for the handler.
> +                            # "This should never happen" are famous last words.
> + handler_timeout = hub.Timeout(exec_timeout)
> + try:
> + handler(ev)
> + finally:
> + handler_timeout.cancel()
> +
> + buf = buf[required_len:]
> + required_len = ofproto_common.OFP_HEADER_SIZE
> +
> + # We need to schedule other greenlets. Otherwise, ryu
> + # can't accept new switches or handle the existing
> + # switches. The limit is arbitrary. We need the better
> + # approach in the future.
> + count += 1
> + if count > 2048:
> + count = 0
> + hub.sleep(0)
> + except Exception as e:
> + if isinstance(e, DatapathShutdown):
> + # Normal termination sequence.
> + pass
> + else:
> + dpid_str = "UNKNOWN"
> + if self.id:
> + dpid_str = dpid_to_str(self.id)
> + LOG.exception('Exception occurred in recv_loop() '
> + 'for datapath [%s] connecting from [%s]. '
> + 'Shutting down datapath.',
> + dpid_str, self.address)
> + finally:
> + # Shutting down; get rid of the recv_thread reference.
> + self.recv_thread = None
> + # Although it may have been caught above, raise DatapathShutdown.
> + # This guarantees that close() will be called on the socket.
> + raise DatapathShutdown
>
> @_deactivate
> def _send_loop(self):
> try:
> - while self.send_active:
> - buf = self.send_q.get()
> - self.socket.sendall(buf)
> + while self.socket:
> + buf = None
> + try:
> + buf = self.send_q.get(timeout=self._send_q_timeout)
> + except hub.QueueEmpty:
> + pass
> + else:
> + self._send_q_sem.release()
> +
> + if buf is None:
> + continue
> + if self.socket:
> + self.socket.sendall(buf)
> + else:
> + raise DatapathShutdown
> except IOError as ioe:
>             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
>                       self.address, ioe.errno, ioe.strerror)
> @@ -228,17 +304,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
> q = self.send_q
> # first, clear self.send_q to prevent new references.
> self.send_q = None
> - # there might be threads currently blocking in send_q.put().
> - # unblock them by draining the queue.
> + # Next, get rid of the send_thread reference.
> + self.send_thread = None
> +        # Drain the send_q, releasing the associated semaphore for each entry.
> +        # This should release all threads waiting to acquire the semaphore.
> try:
> while q.get(block=False):
> - pass
> + self._send_q_sem.release()
> except hub.QueueEmpty:
> pass
>
> def send(self, buf):
> + acquired = False
> + dp_closed = True
> +
> if self.send_q:
> - self.send_q.put(buf)
> + acquired = self._send_q_sem.acquire()
> +
> + if self.send_q and acquired:
> + dp_closed = False
> + self.send_q.put(buf, block=False)
> + elif acquired:
> + self._send_q_sem.release()
> +
> + if dp_closed:
> +            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
> +                      self.address)
>
> def set_xid(self, msg):
> self.xid += 1
> @@ -254,18 +345,41 @@ class Datapath(ofproto_protocol.ProtocolDesc):
> # LOG.debug('send_msg %s', msg)
> self.send(msg.buf)
>
> + def _echo_request_loop(self):
> + if not self.max_unreplied_echo_requests:
> + return
> +
> + while (self.send_q and
> +               (len(self.unreplied_echo_requests) <= self.max_unreplied_echo_requests)):
> + echo_req = self.ofproto_parser.OFPEchoRequest(self)
> + self.unreplied_echo_requests.append(self.set_xid(echo_req))
> + self.send_msg(echo_req)
> + hub.sleep(self.echo_request_interval)
> +
> + if self.state is not DEAD_DISPATCHER:
> + self.close()
> +
> + def acknowledge_echo_reply(self, xid):
> + try:
> + self.unreplied_echo_requests.remove(xid)
> + except:
> + pass
> +
> def serve(self):
> - send_thr = hub.spawn(self._send_loop)
> + # self.recv_thread *MUST* be set before we spawn the send loop!
> + self.recv_thread = hub.getcurrent()
> + self.send_thread = hub.spawn(self._send_loop)
>
> # send hello message immediately
> hello = self.ofproto_parser.OFPHello(self)
> self.send_msg(hello)
>
> - try:
> - self._recv_loop()
> - finally:
> - hub.kill(send_thr)
> - hub.joinall([send_thr])
> + # Keeping track of the echo request loop is not really required.
> + # It will exit when the send loop exits, or when the number of
> + # unreplied echo requests exceeds the threshold.
> + echo_thread = hub.spawn(self._echo_request_loop)
> +
> + self._recv_loop()
>
> #
> # Utility methods for convenience
> diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
> index b3c63df..3bf19d2 100644
> --- a/ryu/controller/ofp_handler.py
> +++ b/ryu/controller/ofp_handler.py
> @@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
> echo_reply.data = msg.data
> datapath.send_msg(echo_reply)
>
> + @set_ev_handler(ofp_event.EventOFPEchoReply,
> +                    [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
> + def echo_reply_handler(self, ev):
> + msg = ev.msg
> + datapath = msg.datapath
> + datapath.acknowledge_echo_reply(msg.xid)
> +
> @set_ev_handler(ofp_event.EventOFPErrorMsg,
>                     [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
> def error_msg_handler(self, ev):
> diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
> index 5621147..4de0b4a 100644
> --- a/ryu/lib/hub.py
> +++ b/ryu/lib/hub.py
> @@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet':
> listen = eventlet.listen
> connect = eventlet.connect
>
> +
> + Queue = eventlet.queue.LightQueue
> + QueueEmpty = eventlet.queue.Empty
> + Semaphore = eventlet.semaphore.Semaphore
> + BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
> + TaskExit = greenlet.GreenletExit
> +
> +
> def spawn(*args, **kwargs):
> def _launch(func, *args, **kwargs):
> # mimic gevent's default raise_error=False behaviour
> # by not propergating an exception to the joiner.
> try:
> func(*args, **kwargs)
> - except greenlet.GreenletExit:
> + except TaskExit:
> pass
> except:
> # log uncaught exception.
> @@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet':
> # by not propergating an exception to the joiner.
> try:
> func(*args, **kwargs)
> - except greenlet.GreenletExit:
> + except TaskExit:
> pass
> except:
> # log uncaught exception.
> @@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet':
> # greenthread
> try:
> t.wait()
> - except greenlet.GreenletExit:
> + except TaskExit:
> pass
>
> - Queue = eventlet.queue.Queue
> - QueueEmpty = eventlet.queue.Empty
> - Semaphore = eventlet.semaphore.Semaphore
> - BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
>
> class StreamServer(object):
> def __init__(self, listen_info, handle=None, backlog=None,
> @@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet':
> sock, addr = self.server.accept()
> spawn(self.handle, sock, addr)
>
> +
> class LoggingWrapper(object):
> def write(self, message):
> LOG.info(message.rstrip('\n'))
>
> +
> class WSGIServer(StreamServer):
> def serve_forever(self):
> self.logger = LoggingWrapper()
> eventlet.wsgi.server(self.server, self.handle, self.logger)
>
> +
> WebSocketWSGI = websocket.WebSocketWSGI
>
> +
> Timeout = eventlet.timeout.Timeout
>
> +
> class Event(object):
> def __init__(self):
> self._ev = eventlet.event.Event()
> @@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet':
>
> def _broadcast(self):
> self._ev.send()
> -        # because eventlet Event doesn't allow mutiple send() on an event,
> +        # because eventlet Event doesn't allow multiple send() on an event,
> # re-create the underlying event.
> # note: _ev.reset() is obsolete.
> self._ev = eventlet.event.Event()
>
>
> Best,
> Victor
> --
> Victor J. Orlikowski <> vjo@[cs.]duke.edu
>