cloud-fan commented on a change in pull request #27665: [SPARK-30623][Core]
Spark external shuffle allow disable of separate event loop group
URL: https://github.com/apache/spark/pull/27665#discussion_r396410407
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
##########
@@ -107,31 +85,10 @@ protected void channelRead0(
(ChannelFutureListener) future ->
streamManager.chunkSent(msg.streamChunkId.streamId));
}
- /**
- * The invocation to channel.writeAndFlush is async, and the actual I/O on
the
- * channel will be handled by the EventLoop the channel is registered to. So
even
- * though we are processing the ChunkFetchRequest in a separate thread pool,
the actual I/O,
- * which is the potentially blocking call that could deplete server handler
threads, is still
- * being processed by TransportServer's default EventLoopGroup. In order to
throttle the max
- * number of threads that channel I/O for sending response to
ChunkFetchRequest, the thread
- * calling channel.writeAndFlush will wait for the completion of sending
response back to
- * client by invoking await(). This will throttle the rate at which threads
from
- * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to
TransportServer's
- * default EventLoopGroup, thus making sure that we can reserve some threads
in
- * TransportServer's default EventLoopGroup for handling other RPC messages.
- */
- private ChannelFuture respond(
- final Channel channel,
- final Encodable result) throws InterruptedException {
- final SocketAddress remoteAddress = channel.remoteAddress();
- return
channel.writeAndFlush(result).await().addListener((ChannelFutureListener)
future -> {
- if (future.isSuccess()) {
- logger.trace("Sent result {} to client {}", result, remoteAddress);
- } else {
- logger.error(String.format("Error sending result %s to %s; closing
connection",
- result, remoteAddress), future.cause());
- channel.close();
- }
- });
- }
+ public abstract ChannelFuture respond(Channel channel, Encodable result)
throws Exception;
Review comment:
why we need to define this method?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]