Currently, a thread for processing outgoing message will be remained in
"RpcSession" activity even if RPC session is closed by the remote peer,
and garbages on memory will grow.

This patch fixes to close "RpcSession" activity when RPC session
closing.

Signed-off-by: IWASE Yusuke <iwase.yusu...@gmail.com>
---
 ryu/services/protocols/bgp/net_ctrl.py | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/ryu/services/protocols/bgp/net_ctrl.py 
b/ryu/services/protocols/bgp/net_ctrl.py
index 7944ac2..92a8e71 100644
--- a/ryu/services/protocols/bgp/net_ctrl.py
+++ b/ryu/services/protocols/bgp/net_ctrl.py
@@ -107,6 +107,8 @@ class RpcSession(Activity):
         self._socket = sock
         self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
         self.is_connected = True
+        self.green_in = None
+        self.green_out = None
 
     def stop(self):
         super(RpcSession, self).stop()
@@ -115,15 +117,15 @@ class RpcSession(Activity):
 
     def _run(self):
         # Process outgoing messages in new thread.
-        green_out = self._spawn('net_ctrl._process_outgoing',
-                                self._process_outgoing_msg,
-                                self._outgoing_msg_sink_iter)
+        self.green_out = self._spawn('net_ctrl._process_outgoing',
+                                     self._process_outgoing_msg,
+                                     self._outgoing_msg_sink_iter)
         # Process incoming messages in new thread.
-        green_in = self._spawn('net_ctrl._process_incoming',
-                               self._process_incoming_msgs)
+        self.green_in = self._spawn('net_ctrl._process_incoming',
+                                    self._process_incoming_msgs)
         LOG.info('RPC Session to %s started', self.peer_name)
-        green_in.wait()
-        green_out.wait()
+        self.green_in.wait()
+        self.green_out.wait()
 
     def _next_msg_id(self):
         this_id = self._next_msgid
@@ -202,6 +204,10 @@ class RpcSession(Activity):
                     LOG.error('Invalid message type: %r', msg)
                 self.pause(0)
 
+        # Stop outgoing connection.
+        if self.green_out:
+            self.green_out.kill()
+
     def _process_outgoing_msg(self, sink_iter):
         """For every message we construct a corresponding RPC message to be
         sent over the given socket inside given RPC session.
@@ -231,6 +237,10 @@ class RpcSession(Activity):
                     self._sendall(rpc_msg)
             self.pause(0)
 
+        # Stop incoming connection.
+        if self.green_in:
+            self.green_in.kill()
+
     def _recv(self):
         return self._sock_wrap(self._socket.recv)(RPC_SOCK_BUFF_SIZE)
 
-- 
2.7.4


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to