This is an automated email from the ASF dual-hosted git repository. wuyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dc995f2 [SPARK-38683][SHUFFLE] It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection is terminated dc995f2 is described below commit dc995f29592c720b959c877e31133bcd706d9b4e Author: weixiuli <weixi...@jd.com> AuthorDate: Fri Apr 1 10:06:19 2022 +0800 [SPARK-38683][SHUFFLE] It is unnecessary to release the ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection is terminated ### What changes were proposed in this pull request? It is unnecessary to release the ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, or ManagedBufferIterator buffers when the client channel's connection is terminated. If a client connection is closed before the iterator is fully drained, the remaining materialized buffers should all be released. However, the buffers produced by iterators such as `ShuffleManagedBufferIterator`, `ShuffleChunkManagedBufferIterator`, and `ManagedBufferIterator` are not materialized until the iterator is traversed by calling next(), so we should not traverse and release them; doing so would cause unnecessary buffer materialization, which could be I/O-based. ### Why are the changes needed? To reduce I/O operations for the External Shuffle Service. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #36000 from weixiuli/SPARK-38683-unnecessary-release. 
Authored-by: weixiuli <weixi...@jd.com> Signed-off-by: yi.wu <yi...@databricks.com> --- .../network/server/OneForOneStreamManager.java | 34 ++++++++++++++++++---- .../server/OneForOneStreamManagerSuite.java | 33 +++++++++++++++++++++ .../network/shuffle/ExternalBlockHandler.java | 4 +-- .../network/shuffle/ExternalBlockHandlerSuite.java | 5 ++-- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index dfa31c0..ace409e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -51,6 +51,12 @@ public class OneForOneStreamManager extends StreamManager { // The channel associated to the stream final Channel associatedChannel; + // Indicates whether the buffers is only materialized when next() is called. Some buffers like + // ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, ManagedBufferIterator are + // not materialized until the iterator is traversed by calling next(). We use it to decide + // whether buffers should be released at connectionTerminated() in order to avoid unnecessary + // buffer materialization, which could be I/O based. + final boolean isBufferMaterializedOnNext; // Used to keep track of the index of the buffer that the user has retrieved, just to ensure // that the caller only requests each chunk one at a time, in order. @@ -59,10 +65,15 @@ public class OneForOneStreamManager extends StreamManager { // Used to keep track of the number of chunks being transferred and not finished yet. 
final AtomicLong chunksBeingTransferred = new AtomicLong(0L); - StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) { + StreamState( + String appId, + Iterator<ManagedBuffer> buffers, + Channel channel, + boolean isBufferMaterializedOnNext) { this.appId = appId; this.buffers = Preconditions.checkNotNull(buffers); this.associatedChannel = channel; + this.isBufferMaterializedOnNext = isBufferMaterializedOnNext; } } @@ -130,7 +141,7 @@ public class OneForOneStreamManager extends StreamManager { try { // Release all remaining buffers. - while (state.buffers.hasNext()) { + while (!state.isBufferMaterializedOnNext && state.buffers.hasNext()) { ManagedBuffer buffer = state.buffers.next(); if (buffer != null) { buffer.release(); @@ -205,8 +216,11 @@ public class OneForOneStreamManager extends StreamManager { /** * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a - * client connection is closed before the iterator is fully drained, then the remaining buffers - * will all be release()'d. + * client connection is closed before the iterator is fully drained, then the remaining + * materialized buffers will all be release()'d, but some buffers like + * ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, ManagedBufferIterator should + * not release, because they have not been materialized before requesting the iterator by + * the next method. * * If an app ID is provided, only callers who've authenticated with the given app ID will be * allowed to fetch from this stream. @@ -215,12 +229,20 @@ public class OneForOneStreamManager extends StreamManager { * to be the only reader of the stream. Once the connection is closed, the stream will never * be used again, enabling cleanup by `connectionTerminated`. 
*/ - public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) { + public long registerStream( + String appId, + Iterator<ManagedBuffer> buffers, + Channel channel, + boolean isBufferMaterializedOnNext) { long myStreamId = nextStreamId.getAndIncrement(); - streams.put(myStreamId, new StreamState(appId, buffers, channel)); + streams.put(myStreamId, new StreamState(appId, buffers, channel, isBufferMaterializedOnNext)); return myStreamId; } + public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) { + return registerStream(appId, buffers, channel, false); + } + @VisibleForTesting public int numStreamStates() { return streams.size(); diff --git a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java index b65daaf..54f2617 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java @@ -126,4 +126,37 @@ public class OneForOneStreamManagerSuite { Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); Assert.assertEquals(0, manager.numStreamStates()); } + + @Test + public void streamStatesAreFreeOrNotWhenConnectionIsClosed() { + OneForOneStreamManager manager = new OneForOneStreamManager(); + ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class); + + Iterator<ManagedBuffer> buffers1 = Mockito.mock(Iterator.class); + Mockito.when(buffers1.hasNext()).thenReturn(true).thenReturn(false); + Mockito.when(buffers1.next()).thenReturn(mockManagedBuffer); + + Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class); + Mockito.when(buffers2.hasNext()).thenReturn(true); + Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer); + + Channel dummyChannel = 
Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS); + // should Release, + manager.registerStream("appId", buffers1, dummyChannel, false); + // should NOT Release + manager.registerStream("appId", buffers2, dummyChannel, true); + Assert.assertEquals(2, manager.numStreamStates()); + + // connectionTerminated + manager.connectionTerminated(dummyChannel); + + Mockito.verify(buffers1, Mockito.times(2)).hasNext(); + Mockito.verify(buffers1, Mockito.times(1)).next(); + + Mockito.verify(buffers2, Mockito.times(0)).hasNext(); + Mockito.verify(buffers2, Mockito.times(0)).next(); + // only buffers1 has been released + Mockito.verify(mockManagedBuffer, Mockito.times(1)).release(); + Assert.assertEquals(0, manager.numStreamStates()); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 52bc0f9c..9b2e7a4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -157,14 +157,14 @@ public class ExternalBlockHandler extends RpcHandler iterator = new ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) msgObj); } streamId = streamManager.registerStream(client.getClientId(), iterator, - client.getChannel()); + client.getChannel(), true); } else { // For the compatibility with the old version, still keep the support for OpenBlocks. 
OpenBlocks msg = (OpenBlocks) msgObj; numBlockIds = msg.blockIds.length; checkAuth(client, msg.appId); streamId = streamManager.registerStream(client.getClientId(), - new ManagedBufferIterator(msg), client.getChannel()); + new ManagedBufferIterator(msg), client.getChannel(), true); } if (logger.isTraceEnabled()) { logger.trace( diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java index 14896c8..18efc10 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java @@ -302,7 +302,7 @@ public class ExternalBlockHandlerSuite { ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class); verify(streamManager, times(1)).registerStream(anyString(), stream.capture(), - any()); + any(), anyBoolean()); Iterator<ManagedBuffer> buffers = stream.getValue(); for (ManagedBuffer blockMarker : blockMarkers) { assertEquals(blockMarker, buffers.next()); @@ -451,7 +451,8 @@ public class ExternalBlockHandlerSuite { @SuppressWarnings("unchecked") ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class); - verify(streamManager, times(1)).registerStream(any(), stream.capture(), any()); + verify(streamManager, times(1)).registerStream(any(), stream.capture(), + any(), anyBoolean()); Iterator<ManagedBuffer> bufferIter = stream.getValue(); for (int reduceId = 0; reduceId < 2; reduceId++) { for (int chunkId = 0; chunkId < 2; chunkId++) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, 
e-mail: commits-h...@spark.apache.org