diff --git a/ryu/app/cbench.py b/ryu/app/cbench.py
index aed83ab..c8056b1 100644
--- a/ryu/app/cbench.py
+++ b/ryu/app/cbench.py
@@ -31,6 +31,7 @@ class Cbench(app_manager.RyuApp):
 
     def __init__(self, *args, **kwargs):
         super(Cbench, self).__init__(*args, **kwargs)
+        self.dpid_to_dp = {}
 
     @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
     def packet_in_handler(self, ev):
@@ -38,6 +39,14 @@ class Cbench(app_manager.RyuApp):
         datapath = msg.datapath
         ofproto = datapath.ofproto
 
+        dpid = datapath.id
+        id2dp_entry = self.dpid_to_dp.get(dpid)
+        if ((id2dp_entry is not None) and
+            (id2dp_entry is not datapath)):
+            id2dp_datapath.close()
+        dpid_to_dp[dpid] = datapath
+        datapath.assured = True
+
         match = datapath.ofproto_parser.OFPMatch(
             ofproto_v1_0.OFPFW_ALL, 0, 0, 0,
             0, 0, 0, 0, 0, 0, 0, 0, 0)
diff --git a/ryu/app/ofctl/service.py b/ryu/app/ofctl/service.py
index 0ca00c7..edde2b1 100644
--- a/ryu/app/ofctl/service.py
+++ b/ryu/app/ofctl/service.py
@@ -79,6 +79,9 @@ class OfctlService(app_manager.RyuApp):
         self.logger.debug('add dpid %s datapath %s new_info %s old_info %s',
                           id, datapath, new_info, old_info)
         self._switches[id] = new_info
+        datapath.assured = True
+        if old_info:
+            old_info.datapath.close()
 
     @set_ev_cls(ofp_event.EventOFPStateChange, DEAD_DISPATCHER)
     def _handle_dead(self, ev):
diff --git a/ryu/app/simple_switch.py b/ryu/app/simple_switch.py
index 862b830..1c40e6b 100644
--- a/ryu/app/simple_switch.py
+++ b/ryu/app/simple_switch.py
@@ -65,7 +65,13 @@ class SimpleSwitch(app_manager.RyuApp):
         src = eth.src
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s", dpid, src, dst, msg.in_port)
 
diff --git a/ryu/app/simple_switch_12.py b/ryu/app/simple_switch_12.py
index 6895b07..e434b2c 100644
--- a/ryu/app/simple_switch_12.py
+++ b/ryu/app/simple_switch_12.py
@@ -64,7 +64,13 @@ class SimpleSwitch12(app_manager.RyuApp):
         src = eth.src
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)
 
diff --git a/ryu/app/simple_switch_13.py b/ryu/app/simple_switch_13.py
index 3e7c598..ffc8ac8 100644
--- a/ryu/app/simple_switch_13.py
+++ b/ryu/app/simple_switch_13.py
@@ -86,7 +86,13 @@ class SimpleSwitch13(app_manager.RyuApp):
         src = eth.src
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)
 
diff --git a/ryu/app/simple_switch_14.py b/ryu/app/simple_switch_14.py
index d3151bc..be9beef 100644
--- a/ryu/app/simple_switch_14.py
+++ b/ryu/app/simple_switch_14.py
@@ -77,7 +77,13 @@ class SimpleSwitch14(app_manager.RyuApp):
         src = eth.src
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)
 
diff --git a/ryu/app/simple_switch_igmp.py b/ryu/app/simple_switch_igmp.py
index b1b014f..e5656c0 100644
--- a/ryu/app/simple_switch_igmp.py
+++ b/ryu/app/simple_switch_igmp.py
@@ -67,7 +67,13 @@ class SimpleSwitchIgmp(app_manager.RyuApp):
         dst = addrconv.mac.bin_to_text(dst_)
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s",
                          dpid, src, dst, msg.in_port)
diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py
index 3774163..5d27cf6 100644
--- a/ryu/app/simple_switch_lacp.py
+++ b/ryu/app/simple_switch_lacp.py
@@ -77,7 +77,13 @@ class SimpleSwitchLacp(app_manager.RyuApp):
         dst = addrconv.mac.bin_to_text(dst_)
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.info("packet in %s %s %s %s",
                          dpid, src, dst, msg.in_port)
