[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218942035 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -77,17 +82,54 @@ private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; + // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling + // max number of TransportServer worker threads that are blocked on writing response + // of ChunkFetchRequest message back to the client via the underlying channel. + private static EventLoopGroup chunkFetchWorkers; + public TransportContext(TransportConf conf, RpcHandler rpcHandler) { -this(conf, rpcHandler, false); +this(conf, rpcHandler, false, false); } public TransportContext( TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections) { +this(conf, rpcHandler, closeIdleConnections, false); + } + +/** + * + * @param conf TransportConf + * @param rpcHandler RpcHandler responsible for handling requests and responses. + * @param closeIdleConnections Close idle connections if it is set to true. + * @param isClientOnly This config is more important when external shuffle is enabled. + * It stops creating extra event loop and subsequent thread pool + * for shuffle clients to handle chunked fetch requests. + * In the case when external shuffle is disabled, the executors are both + * client and server so both share the same event loop which is trivial. --- End diff -- I hope we follow a similar indentation for all other comments
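For illustration, a minimal usage sketch (not from the PR itself) of how a caller would pick between the overloads above. The four-argument signature is taken from the diff; NoOpRpcHandler and MapConfigProvider.EMPTY are just convenient stand-ins for a real handler and configuration:

    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.server.NoOpRpcHandler;
    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class TransportContextUsageSketch {
      public static void main(String[] args) {
        TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

        // External shuffle client: isClientOnly = true, so the dedicated
        // chunk-fetch EventLoopGroup (and its thread pool) is never created.
        TransportContext clientContext =
          new TransportContext(conf, new NoOpRpcHandler(), false, true);

        // Shuffle server (or an executor, which is both client and server):
        // the two-argument overload defaults both flags to false, so the
        // chunk-fetch workers are set up.
        TransportContext serverContext =
          new TransportContext(conf, new NoOpRpcHandler());
      }
    }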
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218939333 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -98,21 +98,32 @@ public TransportContext( this(conf, rpcHandler, closeIdleConnections, false); } +/** + * + * @param conf TransportConf + * @param rpcHandler RpcHandler responsible for handling requests and responses. + * @param closeIdleConnections Close idle connections if is set to true. + * @param isClientOnly This config is more important when external shuffle is enabled. --- End diff -- I think for comments we follow the same spacing convention as observed here so sticking with it...
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218930135 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -77,17 +82,43 @@ private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; + // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling + // max number of TransportServer worker threads that are blocked on writing response + // of ChunkFetchRequest message back to the client via the underlying channel. + private static EventLoopGroup chunkFetchWorkers; + public TransportContext(TransportConf conf, RpcHandler rpcHandler) { -this(conf, rpcHandler, false); +this(conf, rpcHandler, false, false); } public TransportContext( TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections) { +this(conf, rpcHandler, closeIdleConnections, false); + } + + public TransportContext( + TransportConf conf, + RpcHandler rpcHandler, + boolean closeIdleConnections, + boolean isClient) { --- End diff -- sure... anything to make it more clear
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218926387 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -77,17 +82,43 @@ private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; + // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling + // max number of TransportServer worker threads that are blocked on writing response + // of ChunkFetchRequest message back to the client via the underlying channel. + private static EventLoopGroup chunkFetchWorkers; + public TransportContext(TransportConf conf, RpcHandler rpcHandler) { -this(conf, rpcHandler, false); +this(conf, rpcHandler, false, false); } public TransportContext( TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections) { +this(conf, rpcHandler, closeIdleConnections, false); + } + + public TransportContext( + TransportConf conf, + RpcHandler rpcHandler, + boolean closeIdleConnections, + boolean isClient) { --- End diff -- we should make this more clear. Because with external shuffle off, we want this in client mode as well since it's really both a client and server. Perhaps we could change the name to isClientOnly, and we should put some javadocs on the function to describe the parameter as well.
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218925291 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network; + +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.Channel; +import org.apache.spark.network.server.ChunkFetchRequestHandler; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.*; +import org.apache.spark.network.server.NoOpRpcHandler; +import org.apache.spark.network.server.OneForOneStreamManager; +import org.apache.spark.network.server.RpcHandler; + +public class ChunkFetchRequestHandlerSuite { + + @Test + public void handleChunkFetchRequest() throws Exception { +RpcHandler rpcHandler = new NoOpRpcHandler(); +OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager()); +Channel channel = mock(Channel.class); +ChannelHandlerContext context = mock(ChannelHandlerContext.class); +when(context.channel()) +.thenAnswer(invocationOnMock0 -> { --- End diff -- change spacing to 2
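For reference, the two-space continuation the comment asks for would look like this (a sketch only — the lambda body is truncated in the quote above, and returning the mocked Channel is an assumption about what it does):

    import static org.mockito.Mockito.*;

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;

    public class MockChannelContextSketch {
      public static void main(String[] args) {
        Channel channel = mock(Channel.class);
        ChannelHandlerContext context = mock(ChannelHandlerContext.class);
        when(context.channel())
          .thenAnswer(invocationOnMock0 -> channel);  // continuation indented 2 spaces

        // The mock now hands back the stubbed channel.
        assert context.channel() == channel;
      }
    }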
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218924768 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + --- End diff -- remove extra empty line
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218924588 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -173,5 +213,14 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler conf.connectionTimeoutMs(), closeIdleConnections); } + /** + * Creates the dedicated ChannelHandler for ChunkFetchRequest messages. + */ + private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler, + RpcHandler rpcHandler) { +return new ChunkFetchRequestHandler(channelHandler.getClient(), +rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred()); --- End diff -- indentation 2 spaces inside return
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218874415 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +284,44 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Check if it is a shuffle client + * and avoid creating unnecessary event loops + * in the TransportChannelHandler + */ + public boolean shuffleClient() { --- End diff -- I think since this is so specialized and really only used from one place (the external shuffle service client) perhaps putting it here isn't that useful. I think if we add a new constructor to the TransportContext with that optional parameter it might make more sense
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218812564 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +284,44 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Check if it is a shuffle client + * and avoid creating unnecessary event loops + * in the TransportChannelHandler + */ + public boolean shuffleClient() { +return this.isShuffleClient; + } + + public void setShuffleClient(boolean isShuffleClient) { +this.isShuffleClient = isShuffleClient; + } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if (!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0 ? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: +(2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; --- End diff -- fix indentation
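A worked example of the percentage math described in the javadoc above may help (a sketch; the constants are made up for illustration). With io.serverThreads unset the base is 2 * #cores, and the integer division truncates:

    public class ChunkFetchThreadsMath {
      public static void main(String[] args) {
        int cores = 8;            // pretend NettyRuntime.availableProcessors() == 8
        int serverThreads = 0;    // io.serverThreads not configured
        int percent = 90;         // spark.shuffle.server.chunkFetchHandlerThreadsPercent

        int base = serverThreads > 0 ? serverThreads : 2 * cores;  // 16
        int handlerThreads = (base * percent) / 100;               // 1440 / 100 = 14
        System.out.println(handlerThreads);  // 14, i.e. 90% of 16 rounded down
      }
    }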
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218812503 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +284,44 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Check if it is a shuffle client + * and avoid creating unnecessary event loops + * in the TransportChannelHandler + */ + public boolean shuffleClient() { +return this.isShuffleClient; + } + + public void setShuffleClient(boolean isShuffleClient) { +this.isShuffleClient = isShuffleClient; + } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if (!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); --- End diff -- fix indentation
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218811827 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java --- @@ -32,7 +41,6 @@ import org.apache.spark.network.client.*; import org.apache.spark.network.protocol.*; import org.apache.spark.network.util.TransportFrameDecoder; - --- End diff -- put extra line back to keep separation
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218811721 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java --- @@ -24,6 +24,15 @@ import com.google.common.base.Throwables; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; + +import org.apache.spark.network.protocol.OneWayMessage; +import org.apache.spark.network.protocol.RpcFailure; +import org.apache.spark.network.protocol.RpcRequest; --- End diff -- these don't seem like they should be necessary? and they're in the wrong location
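For context, the import layout used elsewhere in this module (visible in the files quoted in this thread) groups JDK imports first, then third-party, then org.apache.spark, with a blank line between groups — roughly like the sketch below (the JDK line is illustrative):

    import java.nio.ByteBuffer;

    import com.google.common.base.Throwables;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;

    import org.apache.spark.network.client.*;
    import org.apache.spark.network.protocol.*;
    import org.apache.spark.network.util.TransportFrameDecoder;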
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r21885 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + /** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) +throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), +msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", +chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf = streamManager.getChunk(msg.streamChunkId.streamId,
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218807868 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) +throws Exception { --- End diff -- indent this 2 more spaces
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218621553 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf =
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218590278 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf =
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218562181 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -88,6 +97,16 @@ public TransportContext( this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + +synchronized(this.getClass()) { + if (chunkFetchWorkers == null && conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle")) { +chunkFetchWorkers = NettyUtils.createEventLoop( +IOMode.valueOf(conf.ioMode()), +conf.chunkFetchHandlerThreads(), +"chunk-fetch-handler"); --- End diff -- like you mention, if we can avoid creating the event loop on the client side that would be best
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218559203 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if (!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); --- End diff -- Yes it is documented above... if it is 0 or 100 it is 2*#cores or io.serverThreads
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218508374 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf =
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218466733 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); --- End diff -- this can actually be unwrapped here
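Unwrapped, the call fits on one line — this is in fact how the updated revision of the file, also quoted in this thread, has it:

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);
      ctx.close();
    }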
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218533001 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if (!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); --- End diff -- fix spacing
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218465153 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -88,6 +97,16 @@ public TransportContext( this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + +synchronized(this.getClass()) { + if (chunkFetchWorkers == null && conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle")) { +chunkFetchWorkers = NettyUtils.createEventLoop( +IOMode.valueOf(conf.ioMode()), +conf.chunkFetchHandlerThreads(), +"chunk-fetch-handler"); --- End diff -- for consistency perhaps name thread shuffle-chunk-fetch-handler
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218468244 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf =
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218533702 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if (!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); --- End diff -- what happens if this is actually 0? you are going to get 0 chunkFetchHandlerThreads so what does the event loop group do with that?
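For what it's worth (this is standard Netty behavior, not something stated in the PR): MultithreadEventLoopGroup treats nThreads == 0 as "use the default size", which is io.netty.eventLoopThreads or 2 * #cores, so a computed 0 would not produce an empty group. A quick sketch to check:

    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.NettyRuntime;

    public class ZeroThreadsSketch {
      public static void main(String[] args) throws InterruptedException {
        // nThreads == 0 falls back to Netty's default pool size, not zero.
        NioEventLoopGroup group = new NioEventLoopGroup(0);
        System.out.println("event loops: " + group.executorCount());
        System.out.println("2 * cores:   " + 2 * NettyRuntime.availableProcessors());
        group.shutdownGracefully().sync();
      }
    }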
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218462401 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -88,6 +97,16 @@ public TransportContext( this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + +synchronized(this.getClass()) { --- End diff -- I think synchronized(this.getClass()) is not recommended due to it not handling inheritance and such. Use synchronized(TransportContext.class) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
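A small self-contained example of the pitfall being pointed out: with synchronized(this.getClass()) the monitor depends on the runtime type, so an instance of a subclass does not exclude an instance of the base class while they touch the same static field.

class Base {
  static volatile Object shared;

  void unsafeInit() {
    // A Derived instance locks Derived.class here, not Base.class, so two
    // threads (one on a Base, one on a Derived) can race on the null check.
    synchronized (this.getClass()) {
      if (shared == null) shared = new Object();
    }
  }

  void safeInit() {
    // The class literal pins the monitor regardless of subclassing.
    synchronized (Base.class) {
      if (shared == null) shared = new Object();
    }
  }
}

class Derived extends Base {}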
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218467664 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { --- End diff -- fix wrapping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
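For reference, the wrapping being asked for would look something like the following, assuming Spark's usual Java style of one wrapped parameter per line with a four-space continuation indent:

  @Override
  protected void channelRead0(
      ChannelHandlerContext ctx,
      final ChunkFetchRequest msg) throws Exception {
    // body unchanged
  }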
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218465817 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -144,14 +163,21 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); - channel.pipeline() + ChunkFetchRequestHandler chunkFetchHandler = + createChunkFetchHandler(channelHandler, channelRpcHandler); --- End diff -- fix spacing, only indented 2 spaces --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218490559 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -144,14 +163,21 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); - channel.pipeline() + ChunkFetchRequestHandler chunkFetchHandler = + createChunkFetchHandler(channelHandler, channelRpcHandler); + ChannelPipeline pipeline = channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) -.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) +.addLast("idleStateHandler", +new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) --- End diff -- fix indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218494508 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java --- @@ -24,15 +24,27 @@ import com.google.common.base.Throwables; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; -import org.apache.spark.network.client.*; -import org.apache.spark.network.protocol.*; +import org.apache.spark.network.client.RpcResponseCallback; --- End diff -- revert the imports to be .* --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218463768 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -88,6 +97,16 @@ public TransportContext( this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + +synchronized(this.getClass()) { + if (chunkFetchWorkers == null && conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle")) { --- End diff -- fix spacing here, line up conf.getModuleName with the chunkFetchWorkers --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218466397 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); --- End diff -- spacing 2, fix throughout the file --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218163618 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java ---
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216074852 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java ---
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216069578 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: --- End diff -- I think it is a good idea to document both as this is an important config. Let me know your thoughts --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216068689 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads --- End diff -- No, it is based on how many threads are required for the other RPC calls. I have not tested that, but the whole point is to reduce the dependency on how much time the ChunkFetchRequests spend doing disk I/O. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216004740 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: --- End diff -- space between 0 and ?, and space between 100 and : --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216063759 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java ---
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216006120 --- Diff: common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network; + +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + + --- End diff -- remove extra line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216057933 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: --- End diff -- I assume we aren't documenting chunkFetchHandlerThreadsPercent since the serverThreads config isn't documented. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216005883 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads --- End diff -- I realize these are just examples, but normally I would expect a user to set the threads processing chunk requests much higher than those doing other things, so perhaps the example should be something like 90%. Were any tests done to see how many threads were needed for the others? I would expect very few. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216005916 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: +(2* NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; --- End diff -- space between 2 and * --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216005137 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { --- End diff -- space after if before ( --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r212639516 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -144,14 +161,17 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); + ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. -.addLast("handler", channelHandler); +.addLast("handler", channelHandler) +// Use a separate EventLoopGroup to handle ChunkFetchRequest messages. +.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); --- End diff -- Yes, I did notice that... makes sense --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r212070669 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -144,14 +161,17 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); + ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. -.addLast("handler", channelHandler); +.addLast("handler", channelHandler) +// Use a separate EventLoopGroup to handle ChunkFetchRequest messages. +.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); --- End diff -- Hmm... I think there is some waste here. Not all channels actually need the chunk fetch handler. Basically only the shuffle server (external or not) does. So for all other cases - RpcEnv server and clients, shuffle clients - you'd have this new thread pool just sitting there. Before you'd have the handler in the pipeline, but that's just a few bytes of memory being wasted. It would be good to avoid that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
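One shape the fix for this could take, sketched under the assumption that TransportContext gains an isClientOnly flag (discussed elsewhere in this review) and creates the pool lazily, only for the shuffle module. NettyUtils.createEventLoop and IOMode here are the existing helpers in network-common; everything else is illustrative, not the merged patch.

  // Inside TransportContext; assumes io.netty.channel.EventLoopGroup and the
  // org.apache.spark.network.util.{IOMode, NettyUtils, TransportConf} imports.
  private static EventLoopGroup chunkFetchWorkers;

  private static void maybeInitChunkFetchWorkers(TransportConf conf, boolean isClientOnly) {
    synchronized (TransportContext.class) {
      // Only the shuffle server pays for the dedicated pool; RpcEnv
      // servers/clients and shuffle clients skip it entirely.
      if (chunkFetchWorkers == null
          && "shuffle".equalsIgnoreCase(conf.getModuleName())
          && !isClientOnly) {
        chunkFetchWorkers = NettyUtils.createEventLoop(
            IOMode.valueOf(conf.ioMode()),
            conf.chunkFetchHandlerThreads(),
            "shuffle-chunk-fetch-handler");
      }
    }
  }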
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/22173 [SPARK-24335] Spark external shuffle server improvement to better handle block fetch requests. ## What changes were proposed in this pull request? This is a continuation PR from https://github.com/apache/spark/pull/21402 Since there has been no activity, I am willing to take this over; I have made a few minor changes and tested them. Adding the description from the earlier PR: Right now, the default number of server side netty handler threads is 2 * # cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads. In order to process a client request, one available server netty handler thread is required. However, when the server netty handler threads start to process ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk contentions from the random read operations initiated by all the ChunkFetchRequests received from clients. As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server side netty handler threads could all be blocked on reading shuffle files, thus leaving no handler thread available to process other types of requests, which should all be very quick to process. This issue could potentially be fixed by limiting the number of netty handler threads that could get blocked when processing ChunkFetchRequest. We have a patch to do this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequest. This enables the shuffle server to reserve netty handler threads for non-ChunkFetchRequest messages, thus enabling consistent processing time for these requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeout issues with either executor registration with the local shuffle server or the shuffle client establishing a connection with the remote shuffle server. ## How was this patch tested? Unit tests and stress testing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-24335 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22173.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22173 commit 44bb55759a4059d8bb0e60c361a8a3210a234f92 Author: Sanket Chintapalli Date: 2018-08-21T17:34:31Z SPARK-24355 Spark external shuffle server improvement to better handle block fetch requests. commit 3bab74ca84fe1b6682000741b958c8792f792472 Author: Sanket Chintapalli Date: 2018-08-21T16:49:50Z make chunk fetch handler threads as a percentage of transport server threads --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
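To make the mechanism concrete, here is a minimal, self-contained Netty sketch of the core idea; the names and the pool size are illustrative, not taken from the patch. A pipeline stage registered with its own EventExecutorGroup has its callbacks scheduled on that group, so a handler that blocks while disk-bound chunk responses are written out cannot starve the channel's default event loop.

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

final class PipelineSketch {
  // Hypothetical size; the PR derives it from chunkFetchHandlerThreadsPercent.
  private static final EventExecutorGroup CHUNK_FETCH_WORKERS =
      new DefaultEventExecutorGroup(14);

  static void initPipeline(SocketChannel ch) {
    ChannelPipeline p = ch.pipeline();
    // Ordinary stages run on the channel's own event loop.
    p.addLast("handler", new ChannelDuplexHandler());
    // This stage's channelRead/write callbacks run on CHUNK_FETCH_WORKERS
    // instead, so blocking here leaves the event loop free for RPC stages.
    p.addLast(CHUNK_FETCH_WORKERS, "chunkFetchHandler", new ChannelDuplexHandler());
  }
}

This uses the same addLast(EventExecutorGroup, String, ChannelHandler) overload that the diffs above register chunkFetchWorkers with.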