This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc995f2  [SPARK-38683][SHUFFLE] It is unnecessary to release the 
ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or 
ManagedBufferIterator buffers when the client channel's connection is terminated
dc995f2 is described below

commit dc995f29592c720b959c877e31133bcd706d9b4e
Author: weixiuli <weixi...@jd.com>
AuthorDate: Fri Apr 1 10:06:19 2022 +0800

    [SPARK-38683][SHUFFLE] It is unnecessary to release the 
ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or 
ManagedBufferIterator buffers when the client channel's connection is terminated
    
    ### What changes were proposed in this pull request?
    
    It is unnecessary to release the buffers of ShuffleManagedBufferIterator, 
ShuffleChunkManagedBufferIterator, or ManagedBufferIterator when the 
client channel's connection is terminated.
    
    If a client connection is closed before the iterator is fully drained, 
then the remaining materialized buffers should all be released. However, the 
buffers produced by `ShuffleManagedBufferIterator`, 
`ShuffleChunkManagedBufferIterator`, and `ManagedBufferIterator` are not 
materialized until the iterator is traversed by calling next(), so we should 
not traverse and release them, in order to avoid unnecessary buffer 
materialization, which could be I/O based.
    ### Why are the changes needed?
    
    To reduce I/O operations for the External Shuffle Service.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #36000 from weixiuli/SPARK-38683-unnecessary-release.
    
    Authored-by: weixiuli <weixi...@jd.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../network/server/OneForOneStreamManager.java     | 34 ++++++++++++++++++----
 .../server/OneForOneStreamManagerSuite.java        | 33 +++++++++++++++++++++
 .../network/shuffle/ExternalBlockHandler.java      |  4 +--
 .../network/shuffle/ExternalBlockHandlerSuite.java |  5 ++--
 4 files changed, 66 insertions(+), 10 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index dfa31c0..ace409e 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -51,6 +51,12 @@ public class OneForOneStreamManager extends StreamManager {
 
     // The channel associated to the stream
     final Channel associatedChannel;
+    // Indicates whether the buffers is only materialized when next() is 
called. Some buffers like
+    // ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, 
ManagedBufferIterator are
+    // not materialized until the iterator is traversed by calling next(). We 
use it to decide
+    // whether buffers should be released at connectionTerminated() in order 
to avoid unnecessary
+    // buffer materialization, which could be I/O based.
+    final boolean isBufferMaterializedOnNext;
 
     // Used to keep track of the index of the buffer that the user has 
retrieved, just to ensure
     // that the caller only requests each chunk one at a time, in order.
@@ -59,10 +65,15 @@ public class OneForOneStreamManager extends StreamManager {
     // Used to keep track of the number of chunks being transferred and not 
finished yet.
     final AtomicLong chunksBeingTransferred = new AtomicLong(0L);
 
-    StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel 
channel) {
+    StreamState(
+        String appId,
+        Iterator<ManagedBuffer> buffers,
+        Channel channel,
+        boolean isBufferMaterializedOnNext) {
       this.appId = appId;
       this.buffers = Preconditions.checkNotNull(buffers);
       this.associatedChannel = channel;
+      this.isBufferMaterializedOnNext = isBufferMaterializedOnNext;
     }
   }
 
@@ -130,7 +141,7 @@ public class OneForOneStreamManager extends StreamManager {
 
         try {
           // Release all remaining buffers.
-          while (state.buffers.hasNext()) {
+          while (!state.isBufferMaterializedOnNext && state.buffers.hasNext()) 
{
             ManagedBuffer buffer = state.buffers.next();
             if (buffer != null) {
               buffer.release();
@@ -205,8 +216,11 @@ public class OneForOneStreamManager extends StreamManager {
   /**
    * Registers a stream of ManagedBuffers which are served as individual 
chunks one at a time to
    * callers. Each ManagedBuffer will be release()'d after it is transferred 
on the wire. If a
-   * client connection is closed before the iterator is fully drained, then 
the remaining buffers
-   * will all be release()'d.
+   * client connection is closed before the iterator is fully drained, then 
the remaining
+   * materialized buffers will all be release()'d, but some buffers like
+   * ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, 
ManagedBufferIterator should
+   * not release, because they have not been materialized before requesting 
the iterator by
+   * the next method.
    *
    * If an app ID is provided, only callers who've authenticated with the 
given app ID will be
    * allowed to fetch from this stream.
@@ -215,12 +229,20 @@ public class OneForOneStreamManager extends StreamManager 
{
    * to be the only reader of the stream. Once the connection is closed, the 
stream will never
    * be used again, enabling cleanup by `connectionTerminated`.
    */
-  public long registerStream(String appId, Iterator<ManagedBuffer> buffers, 
Channel channel) {
+  public long registerStream(
+      String appId,
+      Iterator<ManagedBuffer> buffers,
+      Channel channel,
+      boolean isBufferMaterializedOnNext) {
     long myStreamId = nextStreamId.getAndIncrement();
-    streams.put(myStreamId, new StreamState(appId, buffers, channel));
+    streams.put(myStreamId, new StreamState(appId, buffers, channel, 
isBufferMaterializedOnNext));
     return myStreamId;
   }
 
+  public long registerStream(String appId, Iterator<ManagedBuffer> buffers, 
Channel channel) {
+    return registerStream(appId, buffers, channel, false);
+  }
+
   @VisibleForTesting
   public int numStreamStates() {
     return streams.size();
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
index b65daaf..54f2617 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
@@ -126,4 +126,37 @@ public class OneForOneStreamManagerSuite {
     Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
     Assert.assertEquals(0, manager.numStreamStates());
   }
+
+  @Test
+  public void streamStatesAreFreeOrNotWhenConnectionIsClosed() {
+    OneForOneStreamManager manager = new OneForOneStreamManager();
+    ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
+
+    Iterator<ManagedBuffer> buffers1 = Mockito.mock(Iterator.class);
+    Mockito.when(buffers1.hasNext()).thenReturn(true).thenReturn(false);
+    Mockito.when(buffers1.next()).thenReturn(mockManagedBuffer);
+
+    Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
+    Mockito.when(buffers2.hasNext()).thenReturn(true);
+    Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer);
+
+    Channel dummyChannel = Mockito.mock(Channel.class, 
Mockito.RETURNS_SMART_NULLS);
+    // should Release,
+    manager.registerStream("appId", buffers1, dummyChannel, false);
+    // should NOT Release
+    manager.registerStream("appId", buffers2, dummyChannel, true);
+    Assert.assertEquals(2, manager.numStreamStates());
+
+    // connectionTerminated
+    manager.connectionTerminated(dummyChannel);
+
+    Mockito.verify(buffers1, Mockito.times(2)).hasNext();
+    Mockito.verify(buffers1, Mockito.times(1)).next();
+
+    Mockito.verify(buffers2, Mockito.times(0)).hasNext();
+    Mockito.verify(buffers2, Mockito.times(0)).next();
+    // only buffers1 has been released
+    Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
+    Assert.assertEquals(0, manager.numStreamStates());
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 52bc0f9c..9b2e7a4 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -157,14 +157,14 @@ public class ExternalBlockHandler extends RpcHandler
             iterator = new 
ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) msgObj);
           }
           streamId = streamManager.registerStream(client.getClientId(), 
iterator,
-            client.getChannel());
+            client.getChannel(), true);
         } else {
           // For the compatibility with the old version, still keep the 
support for OpenBlocks.
           OpenBlocks msg = (OpenBlocks) msgObj;
           numBlockIds = msg.blockIds.length;
           checkAuth(client, msg.appId);
           streamId = streamManager.registerStream(client.getClientId(),
-            new ManagedBufferIterator(msg), client.getChannel());
+            new ManagedBufferIterator(msg), client.getChannel(), true);
         }
         if (logger.isTraceEnabled()) {
           logger.trace(
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index 14896c8..18efc10 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -302,7 +302,7 @@ public class ExternalBlockHandlerSuite {
     ArgumentCaptor<Iterator<ManagedBuffer>> stream = 
(ArgumentCaptor<Iterator<ManagedBuffer>>)
         (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
     verify(streamManager, times(1)).registerStream(anyString(), 
stream.capture(),
-      any());
+      any(), anyBoolean());
     Iterator<ManagedBuffer> buffers = stream.getValue();
     for (ManagedBuffer blockMarker : blockMarkers) {
       assertEquals(blockMarker, buffers.next());
@@ -451,7 +451,8 @@ public class ExternalBlockHandlerSuite {
     @SuppressWarnings("unchecked")
     ArgumentCaptor<Iterator<ManagedBuffer>> stream = 
(ArgumentCaptor<Iterator<ManagedBuffer>>)
       (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
-    verify(streamManager, times(1)).registerStream(any(), stream.capture(), 
any());
+    verify(streamManager, times(1)).registerStream(any(), stream.capture(),
+        any(), anyBoolean());
     Iterator<ManagedBuffer> bufferIter = stream.getValue();
     for (int reduceId = 0; reduceId < 2; reduceId++) {
       for (int chunkId = 0; chunkId < 2; chunkId++) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to