hensg commented on a change in pull request #27064: 
[SPARK-30246]OneForOneStreamManager might leak memory in connectionTerminated
URL: https://github.com/apache/spark/pull/27064#discussion_r362458448
 
 

 ##########
 File path: 
common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 ##########
 @@ -117,18 +118,21 @@ public static String genStreamChunkId(long streamId, int chunkId) {
 
   @Override
   public void connectionTerminated(Channel channel) {
+    LinkedList<StreamState> removedStates = new LinkedList<>();
     // Close all streams which have been associated with the channel.
     for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
       StreamState state = entry.getValue();
       if (state.associatedChannel == channel) {
-        streams.remove(entry.getKey());
-
-        // Release all remaining buffers.
-        while (state.buffers.hasNext()) {
-          ManagedBuffer buffer = state.buffers.next();
-          if (buffer != null) {
-            buffer.release();
-          }
+        removedStates.add(streams.remove(entry.getKey()));
+      }
+    }
+
+    for (StreamState state: removedStates) {
+      // Release all remaining buffers.
 
 Review comment:
   Actually, if the iterator's `.next()` throws an exception and we `catch` it,
the iterator's cursor/index is not advanced, so we stay stuck at that position:
the next call to `.next()` throws the same exception again.
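
   To make the concern concrete, here is a minimal sketch. `FailingIterator`
and `drainSwallowingErrors` are hypothetical names written for this comment,
not Spark code; the sketch assumes an iterator whose `.next()` throws before it
moves its cursor. Whether a given iterator behaves this way depends on its
implementation.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class StuckIteratorDemo {

  /** Hypothetical iterator: next() fails at one index and only advances on success. */
  static final class FailingIterator implements Iterator<String> {
    private final List<String> items;
    private final int badIndex;
    private int cursor = 0;

    FailingIterator(List<String> items, int badIndex) {
      this.items = items;
      this.badIndex = badIndex;
    }

    @Override
    public boolean hasNext() {
      return cursor < items.size();
    }

    @Override
    public String next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      if (cursor == badIndex) {
        // Thrown before the cursor moves, so the position does not advance.
        throw new IllegalStateException("cannot load element " + cursor);
      }
      return items.get(cursor++);
    }
  }

  /**
   * Drains the iterator while swallowing exceptions, giving up after
   * maxAttempts calls to next(). Returns the number of failed calls.
   */
  static int drainSwallowingErrors(Iterator<String> it, int maxAttempts) {
    int failures = 0;
    int attempts = 0;
    while (it.hasNext() && attempts < maxAttempts) {
      attempts++;
      try {
        it.next();
      } catch (RuntimeException e) {
        // The same element fails again on the next pass through the loop.
        failures++;
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    Iterator<String> it = new FailingIterator(Arrays.asList("a", "b", "c"), 1);
    System.out.println("failed calls: " + drainSwallowingErrors(it, 10));
  }
}
```

   Without the `maxAttempts` guard, the `while (hasNext())` loop in this sketch
would never terminate, because `hasNext()` keeps returning `true` for the
element that cannot be produced.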
   
   Furthermore, the buffers iterator is a lazy implementation in cases 1 and 2
(`ExternalBlockHandler`), so I don't see a reason for that:
   1. https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L112
      https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L318
   2. https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L119
      https://github.com/apache/spark/blob/ee050ddbc6eb6bc08c7751a0eb00e7a05b011b52/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java#L357
   
   And in case 3 (`NettyBlockRpcServer`), the iterator is just a plain Java
iterator.
   
https://github.com/apache/spark/blob/22def779b12eb34f8da4c1c40ab41d32bca29292/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L65
 
https://github.com/apache/spark/blob/22def779b12eb34f8da4c1c40ab41d32bca29292/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala#L95
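
   For readers unfamiliar with the distinction, the sketch below shows what
"lazy" means in this comment: an element only comes into existence when
`.next()` asks for it. `LazyIterator`, `loadedIndices`, and the `"buffer-"`
strings are hypothetical stand-ins, not the iterators linked above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

public class LazyIteratorDemo {

  /** Hypothetical lazy iterator: each element is created only inside next(). */
  static final class LazyIterator<T> implements Iterator<T> {
    private final int size;
    private final IntFunction<T> loader;
    private int index = 0;

    LazyIterator(int size, IntFunction<T> loader) {
      this.size = size;
      this.loader = loader;
    }

    @Override
    public boolean hasNext() {
      return index < size;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      // The element is materialized here, not when the iterator is built.
      T value = loader.apply(index);
      index++;
      return value;
    }
  }

  /** Returns the indices that were loaded while consuming `consumed` elements. */
  static List<Integer> loadedIndices(int size, int consumed) {
    List<Integer> loaded = new ArrayList<>();
    Iterator<String> it = new LazyIterator<String>(size, i -> {
      loaded.add(i);
      return "buffer-" + i;
    });
    for (int n = 0; n < consumed && it.hasNext(); n++) {
      it.next();
    }
    return loaded;
  }

  public static void main(String[] args) {
    // Only the elements that were actually consumed get created.
    System.out.println(loadedIndices(5, 2));
  }
}
```

   An iterator over an already-built collection differs in that its elements
exist before iteration starts, so skipping the rest of the iteration leaves
them allocated.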
   
   
   
   