diff --git a/ryu/app/simple_switch_snort.py b/ryu/app/simple_switch_snort.py
index 553a8bb..56a2ea6 100644
--- a/ryu/app/simple_switch_snort.py
+++ b/ryu/app/simple_switch_snort.py
@@ -117,7 +117,13 @@ class SimpleSwitchSnort(app_manager.RyuApp):
         src = eth.src
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         # self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)
 
diff --git a/ryu/app/simple_switch_stp.py b/ryu/app/simple_switch_stp.py
index a86c8a5..001bd44 100644
--- a/ryu/app/simple_switch_stp.py
+++ b/ryu/app/simple_switch_stp.py
@@ -85,7 +85,13 @@ class SimpleSwitchStp(app_manager.RyuApp):
         dst, src, _eth_type = struct.unpack_from('!6s6sH', buffer(msg.data), 0)
 
         dpid = datapath.id
-        self.mac_to_port.setdefault(dpid, {})
+        m2p_entry = self.mac_to_port.setdefault(dpid, {})
+        m2p_datapath = m2p_entry.get('datapath')
+        if ((m2p_datapath is not None) and
+            (m2p_datapath is not datapath)):
+            m2p_datapath.close()
+        m2p_entry['datapath'] = datapath
+        datapath.assured = True
 
         self.logger.debug("packet in %s %s %s %s",
                           dpid, haddr_to_str(src), haddr_to_str(dst),
diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 3d5d895..60cf88d 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -29,6 +29,7 @@ import logging
 import sys
 import os
 import gc
+import traceback
 
 from ryu import cfg
 from ryu import utils
@@ -37,6 +38,7 @@ from ryu.controller.handler import register_instance, get_dependent_services
 from ryu.controller.controller import Datapath
 from ryu.controller import event
 from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.controller.ofp_event import EventOFPStateChange
 from ryu.lib import hub
 from ryu.ofproto import ofproto_protocol
 
@@ -157,12 +159,23 @@ class RyuApp(object):
         self.observers = {}     # ev_cls -> observer-name -> states:set
         self.threads = []
         self.main_thread = None
-        self.events = hub.Queue(128)
+        self.events = hub.Queue(64)
+        self.sc_events = hub.Queue(32)
+        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
+        self._sc_events_sem = hub.BoundedSemaphore(self.sc_events.maxsize)
+        self._event_get_timeout = 5
+
         if hasattr(self.__class__, 'LOGGER_NAME'):
             self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
         else:
             self.logger = logging.getLogger(self.name)
         self.CONF = cfg.CONF
+        self.CONF.register_opts([
+            cfg.FloatOpt('handler-execution-timeout',
+                         default=10.0,
+                         help='Maximum time, in seconds, to permit handlers to run.')
+        ])
+        self._handler_execution_timeout = self.CONF.handler_execution_timeout
 
         # prevent accidental creation of instances of this class outside RyuApp
         class _EventThreadStop(event.EventBase):
@@ -279,15 +292,45 @@ class RyuApp(object):
 
     def _event_loop(self):
         while self.is_active or not self.events.empty():
-            ev, state = self.events.get()
-            if ev == self._event_stop:
+            ev = state = None
+            if self.sc_events.qsize():
+                try:
+                    ev, state = self.sc_events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._sc_events_sem.release()
+
+            if ev is None:
+                try:
+                    ev, state = self.events.get(timeout=self._event_get_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._events_sem.release()
+
+            if (ev is None) or (ev == self._event_stop):
                 continue
             handlers = self.get_handlers(ev, state)
             for handler in handlers:
-                handler(ev)
+                handler_execution_timeout = hub.Timeout(self._handler_execution_timeout)
+                try:
+                    handler(ev)
+                except hub.Timeout:
+                    LOG.error('%s: Handler exceeded maximum execution time; terminated.', self.name)
+                    LOG.error('%s: Backtrace from offending handler [%s] servicing event [%s] follows.',
+                              self.name, handler.__name__, ev.__class__.__name__)
+                    LOG.error('%s', traceback.format_exc())
+                finally:
+                    handler_execution_timeout.cancel()
 
     def _send_event(self, ev, state):
-        self.events.put((ev, state))
+        if isinstance(ev, EventOFPStateChange):
+            self._sc_events_sem.acquire()
+            self.sc_events.put((ev, state), block=False)
+        else:
+            self._events_sem.acquire()
+            self.events.put((ev, state), block=False)
 
     def send_event(self, name, ev, state=None):
         """
@@ -336,7 +379,7 @@ class RyuApp(object):
 
 
 class AppManager(object):
-    # singletone
+    # singleton
     _instance = None
 
     @staticmethod
@@ -519,8 +562,11 @@ class AppManager(object):
         app.stop()
         self._close(app)
         events = app.events
+        sc_events = app.sc_events
         if not events.empty():
-            app.logger.debug('%s events remians %d', app.name, events.qsize())
+            app.logger.debug('%s events remains %d', app.name, events.qsize())
+        if not sc_events.empty():
+            app.logger.debug('%s state changes remains %d', app.name, sc_events.qsize())
 
     def close(self):
         def close_all(close_dict):
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index 25b8776..5468871 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
 import traceback
 import random
 import ssl
-from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
+from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
 import warnings
 
 import ryu.base.app_manager
@@ -41,8 +41,10 @@ from ryu.ofproto import ofproto_protocol
 from ryu.ofproto import ofproto_v1_0
 from ryu.ofproto import nx_match
 
-from ryu.controller import handler
 from ryu.controller import ofp_event
+from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
+
+from ryu.exception import RyuException
 
 from ryu.lib.dpid import dpid_to_str
 
@@ -58,8 +60,16 @@ CONF.register_cli_opts([
     cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
     cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
     cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
-    cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.')
 ])
+CONF.register_opts([
+    cfg.FloatOpt('socket-timeout',
+                 default=5.0,
+                 help='Time, in seconds, to await completion of socket operations.')
+])
+
+
+class DatapathShutdown(RyuException):
+    message = 'Datapath shutdown was requested'
 
 
 class OpenFlowController(object):
@@ -102,9 +112,22 @@ def _deactivate(method):
     def deactivate(self):
         try:
             method(self)
+        except DatapathShutdown:
+            if self.socket is not None:
+                self.socket.close()
+                self.socket = None
+            if self.recv_thread:
+                self.recv_thread.throw(DatapathShutdown)
+            if self.send_thread:
+                self.send_thread.throw(DatapathShutdown)
         finally:
-            self.send_active = False
-            self.set_state(handler.DEAD_DISPATCHER)
+            if self.socket is not None:
+                try:
+                    self.socket.shutdown(SHUT_RDWR)
+                except (EOFError, IOError):
+                    pass
+            if self.state is not DEAD_DISPATCHER:
+                self.set_state(DEAD_DISPATCHER)
     return deactivate
 
 
@@ -117,19 +140,26 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.socket.settimeout(CONF.socket_timeout)
         self.address = address
 
-        self.send_active = True
+        self.send_thread = None
+        self.recv_thread = None
+
         self.close_requested = False
 
         # The limit is arbitrary. We need to limit queue size to
-        # prevent it from eating memory up
+        # prevent it from eating memory up.
+        # Similarly, the timeouts are also arbitrary.
         self.send_q = hub.Queue(16)
+        self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
+        self._send_q_timeout = 5
+        self._unknown_datapath_timeout = 20
 
         self.xid = random.randint(0, self.ofproto.MAX_XID)
         self.id = None  # datapath_id is unknown yet
         self._ports = None
         self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
         self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
-        self.set_state(handler.HANDSHAKE_DISPATCHER)
+        self.assured = False
+        self.set_state(HANDSHAKE_DISPATCHER)
 
     def _get_ports(self):
         if (self.ofproto_parser is not None and
@@ -137,8 +167,8 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             message = (
                 'Datapath#ports is kept for compatibility with the previous '
                 'openflow versions (< 1.3). '
-                'This not be updated by EventOFPPortStatus message. '
-                'If you want to be updated, you can use '
+                'This is not updated by the EventOFPPortStatus message. '
+                'If you want to be updated, you should use '
                 '\'ryu.controller.dpset\' or \'ryu.topology.switches\'.'
             )
             warnings.warn(message, stacklevel=2)
@@ -163,64 +193,85 @@ class Datapath(ofproto_protocol.ProtocolDesc):
     # Low level socket handling layer
     @_deactivate
     def _recv_loop(self):
-        buf = bytearray()
-        required_len = ofproto_common.OFP_HEADER_SIZE
-
-        count = 0
-        while True:
-            ret = ""
-
-            try:
-                ret = self.socket.recv(required_len)
-            except SocketTimeout:
-                if not self.close_requested:
-                    continue
-            except SocketError:
-                self.close_requested = True
-
-            if (len(ret) == 0) or (self.close_requested):
-                self.socket.close()
-                break
-
-            buf += ret
-            while len(buf) >= required_len:
-                (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
-                required_len = msg_len
-                if len(buf) < required_len:
-                    break
-
-                msg = ofproto_parser.msg(
-                    self, version, msg_type, msg_len, xid, buf[:msg_len])
-                # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
-                if msg:
-                    ev = ofp_event.ofp_msg_to_ev(msg)
-                    self.ofp_brick.send_event_to_observers(ev, self.state)
-
-                    dispatchers = lambda x: x.callers[ev.__class__].dispatchers
-                    handlers = [handler for handler in
-                                self.ofp_brick.get_handlers(ev) if
-                                self.state in dispatchers(handler)]
-                    for handler in handlers:
-                        handler(ev)
-
-                buf = buf[required_len:]
-                required_len = ofproto_common.OFP_HEADER_SIZE
-
-                # We need to schedule other greenlets. Otherwise, ryu
-                # can't accept new switches or handle the existing
-                # switches. The limit is arbitrary. We need the better
-                # approach in the future.
-                count += 1
-                if count > 2048:
-                    count = 0
-                    hub.sleep(0)
+        unknown_datapath_timeout = hub.Timeout(self._unknown_datapath_timeout, DatapathShutdown)
+        try:
+            buf = bytearray()
+            required_len = ofproto_common.OFP_HEADER_SIZE
+
+            count = 0
+            while True:
+                ret = ""
+
+                try:
+                    ret = self.socket.recv(required_len)
+                except SocketTimeout:
+                    if not self.close_requested:
+                        continue
+                except (AttributeError, EOFError, IOError):
+                    self.close_requested = True
+
+                if (len(ret) == 0) or (self.close_requested):
+                    raise DatapathShutdown
+
+                if self.assured:
+                    unknown_datapath_timeout.cancel()
+
+                buf += ret
+                while len(buf) >= required_len:
+                    (version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
+                    required_len = msg_len
+                    if len(buf) < required_len:
+                        break
+
+                    msg = ofproto_parser.msg(
+                        self, version, msg_type, msg_len, xid, buf[:msg_len])
+                    # LOG.debug('queue msg %s cls %s', msg, msg.__class__)
+                    if msg:
+                        ev = ofp_event.ofp_msg_to_ev(msg)
+                        self.ofp_brick.send_event_to_observers(ev, self.state)
+
+                        dispatchers = lambda x: x.callers[ev.__class__].dispatchers
+                        handlers = [handler for handler in
+                                    self.ofp_brick.get_handlers(ev) if
+                                    self.state in dispatchers(handler)]
+                        for handler in handlers:
+                            handler(ev)
+
+                    buf = buf[required_len:]
+                    required_len = ofproto_common.OFP_HEADER_SIZE
+
+                    # We need to schedule other greenlets. Otherwise, ryu
+                    # can't accept new switches or handle the existing
+                    # switches. The limit is arbitrary. We need the better
+                    # approach in the future.
+                    count += 1
+                    if count > 2048:
+                        count = 0
+                        hub.sleep(0)
+        finally:
+            # Get rid of the recv_thread reference.
+            self.recv_thread = None
+            # Cancel timeout, if we haven't already.
+            unknown_datapath_timeout.cancel()
 
     @_deactivate
     def _send_loop(self):
         try:
-            while self.send_active:
-                buf = self.send_q.get()
-                self.socket.sendall(buf)
+            while self.socket is not None:
+                buf = None
+                try:
+                    buf = self.send_q.get(timeout=self._send_q_timeout)
+                except hub.QueueEmpty:
+                    pass
+                else:
+                    self._send_q_sem.release()
+
+                if buf is None:
+                    continue
+                if self.socket:
+                    self.socket.sendall(buf)
+                else:
+                    raise DatapathShutdown
         except IOError as ioe:
             LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
                       self.address, ioe.errno, ioe.strerror)
@@ -228,17 +279,32 @@ class Datapath(ofproto_protocol.ProtocolDesc):
             q = self.send_q
             # first, clear self.send_q to prevent new references.
             self.send_q = None
-            # there might be threads currently blocking in send_q.put().
-            # unblock them by draining the queue.
+            # Next, get rid of the send_thread reference.
+            self.send_thread = None
+            # Drain the send_q, releasing the associated semaphore for each entry.
+            # This should release all threads waiting to acquire the semaphore.
             try:
                 while q.get(block=False):
-                    pass
+                    self._send_q_sem.release()
             except hub.QueueEmpty:
                 pass
 
     def send(self, buf):
+        acquired = False
+        dp_closed = True
+
         if self.send_q:
-            self.send_q.put(buf)
+            acquired = self._send_q_sem.acquire()
+
+        if self.send_q and acquired:
+            dp_closed = False
+            self.send_q.put(buf, block=False)
+        elif acquired:
+            self._send_q_sem.release()
+
+        if dp_closed:
+            LOG.debug('Datapath in process of terminating; send() to %s discarded.',
+                      self.address)
 
     def set_xid(self, msg):
         self.xid += 1
@@ -255,17 +321,15 @@ class Datapath(ofproto_protocol.ProtocolDesc):
         self.send(msg.buf)
 
     def serve(self):
-        send_thr = hub.spawn(self._send_loop)
+        # self.recv_thread *MUST* be set before we spawn the send loop!
+        self.recv_thread = hub.getcurrent()
+        self.send_thread = hub.spawn(self._send_loop)
 
         # send hello message immediately
         hello = self.ofproto_parser.OFPHello(self)
         self.send_msg(hello)
 
-        try:
-            self._recv_loop()
-        finally:
-            hub.kill(send_thr)
-            hub.joinall([send_thr])
+        self._recv_loop()
 
     #
     # Utility methods for convenience
diff --git a/ryu/controller/dpset.py b/ryu/controller/dpset.py
index 2682777..974fe5b 100644
--- a/ryu/controller/dpset.py
+++ b/ryu/controller/dpset.py
@@ -127,6 +127,7 @@ class DPSet(app_manager.RyuApp):
             self.logger.debug('DPSET: New datapath %s', dp)
             send_dp_reconnected = True
         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             ev = EventDP(dp, True)
@@ -163,7 +164,6 @@ class DPSet(app_manager.RyuApp):
         """
         This method returns the ryu.controller.controller.Datapath
         instance for the given Datapath ID.
-        Raises KeyError if no such a datapath connected to this controller.
         """
         return self.dps.get(dp_id)
 
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 5621147..e982e9f 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -90,11 +90,13 @@ if HUB_TYPE == 'eventlet':
             except greenlet.GreenletExit:
                 pass
 
-    Queue = eventlet.queue.Queue
+
+    Queue = eventlet.queue.LightQueue
     QueueEmpty = eventlet.queue.Empty
     Semaphore = eventlet.semaphore.Semaphore
     BoundedSemaphore = eventlet.semaphore.BoundedSemaphore
 
+
     class StreamServer(object):
         def __init__(self, listen_info, handle=None, backlog=None,
                      spawn='default', **ssl_args):
@@ -120,19 +122,24 @@ if HUB_TYPE == 'eventlet':
                 sock, addr = self.server.accept()
                 spawn(self.handle, sock, addr)
 
+
     class LoggingWrapper(object):
         def write(self, message):
             LOG.info(message.rstrip('\n'))
 
+
     class WSGIServer(StreamServer):
         def serve_forever(self):
             self.logger = LoggingWrapper()
             eventlet.wsgi.server(self.server, self.handle, self.logger)
 
+
     WebSocketWSGI = websocket.WebSocketWSGI
 
+
     Timeout = eventlet.timeout.Timeout
 
+
     class Event(object):
         def __init__(self):
             self._ev = eventlet.event.Event()
@@ -144,7 +151,7 @@ if HUB_TYPE == 'eventlet':
 
         def _broadcast(self):
             self._ev.send()
-            # because eventlet Event doesn't allow mutiple send() on an event,
+            # because eventlet Event doesn't allow multiple send() on an event,
             # re-create the underlying event.
             # note: _ev.reset() is obsolete.
             self._ev = eventlet.event.Event()
diff --git a/ryu/services/protocols/vrrp/monitor_openflow.py b/ryu/services/protocols/vrrp/monitor_openflow.py
index 32e691e..e1e4c8d 100644
--- a/ryu/services/protocols/vrrp/monitor_openflow.py
+++ b/ryu/services/protocols/vrrp/monitor_openflow.py
@@ -59,6 +59,7 @@ class VRRPInterfaceMonitorOpenFlow(monitor.VRRPInterfaceMonitor):
                               dpid_lib.dpid_to_str(dpid),
                               dpid_lib.dpid_to_str(self.interface.dpid))
             return
+        datapath.assured = True
 
         in_port = None
         for field in msg.match.fields:
diff --git a/ryu/services/protocols/vrrp/sample_router.py b/ryu/services/protocols/vrrp/sample_router.py
index 8a057c9..198cab4 100644
--- a/ryu/services/protocols/vrrp/sample_router.py
+++ b/ryu/services/protocols/vrrp/sample_router.py
@@ -449,6 +449,7 @@ class RouterIPV4OpenFlow(RouterIPV4):
         dpid = datapath.dpid
         if dpid != self.interface.dpid:
             return
+        datapath.assured = True
 
         for field in msg.match.fields:
             header = field.header
diff --git a/ryu/services/protocols/vrrp/utils.py b/ryu/services/protocols/vrrp/utils.py
index 8cb8fa1..6322404 100644
--- a/ryu/services/protocols/vrrp/utils.py
+++ b/ryu/services/protocols/vrrp/utils.py
@@ -41,7 +41,7 @@ def get_dp(app, dpid):
     """
     :type dpid: datapath id
     :param dpid:
-    :rtype: ryu.controller.controller.Datapatyh
+    :rtype: ryu.controller.controller.Datapath
     :returns: datapath corresponding to dpid
     """
     switches = topo_api.get_switch(app, dpid)
diff --git a/ryu/tests/switch/tester.py b/ryu/tests/switch/tester.py
index f678663..ccd4f27 100644
--- a/ryu/tests/switch/tester.py
+++ b/ryu/tests/switch/tester.py
@@ -355,7 +355,7 @@ class OfTester(app_manager.RyuApp):
 
     @set_ev_cls(ofp_event.EventOFPStateChange,
                 [handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER])
-    def dispacher_change(self, ev):
+    def dispatcher_change(self, ev):
         assert ev.datapath is not None
         if ev.state == handler.MAIN_DISPATCHER:
             self._register_sw(ev.datapath)
@@ -375,6 +375,7 @@ class OfTester(app_manager.RyuApp):
                     vers[OfTester.target_ver]
             else:
                 self.target_sw.dp = dp
+                dp.assured = True
                 msg = 'Join target SW.'
         elif dp.id == self.tester_dpid:
             if dp.ofproto.OFP_VERSION != OfTester.tester_ver:
@@ -382,6 +383,7 @@ class OfTester(app_manager.RyuApp):
                     vers[OfTester.tester_ver]
             else:
                 self.tester_sw.dp = dp
+                dp.assured = True
                 msg = 'Join tester SW.'
         else:
             msg = 'Connect unknown SW.'
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
index c54152b..e7b682c 100644
--- a/ryu/topology/event.py
+++ b/ryu/topology/event.py
@@ -41,6 +41,11 @@ class EventSwitchLeave(EventSwitchBase):
         super(EventSwitchLeave, self).__init__(switch)
 
 
+class EventSwitchReconnected(EventSwitchBase):
+    def __init__(self, switch):
+        super(EventSwitchReconnected, self).__init__(switch)
+
+
 class EventPortBase(event.EventBase):
     def __init__(self, port):
         super(EventPortBase, self).__init__()
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
index 5fe5d26..bc507a6 100644
--- a/ryu/topology/switches.py
+++ b/ryu/topology/switches.py
@@ -495,6 +495,7 @@ class Switches(app_manager.RyuApp):
     OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION,
                     ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION]
     _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
+               event.EventSwitchReconnected,
                event.EventPortAdd, event.EventPortDelete,
                event.EventPortModify,
                event.EventLinkAdd, event.EventLinkDelete,
@@ -540,6 +541,7 @@ class Switches(app_manager.RyuApp):
         assert dp.id is not None
 
         self.dps[dp.id] = dp
+        dp.assured = True
         if dp.id not in self.port_state:
             self.port_state[dp.id] = PortState()
             for port in dp.ports.values():
@@ -547,8 +549,9 @@ class Switches(app_manager.RyuApp):
 
     def _unregister(self, dp):
         if dp.id in self.dps:
-            del self.dps[dp.id]
-            del self.port_state[dp.id]
+            if (self.dps[dp.id] == dp):
+                del self.dps[dp.id]
+                del self.port_state[dp.id]
 
     def _get_switch(self, dpid):
         if dpid in self.dps:
@@ -602,16 +605,18 @@ class Switches(app_manager.RyuApp):
         if ev.state == MAIN_DISPATCHER:
             dp_multiple_conns = False
             if dp.id in self.dps:
-                LOG.warning('multiple connections from %s', dpid_to_str(dp.id))
+                LOG.warning('Multiple connections from %s', dpid_to_str(dp.id))
                 dp_multiple_conns = True
+                (self.dps[dp.id]).close()
 
             self._register(dp)
             switch = self._get_switch(dp.id)
             LOG.debug('register %s', switch)
 
-            # Do not send event while dp has multiple connections.
             if not dp_multiple_conns:
                 self.send_event_to_observers(event.EventSwitchEnter(switch))
+            else:
+                self.send_event_to_observers(event.EventSwitchReconnected(switch))
 
             if not self.link_discovery:
                 return
@@ -665,19 +670,23 @@ class Switches(app_manager.RyuApp):
             # dp.id is None when datapath dies before handshake
             if dp.id is None:
                 return
+
             switch = self._get_switch(dp.id)
-            self._unregister(dp)
-            LOG.debug('unregister %s', switch)
-            self.send_event_to_observers(event.EventSwitchLeave(switch))
+            if switch:
+                if switch.dp is dp:
+                    self._unregister(dp)
+                    LOG.debug('unregister %s', switch)
 
-            if not self.link_discovery:
-                return
+                    self.send_event_to_observers(event.EventSwitchLeave(switch))
 
-            for port in switch.ports:
-                if not port.is_reserved():
-                    self.ports.del_port(port)
-                    self._link_down(port)
-            self.lldp_event.set()
+                    if not self.link_discovery:
+                        return
+
+                    for port in switch.ports:
+                        if not port.is_reserved():
+                            self.ports.del_port(port)
+                            self._link_down(port)
+                    self.lldp_event.set()
 
     @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
     def port_status_handler(self, ev):
@@ -762,7 +771,7 @@ class Switches(app_manager.RyuApp):
         try:
             src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
         except LLDPPacket.LLDPUnknownFormat as e:
-            # This handler can receive all the packtes which can be
+            # This handler can receive all the packets which can be
             # not-LLDP packet. Ignore it silently
             return
 
