Minor change: since I am not using PriorityQueue anymore, it does not need to 
be present in hub.py.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..ed60857 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -158,6 +158,8 @@ class RyuApp(object):
         self.threads = []
         self.main_thread = None
         self.events = hub.Queue(128)
+        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
@@ -280,13 +282,25 @@ class RyuApp(object):
     def _event_loop(self):
         while self.is_active or not self.events.empty():
             ev, state = self.events.get()
+            self._events_sem.release()
             if ev == self._event_stop:
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
-                handler(ev)
+                try:
+                    handler(ev)
+                except hub.TaskExit:
+                    # Normal exit.
+                    # Propagate upwards, so we leave the event loop.
+                    raise
+                except Exception as e:
+                    LOG.exception('%s: Exception occurred during handler 
processing. '
+                                  'Backtrace from offending handler '
+                                  '[%s] servicing event [%s] follows.',
+                                  self.name, handler.__name__, 
ev.__class__.__name__)
 
     def _send_event(self, ev, state):
+        self._events_sem.acquire()
         self.events.put((ev, state))
 
     def send_event(self, name, ev, state=None):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..0a9019f 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error 
as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as 
SocketTimeout
 import warnings
 
 import ryu.base.app_manager
@@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match
 
-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, 
DEAD_DISPATCHER
 
 from ryu.lib.dpid import dpid_to_str
 
@@ -57,8 +57,19 @@ CONF.register_cli_opts([
                help='openflow ssl listen port'),
     cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
     cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
-    cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
-    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to 
await completion of socket operations.')
+    cfg.StrOpt('ca-certs', default=None, help='CA certificates')
+])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket 
operations.'),
+    cfg.FloatOpt('echo-request-interval',
+                 default=15.0,
+                 help='Time, in seconds, between sending echo requests to a 
datapath.'),
+    cfg.IntOpt('maximum-unreplied-echo-requests',
+               default=0,
+               min=0,
+               help='Maximum number of unreplied echo requests before datapath 
is disconnected.')
 ])
 
 
@@ -103,8 +114,16 @@ def _deactivate(method):
         try:
             method(self)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            try:
+                self.socket.shutdown(SHUT_RDWR)
+            except (EOFError, IOError):
+                pass
+
+            if self.recv_thread is None:
+                self.socket.close()
+
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate
 
 
@@ -117,19 +136,25 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address
 
