Currently, Ryu does not provide the way to close a socket connecting to a switch after sending all enqueued messages, but provides only the way to close the socket immediately regardless of enqueued messages.
This patch adds a new option "close_socket" into "Datapath.send_msg()" method and this option enables to close the socket after sending the given message. This patch is convenient to close the socket after sending OFPT_ERROR message to the switch. Signed-off-by: IWASE Yusuke <iwase.yusu...@gmail.com> --- ryu/controller/controller.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index 8484891..03bd381 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -23,15 +23,19 @@ The main component of OpenFlow controller. """ import contextlib -from ryu import cfg import logging -from ryu.lib import hub -from ryu.lib.hub import StreamServer import random +from socket import ( + IPPROTO_TCP, + TCP_NODELAY, + SHUT_RDWR, + timeout as SocketTimeout, +) import ssl -from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout -import netaddr +from ryu import cfg +from ryu.lib import hub +from ryu.lib.hub import StreamServer import ryu.base.app_manager @@ -200,6 +204,7 @@ def _deactivate(method): if not self.is_active: self.socket.close() + return deactivate @@ -335,7 +340,9 @@ class Datapath(ofproto_protocol.ProtocolDesc): ev = ofp_event.ofp_msg_to_ev(msg) self.ofp_brick.send_event_to_observers(ev, self.state) - dispatchers = lambda x: x.callers[ev.__class__].dispatchers + def dispatchers(x): + return x.callers[ev.__class__].dispatchers + handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] @@ -358,9 +365,11 @@ class Datapath(ofproto_protocol.ProtocolDesc): def _send_loop(self): try: while self.state != DEAD_DISPATCHER: - buf = self.send_q.get() + buf, close_socket = self.send_q.get() self._send_q_sem.release() self.socket.sendall(buf) + if close_socket: + break except SocketTimeout: LOG.debug("Socket timed out while sending data to switch at address %s", self.address) @@ -383,11 +392,11 @@ class Datapath(ofproto_protocol.ProtocolDesc): # Finally, ensure the _recv_loop terminates. self.close() - def send(self, buf): + def send(self, buf, close_socket=False): msg_enqueued = False self._send_q_sem.acquire() if self.send_q: - self.send_q.put(buf) + self.send_q.put((buf, close_socket)) msg_enqueued = True else: self._send_q_sem.release() @@ -402,13 +411,13 @@ class Datapath(ofproto_protocol.ProtocolDesc): msg.set_xid(self.xid) return self.xid - def send_msg(self, msg): + def send_msg(self, msg, close_socket=False): assert isinstance(msg, self.ofproto_parser.MsgBase) if msg.xid is None: self.set_xid(msg) msg.serialize() # LOG.debug('send_msg %s', msg) - return self.send(msg.buf) + return self.send(msg.buf, close_socket=close_socket) def _echo_request_loop(self): if not self.max_unreplied_echo_requests: -- 2.7.4 ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, Slashdot.org! http://sdm.link/slashdot _______________________________________________ Ryu-devel mailing list Ryu-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/ryu-devel