Correcting logical and stylistic errors caught by Iwamoto-San. Signed-off-by: Victor J. Orlikowski <[email protected]>
diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..b08c6c5 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance,
get_dependent_services
from ryu.controller.controller import Datapath
from ryu.controller import event
from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
from ryu.lib import hub
from ryu.ofproto import ofproto_protocol
@@ -157,12 +158,25 @@ class RyuApp(object):
self.observers = {} # ev_cls -> observer-name -> states:set
self.threads = []
self.main_thread = None
- self.events = hub.Queue(128)
+ self.events = hub.Queue(32)
+ self.sc_events = hub.Queue(32)
+ self.pi_events = hub.Queue(32)
+ self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+ self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
+ self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
+ self._event_get_timeout = 5
+
if hasattr(self.__class__, 'LOGGER_NAME'):
self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
else:
self.logger = logging.getLogger(self.name)
self.CONF = cfg.CONF
+ self.CONF.register_opts([
+ cfg.FloatOpt('handler-execution-timeout',
+ default=10.0,
+ help='Maximum time, in seconds, to permit handlers to
run.')
+ ])
+ self.handler_execution_timeout = self.CONF.handler_execution_timeout
# prevent accidental creation of instances of this class outside RyuApp
class _EventThreadStop(event.EventBase):
@@ -278,16 +292,68 @@ class RyuApp(object):
return req.reply_q.get()
def _event_loop(self):
- while self.is_active or not self.events.empty():
- ev, state = self.events.get()
- if ev == self._event_stop:
+ while (self.is_active or (not (self.events.empty() and
+ self.sc_events.empty() and
+ self.pi_events.empty()))):
+ # Process events according to priority.
+ # StatusChange highest, PacketIn lowest, all others in between.
+ ev = state = None
+ if not self.sc_events.empty():
+ try:
+ ev, state =
self.sc_events.get(timeout=self._event_get_timeout)
+ except hub.QueueEmpty:
+ pass
+ else:
+ self._sc_events_sem.release()
+
+ if (ev is None) and (not self.events.empty()):
+ try:
+ ev, state =
self.events.get(timeout=self._event_get_timeout)
+ except hub.QueueEmpty:
+ pass
+ else:
+ self._events_sem.release()
+
+ if ev is None:
+ try:
+ ev, state =
self.pi_events.get(timeout=self._event_get_timeout)
+ except hub.QueueEmpty:
+ pass
+ else:
+ self._pi_events_sem.release()
+
+ if (ev is None) or (ev == self._event_stop):
continue
handlers = self.get_handlers(ev, state)
for handler in handlers:
- handler(ev)
+ handler_execution_timeout =
hub.Timeout(self.handler_execution_timeout)
+ try:
+ handler(ev)
+ except hub.TaskExit:
+ # Normal exit.
+ # Propagate upwards, so we leave the event loop.
+ raise
+ except Exception as e:
+ if ((isinstance(e, hub.Timeout)) and (e is
handler_execution_timeout)):
+ LOG.error('%s: Handler exceeded maximum execution
time; terminated.', self.name)
+ else:
+ LOG.error('%s: Exception occurred during handler
processing.', self.name)
+ LOG.exception('%s: Backtrace from offending handler '
+ '[%s] servicing event [%s] follows.',
+ self.name, handler.__name__,
ev.__class__.__name__)
+ finally:
+ handler_execution_timeout.cancel()
def _send_event(self, ev, state):
- self.events.put((ev, state))
+ if isinstance(ev, EventOFPStateChange):
+ self._sc_events_sem.acquire()
+ self.sc_events.put((ev, state), block=False)
+ elif isinstance(ev, EventOFPPacketIn):
+ self._pi_events_sem.acquire()
+ self.pi_events.put((ev, state), block=False)
+ else:
+ self._events_sem.acquire()
+ self.events.put((ev, state), block=False)
def send_event(self, name, ev, state=None):
"""
@@ -336,7 +402,7 @@ class RyuApp(object):
class AppManager(object):
- # singletone
+ # singleton
_instance = None
@staticmethod
@@ -519,8 +585,14 @@ class AppManager(object):
app.stop()
self._close(app)
events = app.events
+ sc_events = app.sc_events
+ pi_events = app.pi_events
+ if not sc_events.empty():
+ app.logger.debug('%s state changes remains %d', app.name,
sc_events.qsize())
+ if not pi_events.empty():
+ app.logger.debug('%s PacketIn events remains %d', app.name,
pi_events.qsize())
if not events.empty():
- app.logger.debug('%s events remians %d', app.name, events.qsize())
+ app.logger.debug('%s events remains %d', app.name, events.qsize())
def close(self):
def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..934bb2f 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
import traceback
import random
import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error
as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as
SocketTimeout
import warnings
import ryu.base.app_manager
@@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import nx_match
-from ryu.controller import handler
from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER,
DEAD_DISPATCHER
+
+from ryu.exception import RyuException
from ryu.lib.dpid import dpid_to_str
@@ -57,11 +59,26 @@ CONF.register_cli_opts([
help='openflow ssl listen port'),
cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
- cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
- cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to
await completion of socket operations.')
+ cfg.StrOpt('ca-certs', default=None, help='CA certificates')
+])
+CONF.register_opts([
+ cfg.FloatOpt('socket-timeout',
+ default=5.0,
+ help='Time, in seconds, to await completion of socket
operations.'),
+ cfg.FloatOpt('echo-request-interval',
+ default=15.0,
+ help='Time, in seconds, between sending echo requests to a
datapath.'),
+ cfg.IntOpt('maximum-unreplied-echo-requests',
+ default=0,
+ min=0,
+ help='Maximum number of unreplied echo requests before datapath
is disconnected.')
])
+class DatapathShutdown(RyuException):
+ message = 'Datapath shutdown was requested'
+
+
class OpenFlowController(object):
def __init__(self):
super(OpenFlowController, self).__init__()
@@ -102,9 +119,22 @@ def _deactivate(method):
def deactivate(self):
try:
method(self)
+ except DatapathShutdown:
+ if self.socket:
+ self.socket.close()
+ self.socket = None
+ if self.recv_thread:
+ self.recv_thread.throw(DatapathShutdown)
+ if self.send_thread:
+ self.send_thread.throw(DatapathShutdown)
finally:
- self.send_active = False
- self.set_state(handler.DEAD_DISPATCHER)
+ if self.socket:
+ try:
+ self.socket.shutdown(SHUT_RDWR)
+ except (EOFError, IOError):
+ pass
+ if self.state is not DEAD_DISPATCHER:
+ self.set_state(DEAD_DISPATCHER)
return deactivate
@@ -117,28 +147,37 @@ class Datapath(ofproto_protocol.ProtocolDesc):
self.socket.settimeout(CONF.socket_timeout)
self.address = address
- self.send_active = True
+ self.send_thread = None
+ self.recv_thread = None
+
self.close_requested = False
# The limit is arbitrary. We need to limit queue size to
- # prevent it from eating memory up
+ # prevent it from eating memory up.
+ # Similarly, the timeout is also arbitrary.
self.send_q = hub.Queue(16)
+ self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+ self._send_q_timeout = 5
+
+ self.echo_request_interval = CONF.echo_request_interval
+ self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
+ self.unreplied_echo_requests = []
self.xid = random.randint(0, self.ofproto.MAX_XID)
self.id = None # datapath_id is unknown yet
self._ports = None
self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
- self.set_state(handler.HANDSHAKE_DISPATCHER)
+ self.set_state(HANDSHAKE_DISPATCHER)
def _get_ports(self):
- if (self.ofproto_parser is not None and
- self.ofproto_parser.ofproto.OFP_VERSION >= 0x04):
+ if (self.ofproto_parser and
+ (self.ofproto_parser.ofproto.OFP_VERSION >= 0x04)):
message = (
'Datapath#ports is kept for compatibility with the previous '
'openflow versions (< 1.3). '
- 'This not be updated by EventOFPPortStatus message. '
- 'If you want to be updated, you can use '
+ 'This is not updated by the EventOFPPortStatus message. '
+ 'If you want to be updated, you should use '
'\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
)
warnings.warn(message, stacklevel=2)
@@ -163,64 +202,101 @@ class Datapath(ofproto_protocol.ProtocolDesc):
# Low level socket handling layer
@_deactivate
def _recv_loop(self):
- buf = bytearray()
- required_len = ofproto_common.OFP_HEADER_SIZE
-
- count = 0
- while True:
- ret = ""
-
- try:
- ret = self.socket.recv(required_len)
- except SocketTimeout:
- if not self.close_requested:
- continue
- except SocketError:
- self.close_requested = True
-
- if (len(ret) == 0) or (self.close_requested):
- self.socket.close()
- break
-
- buf += ret
- while len(buf) >= required_len:
- (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
- required_len = msg_len
- if len(buf) < required_len:
- break
-
- msg = ofproto_parser.msg(
- self, version, msg_type, msg_len, xid, buf[:msg_len])
- # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
- if msg:
- ev = ofp_event.ofp_msg_to_ev(msg)
- self.ofp_brick.send_event_to_observers(ev, self.state)
-
- dispatchers = lambda x: x.callers[ev.__class__].dispatchers
- handlers = [handler for handler in
- self.ofp_brick.get_handlers(ev) if
- self.state in dispatchers(handler)]
- for handler in handlers:
- handler(ev)
-
- buf = buf[required_len:]
- required_len = ofproto_common.OFP_HEADER_SIZE
-
- # We need to schedule other greenlets. Otherwise, ryu
- # can't accept new switches or handle the existing
- # switches. The limit is arbitrary. We need the better
- # approach in the future.
- count += 1
- if count > 2048:
- count = 0
- hub.sleep(0)
+ try:
+ buf = bytearray()
+ required_len = ofproto_common.OFP_HEADER_SIZE
+
+ count = 0
+ while True:
+ ret = ""
+
+ try:
+ ret = self.socket.recv(required_len)
+ except SocketTimeout:
+ if not self.close_requested:
+ continue
+ except (AttributeError, EOFError, IOError):
+ self.close_requested = True
+
+ if (len(ret) == 0) or (self.close_requested):
+ raise DatapathShutdown
+
+ buf += ret
+ while len(buf) >= required_len:
+ (version, msg_type, msg_len, xid) =
ofproto_parser.header(buf)
+ required_len = msg_len
+ if len(buf) < required_len:
+ break
+
+ msg = ofproto_parser.msg(
+ self, version, msg_type, msg_len, xid, buf[:msg_len])
+ # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+ if msg:
+ ev = ofp_event.ofp_msg_to_ev(msg)
+ exec_timeout = CONF.handler_execution_timeout
+ self.ofp_brick.send_event_to_observers(ev, self.state)
+
+ dispatchers = lambda x:
x.callers[ev.__class__].dispatchers
+ handlers = [handler for handler in
+ self.ofp_brick.get_handlers(ev) if
+ self.state in dispatchers(handler)]
+ for handler in handlers:
+ # Defensively set a timeout for the handler.
+ # "This should never happen" are famous last words.
+ handler_timeout = hub.Timeout(exec_timeout)
+ try:
+ handler(ev)
+ finally:
+ handler_timeout.cancel()
+
+ buf = buf[required_len:]
+ required_len = ofproto_common.OFP_HEADER_SIZE
+
+ # We need to schedule other greenlets. Otherwise, ryu
+ # can't accept new switches or handle the existing
+ # switches. The limit is arbitrary. We need the better
+ # approach in the future.
+ count += 1
+ if count > 2048:
+ count = 0
+ hub.sleep(0)
+ except Exception as e:
+ if isinstance(e, DatapathShutdown):
+ # Normal termination sequence.
+ pass
+ else:
+ dpid_str = "UNKNOWN"
+ if self.id:
+ dpid_str = dpid_to_str(self.id)
+ LOG.exception('Exception occurred in recv_loop() '
+ 'for datapath [%s] connecting from [%s]. '
+ 'Shutting down datapath.',
+ dpid_str, self.address)
+ finally:
+ # Shutting down; get rid of the recv_thread reference.
+ self.recv_thread = None
+ # Although it may have been caught above, raise DatapathShutdown.
+ # This guarantees that close() will be called on the socket.
+ raise DatapathShutdown
@_deactivate
def _send_loop(self):
try:
- while self.send_active:
- buf = self.send_q.get()
- self.socket.sendall(buf)
+ while self.socket:
+ buf = None
+ try:
+ buf = self.send_q.get(timeout=self._send_q_timeout)
+ except hub.QueueEmpty:
+ pass
+ else:
+ self._send_q_sem.release()
+
+ if buf is None:
+ continue
+ if self.socket:
+ self.socket.sendall(buf)
+ else:
+ raise DatapathShutdown
except IOError as ioe:
LOG.debug("Socket error while sending data to switch at address
%s: [%d] %s",
self.address, ioe.errno, ioe.strerror)
@@ -228,17 +304,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
q = self.send_q
# first, clear self.send_q to prevent new references.
self.send_q = None
- # there might be threads currently blocking in send_q.put().
- # unblock them by draining the queue.
+ # Next, get rid of the send_thread reference.
+ self.send_thread = None
+ # Drain the send_q, releasing the associated semaphore for each
entry.
+ # This should release all threads waiting to acquire the semaphore.
try:
while q.get(block=False):
- pass
+ self._send_q_sem.release()
except hub.QueueEmpty:
pass
def send(self, buf):
+ acquired = False
+ dp_closed = True
+
if self.send_q:
- self.send_q.put(buf)
+ acquired = self._send_q_sem.acquire()
+
+ if self.send_q and acquired:
+ dp_closed = False
+ self.send_q.put(buf, block=False)
+ elif acquired:
+ self._send_q_sem.release()
+
+ if dp_closed:
+ LOG.debug('Datapath in process of terminating; send() to %s
discarded.',
+ self.address)
def set_xid(self, msg):
self.xid += 1
@@ -254,18 +345,41 @@ class Datapath(ofproto_protocol.ProtocolDesc):
# LOG.debug('send_msg %s', msg)
self.send(msg.buf)
+ def _echo_request_loop(self):
+ if not self.max_unreplied_echo_requests:
+ return
+
+ while (self.send_q and
+ (len(self.unreplied_echo_requests) <=
self.max_unreplied_echo_requests)):
+ echo_req = self.ofproto_parser.OFPEchoRequest(self)
+ self.unreplied_echo_requests.append(self.set_xid(echo_req))
+ self.send_msg(echo_req)
+ hub.sleep(self.echo_request_interval)
+
+ if self.state is not DEAD_DISPATCHER:
+ self.close()
+
+ def acknowledge_echo_reply(self, xid):
+ try:
+ self.unreplied_echo_requests.remove(xid)
+ except:
+ pass
+
def serve(self):
- send_thr = hub.spawn(self._send_loop)
+ # self.recv_thread *MUST* be set before we spawn the send loop!
+ self.recv_thread = hub.getcurrent()
+ self.send_thread = hub.spawn(self._send_loop)
# send hello message immediately
hello = self.ofproto_parser.OFPHello(self)
self.send_msg(hello)
- try:
- self._recv_loop()
- finally:
- hub.kill(send_thr)
- hub.joinall([send_thr])
+ # Keeping track of the echo request loop is not really required.
+ # It will exit when the send loop exits, or when the number of
+ # unreplied echo requests exceeds the threshold.
+ echo_thread = hub.spawn(self._echo_request_loop)
+
+ self._recv_loop()
#
# Utility methods for convenience
diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
index b3c63df..3bf19d2 100644
--- a/ryu/controller/ofp_handler.py
+++ b/ryu/controller/ofp_handler.py
@@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
echo_reply.data = msg.data
datapath.send_msg(echo_reply)
+ @set_ev_handler(ofp_event.EventOFPEchoReply,
+ [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
+ def echo_reply_handler(self, ev):
+ msg = ev.msg
+ datapath = msg.datapath
+ datapath.acknowledge_echo_reply(msg.xid)
+
@set_ev_handler(ofp_event.EventOFPErrorMsg,
[HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
def error_msg_handler(self, ev):
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..4de0b4a 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet':
listen = eventlet.listen
connect = eventlet.connect
+
+ Queue = eventlet.queue.LightQueue
+ QueueEmpty = eventlet.queue.Empty
+ Semaphore = eventlet.semaphore.Semaphore
+ BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
+ TaskExit = greenlet.GreenletExit
+
+
def spawn(*args, **kwargs):
def _launch(func, *args, **kwargs):
# mimic gevent's default raise_error=False behaviour
# by not propergating an exception to the joiner.
try:
func(*args, **kwargs)
- except greenlet.GreenletExit:
+ except TaskExit:
pass
except:
# log uncaught exception.
@@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet':
# by not propergating an exception to the joiner.
try:
func(*args, **kwargs)
- except greenlet.GreenletExit:
+ except TaskExit:
pass
except:
# log uncaught exception.
@@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet':
# greenthread
try:
t.wait()
- except greenlet.GreenletExit:
+ except TaskExit:
pass
- Queue = eventlet.queue.Queue
- QueueEmpty = eventlet.queue.Empty
- Semaphore = eventlet.semaphore.Semaphore
- BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
class StreamServer(object):
def __init__(self, listen_info, handle=None, backlog=None,
@@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet':
sock, addr = self.server.accept()
spawn(self.handle, sock, addr)
+
class LoggingWrapper(object):
def write(self, message):
LOG.info(message.rstrip('\n'))
+
class WSGIServer(StreamServer):
def serve_forever(self):
self.logger = LoggingWrapper()
eventlet.wsgi.server(self.server, self.handle, self.logger)
+
WebSocketWSGI = websocket.WebSocketWSGI
+
Timeout = eventlet.timeout.Timeout
+
class Event(object):
def __init__(self):
self._ev = eventlet.event.Event()
@@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet':
def _broadcast(self):
self._ev.send()
- # because eventlet Event doesn't allow mutiple send() on an event,
+ # because eventlet Event doesn't allow multiple send() on an event,
# re-create the underlying event.
# note: _ev.reset() is obsolete.
self._ev = eventlet.event.Event()
Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu
stability.patch
Description: stability.patch
------------------------------------------------------------------------------ Site24x7 APM Insight: Get Deep Visibility into Application Performance APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month Monitor end-to-end web transactions and take corrective actions now Troubleshoot faster and improve end-user experience. Signup Now! http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140
_______________________________________________ Ryu-devel mailing list [email protected] https://lists.sourceforge.net/lists/listinfo/ryu-devel