-        self.send_active = True
+        self.recv_thread = None
+
         self.close_requested = False
 
         # The limit is arbitrary. We need to limit queue size to
         # prevent it from eating memory up
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+
+        self.echo_request_interval = CONF.echo_request_interval
+        self.max_unreplied_echo_requests = CONF.maximum_unreplied_echo_requests
+        self.unreplied_echo_requests = []
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.set_state(HANDSHAKE_DISPATCHER)
 
     def _get_ports(self):
         if (self.ofproto_parser is not None and
@@ -167,19 +192,17 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         required_len = ofproto_common.OFP_HEADER_SIZE
 
         count = 0
-        while True:
+        while not self.close_requested:
             ret = ""
 
             try:
                 ret = self.socket.recv(required_len)
             except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
+                continue
+            except (EOFError, IOError):
+                break
 
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
+            if len(ret) == 0:
                 break
 
             buf += ret
@@ -218,27 +241,46 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
+            while True:
                 buf = self.send_q.get()
+                self._send_q_sem.release()
                 self.socket.sendall(buf)
+        except SocketTimeout:
+            LOG.debug("Socket timed out while sending data to switch at 
address %s",
+                      self.address)
         except IOError as ioe:
-            LOG.debug("Socket error while sending data to switch at address 
%s: [%d] %s",
-                      self.address, ioe.errno, ioe.strerror)
+            # Convert ioe.errno to a string, just in case it was somehow set 
to None.
+            errno = "%s" % ioe.errno
+            LOG.debug("Socket error while sending data to switch at address 
%s: [%s] %s",
+                      self.address, errno, ioe.strerror)
         finally:
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Now, drain the send_q, releasing the associated semaphore for 
each entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass
 
     def send(self, buf):
+        acquired = False
+        dp_closed = True
+
         if self.send_q:
+            acquired = self._send_q_sem.acquire()
+
+        if self.send_q and acquired:
+            dp_closed = False
             self.send_q.put(buf)
+        elif acquired:
+            self._send_q_sem.release()
+
+        if dp_closed:
+            LOG.debug('Datapath in process of terminating; send() to %s 
discarded.',
+                      self.address)
 
     def set_xid(self, msg):
         self.xid += 1
@@ -254,18 +296,43 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         # LOG.debug('send_msg %s', msg)
         self.send(msg.buf)
 
+    def _echo_request_loop(self):
+        if not self.max_unreplied_echo_requests:
+            return
+
+        while (self.send_q and
+               (len(self.unreplied_echo_requests) <= 
self.max_unreplied_echo_requests)):
+            echo_req = self.ofproto_parser.OFPEchoRequest(self)
+            self.unreplied_echo_requests.append(self.set_xid(echo_req))
+            self.send_msg(echo_req)
+            hub.sleep(self.echo_request_interval)
+
+        if self.state is not DEAD_DISPATCHER:
+            self.close()
+
+    def acknowledge_echo_reply(self, xid):
+        try:
+            self.unreplied_echo_requests.remove(xid)
+        except:
+            pass
+
     def serve(self):
+        self.recv_thread = hub.getcurrent()
         send_thr = hub.spawn(self._send_loop)
 
         # send hello message immediately
         hello = self.ofproto_parser.OFPHello(self)
         self.send_msg(hello)
 
+        echo_thr = hub.spawn(self._echo_request_loop)
+
         try:
             self._recv_loop()
         finally:
             hub.kill(send_thr)
-            hub.joinall([send_thr])
+            hub.kill(echo_thr)
+            hub.joinall([send_thr, echo_thr])
+            self.recv_thread = None
 
     #
     # Utility methods for convenience
diff --git a/ryu/controller/ofp_handler.py b/ryu/controller/ofp_handler.py
index b3c63df..3bf19d2 100644
--- a/ryu/controller/ofp_handler.py
+++ b/ryu/controller/ofp_handler.py
@@ -238,6 +238,13 @@ class OFPHandler(ryu.base.app_manager.RyuApp):
         echo_reply.data = msg.data
         datapath.send_msg(echo_reply)
 
+    @set_ev_handler(ofp_event.EventOFPEchoReply,
+                    [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
+    def echo_reply_handler(self, ev):
+        msg = ev.msg
+        datapath = msg.datapath
+        datapath.acknowledge_echo_reply(msg.xid)
+
     @set_ev_handler(ofp_event.EventOFPErrorMsg,
                     [HANDSHAKE_DISPATCHER, CONFIG_DISPATCHER, MAIN_DISPATCHER])
     def error_msg_handler(self, ev):
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..b77465b 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -50,7 +50,7 @@ if HUB_TYPE == 'eventlet':
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -67,7 +67,7 @@ if HUB_TYPE == 'eventlet':
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -87,13 +87,14 @@ if HUB_TYPE == 'eventlet':
             # greenthread
             try:
                 t.wait()
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
 
-    Queue = eventlet.queue.Queue
+    Queue = eventlet.queue.LightQueue
     QueueEmpty = eventlet.queue.Empty
     Semaphore = eventlet.semaphore.Semaphore
     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
+    TaskExit = greenlet.GreenletExit
 
     class StreamServer(object):
         def __init__(self, listen_info, handle=None, backlog=None,


Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu

Attachment: stability.patch
Description: stability.patch





------------------------------------------------------------------------------
Site24x7 APM Insight: Get Deep Visibility into Application Performance
APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month
Monitor end-to-end web transactions and take corrective actions now
Troubleshoot faster and improve end-user experience. Signup Now!
http://pubads.g.doubleclick.net/gampad/clk?id=272487151&iu=/4140
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to