Folks,

After an extensive testing interval, I am re-submitting my patches to improve 
the stability of Ryu in the face of buggy switches and applications.
These are extensive changes, so please let me explain them.

1) The event queue has been split into 3 queues.
EventOFPStateChange events could get starved by a large number of Packet-In 
events.
So could other event types.
So - to implement a priority queuing mechanism that did not suffer from 
priority inversion, I created three queues out of the original one.
The first queue is dedicated to EventOFPStateChange events.
The second queue is dedicated to all events that are *not* EventOFPPacketIn.
The third queue is dedicated to all EventOFPPacketIn events.

The three queues are serviced in the above-stated order.
EventOFPStateChange is generated internally, and must be serviced quickly.
Responses to echo requests, feature replies, port status changes, etc. must be 
serviced with next highest priority, since they may alter the nature of a 
response to a Packet-In.
Packet-Ins are serviced with lowest priority; they are (often) the most 
numerous events, and can crowd out servicing of other events.

Putting the events in a single queue could result in priority inversions due to 
queues blocking on being full.

2) Each queue in the event handler and the Datapath send/recv mechanism has an 
associated semaphore.
Eventlet synchronized queues have broken synchronization, and miss wakeups on 
put() and get() operations.
Eventlet semaphores do not suffer from this problem - so I have ensured that 
queue operations do not block for any significant amount of time (get() 
operations are permitted to sleep for a user-configurable duration, if the 
queue is empty) and that full queues are protected using their associated 
semaphore.

3) Events must now be serviced within a limited time frame, that is end-user 
configurable.
From: http://ryu-zhdoc.readthedocs.org/en/latest/ryu_app_api.html

"
Because the event handler is called in the context of the event processing 
thread, it should be careful for blocking. I.e. while an event handler is 
blocked, no further events for the Ryu application will be processed.
"

The assumption that Ryu application handlers cannot block, and must complete 
relatively quickly, has been an implicit part of the design for some time.
I have now made this explicit in the event loop; handlers must complete 
execution within a (user-configurable) time bound - or they will be terminated 
(usually - it is possible for a handler to put itself into a tight loop, and 
thereby cause the execution timeout handler to not be able to fire).

Presuming that the application has not been written with a tight loop that 
prevents termination of a handler, handlers must always make progress, or they 
will be terminated - and a backtrace will be printed for debugging. Thus, the 
event loop for an application will always be able to make forward progress.

4) Applications must now inform the core that they have "taken ownership" of a 
Datapath object.
Previously, Datapaths could “leak” if they did not make it through the OpenFlow 
handshake. I have addressed this by adding a timer that disconnects any 
Datapath that is not claimed by an application after a certain interval 
(currently statically defined at 20 seconds - but it could be made 
user-configurable).
An application that wishes to ensure that Datapaths are properly cleaned up 
(and do not disappear out from under them) should:
i) Have a structure keeping track of what Datapaths it “owns."
ii) When handling the an EventOFPSwitchFeatures in the CONFIG_DISPATCHER, or 
any event in the MAIN_DISPATCHER, the application should ensure that the 
datapath has been tracked in its internal record-keeping structure, and set the 
“assured” field of the Datapath object to “True”.
iii) When the application discovers duplicate records for a given Datapath 
(during handling of a new Datapath having the same DPID) in its internal 
structure, it should call the close() method of the old Datapath, after first 
recording the new Datapath in its internal record-keeping structure.

Applications that do not set the “assured” field of a Datapath will find that 
the Datapath will be closed underneath them by the Ryu core.
Applications using ryu.controller.dpset or ryu.topology.switches will find that 
this has been automatically handled for them.
Furthermore, another of the patches in this series ensures that all sample 
applications have been patched.


I have also included several minor typo fixes and cleanups, throughout each of 
these files.
Much of the convoluted logic in ryu.controller.controller exists to ensure that 
sockets are closed correctly; in particular, shutdown() must be called on the 
Datapath socket as a result of the Datapath close() operation, in order to 
ensure that recv() does not end up trying to read from a new socket with the 
same file descriptor number.

Signed-off-by: Victor J. Orlikowski <[email protected]>

diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..3ae6657 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -37,6 +37,7 @@ from ryu.controller.handler import register_instance, 
get_dependent_services
 from ryu.controller.controller import Datapath
 from ryu.controller import event
 from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.controller.ofp_event import EventOFPStateChange, EventOFPPacketIn
 from ryu.lib import hub
 from ryu.ofproto import ofproto_protocol
 
@@ -157,12 +158,25 @@ class RyuApp(object):
         self.observers = {}     # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(32)
+        self.sc_events = hub.Queue(32)
+        self.pi_events = hub.Queue(32)
+        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+        self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
+        self._pi_events_sem = hub.BoundedSemaphore(self.pi_events.maxsize)
+        self._event_get_timeout = 5
+
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
             self.logger = logging.getLogger(self.name)
         self.CONF = cfg.CONF
+        self.CONF.register_opts([
+            cfg.FloatOpt('handler-execution-timeout',
+                         default=10.0,
+                         help='Maximum time, in seconds, to permit handlers to 
run.')
+        ])
+        self.handler_execution_timeout = self.CONF.handler_execution_timeout
 
         # prevent accidental creation of instances of this class outside RyuApp
         class _EventThreadStop(event.EventBase):
@@ -279,15 +293,65 @@ class RyuApp(object):
 
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            # Process events according to priority.
+            # StatusChange highest, PacketIn lowest, all others in between.
+            ev = state = None
+            if self.sc_events.qsize():
+                try:
+                    ev, state = 
self.sc_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._sc_events_sem.release()
+
+            if (ev is None) and (self.events.qsize()):
+                try:
+                    ev, state = 
self.events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._events_sem.release()
+
+            if ev is None:
+                try:
+                    ev, state = 
self.pi_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._pi_events_sem.release()
+
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
-                handler(ev)
+                handler_execution_timeout = 
hub.Timeout(self.handler_execution_timeout)
+                try:
+                    handler(ev)
+                except hub.TaskExit:
+                    # Normal exit.
+                    # Propagate upwards, so we leave the event loop.
+                    raise
+                except Exception as e:
+                    if isinstance(e, hub.Timeout):
+                        LOG.error('%s: Handler exceeded maximum execution 
time; terminated.', self.name)
+                    else:
+                        LOG.error('%s: Exception occurred during handler 
processing.', self.name)
+                    LOG.exception('%s: Backtrace from offending handler '
+                                  '[%s] servicing event [%s] follows.',
+                                  self.name, handler.__name__, 
ev.__class__.__name__)
+                finally:
+                    handler_execution_timeout.cancel()
 
     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if isinstance(ev, EventOFPStateChange):
+            self._sc_events_sem.acquire()
+            self.sc_events.put((ev, state), block=False)
+        elif isinstance(ev, EventOFPPacketIn):
+            self._pi_events_sem.acquire()
+            self.pi_events.put((ev, state), block=False)
+        else:
+            self._events_sem.acquire()
+            self.events.put((ev, state), block=False)
 
     def send_event(self, name, ev, state=None):
         """
@@ -336,7 +400,7 @@ class RyuApp(object):
 
 
 class AppManager(object):
-    # singletone
+    # singleton
     _instance = None
 
     @staticmethod
@@ -519,8 +583,14 @@ class AppManager(object):
         app.stop()
         self._close(app)
         events = app.events
+        sc_events = app.sc_events
+        pi_events = app.pi_events
+        if not sc_events.empty():
+            app.logger.debug('%s state changes remains %d', app.name, 
sc_events.qsize())
+        if not pi_events.empty():
+            app.logger.debug('%s PacketIn events remains %d', app.name, 
pi_events.qsize())
         if not events.empty():
-            app.logger.debug('%s events remians %d', app.name, events.qsize())
+            app.logger.debug('%s events remains %d', app.name, events.qsize())
 
     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..0fd19f6 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error 
as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as 
SocketTimeout
 import warnings
 
 import ryu.base.app_manager
@@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match
 
-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, 
DEAD_DISPATCHER
+
+from ryu.exception import RyuException
 
 from ryu.lib.dpid import dpid_to_str
 
@@ -58,8 +60,16 @@ CONF.register_cli_opts([
     cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
     cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
     cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
-    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to 
await completion of socket operations.')
 ])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket 
operations.')
+])
+
+
+class DatapathShutdown(RyuException):
+    message = 'Datapath shutdown was requested'
 
 
 class OpenFlowController(object):
@@ -102,9 +112,22 @@ def _deactivate(method):
     def deactivate(self):
         try:
             method(self)
+        except DatapathShutdown:
+            if self.socket is not None:
+                self.socket.close()
+                self.socket = None
+            if self.recv_thread:
+                self.recv_thread.throw(DatapathShutdown)
+            if self.send_thread:
+                self.send_thread.throw(DatapathShutdown)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            if self.socket is not None:
+                try:
+                    self.socket.shutdown(SHUT_RDWR)
+                except (EOFError, IOError):
+                    pass
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate
 
 
@@ -117,19 +140,26 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address
 
-        self.send_active = True
+        self.send_thread = None
+        self.recv_thread = None
+
         self.close_requested = False
 
         # The limit is arbitrary. We need to limit queue size to
-        # prevent it from eating memory up
+        # prevent it from eating memory up.
+        # Similarly, the timeouts are also arbitrary.
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+        self._send_q_timeout = 5
+        self._unknown_datapath_timeout = 20
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.assured = False
+        self.set_state(HANDSHAKE_DISPATCHER)
 
     def _get_ports(self):
         if (self.ofproto_parser is not None and
@@ -137,8 +167,8 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             message = (
                 'Datapath#ports is kept for compatibility with the previous '
                 'openflow versions (< 1.3). '
-                'This not be updated by EventOFPPortStatus message. '
-                'If you want to be updated, you can use '
+                'This is not updated by the EventOFPPortStatus message. '
+                'If you want to be updated, you should use '
                 '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
             )
             warnings.warn(message, stacklevel=2)
@@ -163,64 +193,107 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     # Low level socket handling layer
     @_deactivate
     def _recv_loop(self):
-        buf = bytearray()
-        required_len = ofproto_common.OFP_HEADER_SIZE
-
-        count = 0
-        while True:
-            ret = ""
-
-            try:
-                ret = self.socket.recv(required_len)
-            except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
-
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
-                break
-
-            buf += ret
-            while len(buf) >= required_len:
-                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
-                required_len = msg_len
-                if len(buf) < required_len:
-                    break
-
-                msg = ofproto_parser.msg(
-                    self, version, msg_type, msg_len, xid, buf[:msg_len])
-                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
-                if msg:
-                    ev = ofp_event.ofp_msg_to_ev(msg)
-                    self.ofp_brick.send_event_to_observers(ev, self.state)
-
-                    dispatchers = lambda x: x.callers[ev.__class__].dispatchers
-                    handlers = [handler for handler in
-                                self.ofp_brick.get_handlers(ev) if
-                                self.state in dispatchers(handler)]
-                    for handler in handlers:
-                        handler(ev)
-
-                buf = buf[required_len:]
-                required_len = ofproto_common.OFP_HEADER_SIZE
-
-                # We need to schedule other greenlets. Otherwise, ryu
-                # can't accept new switches or handle the existing
-                # switches. The limit is arbitrary. We need the better
-                # approach in the future.
-                count += 1
-                if count > 2048:
-                    count = 0
-                    hub.sleep(0)
+        unknown_datapath_timeout = hub.Timeout(self._unknown_datapath_timeout, 
DatapathShutdown)
+        try:
+            buf = bytearray()
+            required_len = ofproto_common.OFP_HEADER_SIZE
+
+            count = 0
+            while True:
+                ret = ""
+
+                try:
+                    ret = self.socket.recv(required_len)
+                except SocketTimeout:
+                    if not self.close_requested:
+                        continue
+                except (AttributeError, EOFError, IOError):
+                    self.close_requested = True
+
+                if (len(ret) == 0) or (self.close_requested):
+                    raise DatapathShutdown
+
+                if self.assured:
+                    unknown_datapath_timeout.cancel()
+
+                buf += ret
+                while len(buf) >= required_len:
+                    (version, msg_type, msg_len, xid) = 
ofproto_parser.header(buf)
+                    required_len = msg_len
+                    if len(buf) < required_len:
+                        break
+
+                    msg = ofproto_parser.msg(
+                        self, version, msg_type, msg_len, xid, buf[:msg_len])
+                    # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+                    if msg:
+                        ev = ofp_event.ofp_msg_to_ev(msg)
+                        exec_timeout = self.ofp_brick.handler_execution_timeout
+                        self.ofp_brick.send_event_to_observers(ev, self.state)
+
+                        dispatchers = lambda x: 
x.callers[ev.__class__].dispatchers
+                        handlers = [handler for handler in
+                                    self.ofp_brick.get_handlers(ev) if
+                                    self.state in dispatchers(handler)]
+                        for handler in handlers:
+                            # Defensively set a timeout for the handler.
+                            # "This should never happen" are famous last words.
+                            handler_timeout = hub.Timeout(exec_timeout)
+                            try:
+                                handler(ev)
+                            finally:
+                                handler_timeout.cancel()
+
+                    buf = buf[required_len:]
+                    required_len = ofproto_common.OFP_HEADER_SIZE
+
+                    # We need to schedule other greenlets. Otherwise, ryu
+                    # can't accept new switches or handle the existing
+                    # switches. The limit is arbitrary. We need the better
+                    # approach in the future.
+                    count += 1
+                    if count > 2048:
+                        count = 0
+                        hub.sleep(0)
+        except Exception as e:
+            if isinstance(e, DatapathShutdown):
+                # Normal termination sequence.
+                pass
+            else:
+                dpid_str = "UNKNOWN"
+                if self.id is not None:
+                    dpid_str = dpid_to_str(self.id)
+                LOG.exception('Exception occurred in recv_loop() '
+                              'for datapath [%s] connecting from [%s]. '
+                              'Shutting down datapath.',
+                              dpid_str, self.address)
+        finally:
+            # Shutting down; get rid of the recv_thread reference.
+            self.recv_thread = None
+            # Cancel timeout, if we haven't already.
+            unknown_datapath_timeout.cancel()
+            # Although it may have been caught above, raise DatapathShutdown.
+            # This guarantees that close() will be called on the socket.
+            raise DatapathShutdown
 
     @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
-                buf = self.send_q.get()
-                self.socket.sendall(buf)
+            while self.socket is not None:
+                buf = None
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._send_q_sem.release()
+
+                if buf is None:
+                    continue
+                if self.socket:
+                    self.socket.sendall(buf)
+                else:
+                    raise DatapathShutdown
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address 
%s: [%d] %s",
                       self.address, ioe.errno, ioe.strerror)
@@ -228,17 +301,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Next, get rid of the send_thread reference.
+            self.send_thread = None
+            # Drain the send_q, releasing the associated semaphore for each 
entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass
 
     def send(self, buf):
+        acquired = False
+        dp_closed = True
+
         if self.send_q:
-            self.send_q.put(buf)
+            acquired = self._send_q_sem.acquire()
+
+        if self.send_q and acquired:
+            dp_closed = False
+            self.send_q.put(buf, block=False)
+        elif acquired:
+            self._send_q_sem.release()
+
+        if dp_closed:
+            LOG.debug('Datapath in process of terminating; send() to %s 
discarded.',
+                      self.address)
 
     def set_xid(self, msg):
         self.xid += 1
@@ -255,17 +343,15 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.send(msg.buf)
 
     def serve(self):
-        send_thr = hub.spawn(self._send_loop)
+        # self.recv_thread *MUST* be set before we spawn the send loop!
+        self.recv_thread = hub.getcurrent()
+        self.send_thread = hub.spawn(self._send_loop)
 
         # send hello message immediately
         hello = self.ofproto_parser.OFPHello(self)
         self.send_msg(hello)
 
-        try:
-            self._recv_loop()
-        finally:
-            hub.kill(send_thr)
-            hub.joinall([send_thr])
+        self._recv_loop()
 
     #
     # Utility methods for convenience
diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
index 2682777..a65d05b 100644
--- a/ryu/controller/dpset.py
+++ b/ryu/controller/dpset.py
@@ -56,6 +56,8 @@ class EventDP(EventDPBase):
 class EventDPReconnected(EventDPBase):
     def __init__(self, dp):
         super(EventDPReconnected, self).__init__(dp)
+        # port list, which should not change across reconnects
+        self.ports = []
 
 
 class EventPortBase(EventDPBase):
@@ -127,6 +129,7 @@ class DPSet(app_manager.RyuApp):
             self.logger.debug('DPSET: New datapath %s', dp)
             send_dp_reconnected = True
         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             ev = EventDP(dp, True)
@@ -138,6 +141,7 @@ class DPSet(app_manager.RyuApp):
             self.send_event_to_observers(ev)
         if send_dp_reconnected:
             ev = EventDPReconnected(dp)
+            ev.ports = self.port_state.get(dp.id, {}).values()
             self.send_event_to_observers(ev)
 
     def _unregister(self, dp):
@@ -163,7 +167,6 @@ class DPSet(app_manager.RyuApp):
         """
         This method returns the ryu.controller.controller.Datapath
         instance for the given Datapath ID.
-        Raises KeyError if no such a datapath connected to this controller.
         """
         return self.dps.get(dp_id)
 
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..4de0b4a 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -44,13 +44,21 @@ if HUB_TYPE == 'eventlet':
     listen = eventlet.listen
     connect = eventlet.connect
 
+
+    Queue = eventlet.queue.LightQueue
+    QueueEmpty = eventlet.queue.Empty
+    Semaphore = eventlet.semaphore.Semaphore
+    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
+    TaskExit = greenlet.GreenletExit
+
+
     def spawn(*args, **kwargs):
         def _launch(func, *args, **kwargs):
             # mimic gevent's default raise_error=False behaviour
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -67,7 +75,7 @@ if HUB_TYPE == 'eventlet':
             # by not propergating an exception to the joiner.
             try:
                 func(*args, **kwargs)
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
             except:
                 # log uncaught exception.
@@ -87,13 +95,9 @@ if HUB_TYPE == 'eventlet':
             # greenthread
             try:
                 t.wait()
-            except greenlet.GreenletExit:
+            except TaskExit:
                 pass
 
-    Queue = eventlet.queue.Queue
-    QueueEmpty = eventlet.queue.Empty
-    Semaphore = eventlet.semaphore.Semaphore
-    BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
 
     class StreamServer(object):
         def __init__(self, listen_info, handle=None, backlog=None,
@@ -120,19 +124,24 @@ if HUB_TYPE == 'eventlet':
                 sock, addr = self.server.accept()
                 spawn(self.handle, sock, addr)
 
+
     class LoggingWrapper(object):
         def write(self, message):
             LOG.info(message.rstrip('\n'))
 
+
     class WSGIServer(StreamServer):
         def serve_forever(self):
             self.logger = LoggingWrapper()
             eventlet.wsgi.server(self.server, self.handle, self.logger)
 
+
     WebSocketWSGI = websocket.WebSocketWSGI
 
+
     Timeout = eventlet.timeout.Timeout
 
+
     class Event(object):
         def __init__(self):
             self._ev = eventlet.event.Event()
@@ -144,7 +153,7 @@ if HUB_TYPE == 'eventlet':
 
         def _broadcast(self):
             self._ev.send()
-            # because eventlet Event doesn't allow mutiple send() on an event,
+            # because eventlet Event doesn't allow multiple send() on an event,
             # re-create the underlying event.
             # note: _ev.reset() is obsolete.
             self._ev = eventlet.event.Event()
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
index c54152b..e7b682c 100644
--- a/ryu/topology/event.py
+++ b/ryu/topology/event.py
@@ -41,6 +41,11 @@ class EventSwitchLeave(EventSwitchBase):
         super(EventSwitchLeave, self).__init__(switch)
 
 
+class EventSwitchReconnected(EventSwitchBase):
+    def __init__(self, switch):
+        super(EventSwitchReconnected, self).__init__(switch)
+
+
 class EventPortBase(event.EventBase):
     def __init__(self, port):
         super(EventPortBase, self).__init__()
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
index 9622a90..a06f83b 100644
--- a/ryu/topology/switches.py
+++ b/ryu/topology/switches.py
@@ -495,6 +495,7 @@ class Switches(app_manager.RyuApp):
     OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION,
                     ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION]
     _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
+               event.EventSwitchReconnected,
                event.EventPortAdd, event.EventPortDelete,
                event.EventPortModify,
                event.EventLinkAdd, event.EventLinkDelete,
@@ -540,6 +541,7 @@ class Switches(app_manager.RyuApp):
         assert dp.id is not None
 
         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             for port in dp.ports.values():
@@ -547,8 +549,9 @@ class Switches(app_manager.RyuApp):
 
     def _unregister(self, dp):
         if dp.id in self.dps:
-            del self.dps[dp.id]
-            del self.port_state[dp.id]
+            if (self.dps[dp.id] == dp):
+                del self.dps[dp.id]
+                del self.port_state[dp.id]
 
     def _get_switch(self, dpid):
         if dpid in self.dps:
@@ -602,16 +605,18 @@ class Switches(app_manager.RyuApp):
         if ev.state == MAIN_DISPATCHER:
             dp_multiple_conns = False
             if dp.id in self.dps:
-                LOG.warning('multiple connections from %s', dpid_to_str(dp.id))
+                LOG.warning('Multiple connections from %s', dpid_to_str(dp.id))
                 dp_multiple_conns = True
+                (self.dps[dp.id]).close()
 
             self._register(dp)
             switch = self._get_switch(dp.id)
             LOG.debug('register %s', switch)
 
-            # Do not send event while dp has multiple connections.
             if not dp_multiple_conns:
                 self.send_event_to_observers(event.EventSwitchEnter(switch))
+            else:
+                
self.send_event_to_observers(event.EventSwitchReconnected(switch))
 
             if not self.link_discovery:
                 return
@@ -665,19 +670,23 @@ class Switches(app_manager.RyuApp):
             # dp.id is None when datapath dies before handshake
             if dp.id is None:
                 return
+
             switch = self._get_switch(dp.id)
-            self._unregister(dp)
-            LOG.debug('unregister %s', switch)
-            self.send_event_to_observers(event.EventSwitchLeave(switch))
+            if switch:
+                if switch.dp is dp:
+                    self._unregister(dp)
+                    LOG.debug('unregister %s', switch)
 
-            if not self.link_discovery:
-                return
+                    
self.send_event_to_observers(event.EventSwitchLeave(switch))
 
-            for port in switch.ports:
-                if not port.is_reserved():
-                    self.ports.del_port(port)
-                    self._link_down(port)
-            self.lldp_event.set()
+                    if not self.link_discovery:
+                        return
+
+                    for port in switch.ports:
+                        if not port.is_reserved():
+                            self.ports.del_port(port)
+                            self._link_down(port)
+                    self.lldp_event.set()
 
     @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
     def port_status_handler(self, ev):
@@ -762,7 +771,7 @@ class Switches(app_manager.RyuApp):
         try:
             src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
         except LLDPPacket.LLDPUnknownFormat as e:
-            # This handler can receive all the packtes which can be
+            # This handler can receive all the packets which can be
             # not-LLDP packet. Ignore it silently
             return
 


Best,
Victor
--
Victor J. Orlikowski <> vjo@[cs.]duke.edu

------------------------------------------------------------------------------
Site24x7 APM Insight: Get Deep Visibility into Application Performance
APM + Mobile APM + RUM: Monitor 3 App instances at just $35/Month
Monitor end-to-end web transactions and take corrective actions now
Troubleshoot faster and improve end-user experience. Signup Now!
http://pubads.g.doubleclick.net/gampad/clk?id=267308311&iu=/4140
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to