Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r218807868
  
    --- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
 ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.server;
    +
    +import java.net.SocketAddress;
    +
    +import com.google.common.base.Throwables;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.channel.ChannelFutureListener;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.SimpleChannelInboundHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.client.TransportClient;
    +import org.apache.spark.network.protocol.ChunkFetchFailure;
    +import org.apache.spark.network.protocol.ChunkFetchRequest;
    +import org.apache.spark.network.protocol.ChunkFetchSuccess;
    +import org.apache.spark.network.protocol.Encodable;
    +
    +import static org.apache.spark.network.util.NettyUtils.*;
    +
    +
    +/**
    + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. 
When sending response
    + * of ChunkFetchRequest messages to the clients, the thread performing the 
I/O on the underlying
    + * channel could potentially be blocked due to disk contentions. If 
several hundreds of clients
    + * send ChunkFetchRequest to the server at the same time, it could 
potentially occupying all
    + * threads from TransportServer's default EventLoopGroup for waiting for 
disk reads before it
    + * can send the block data back to the client as part of the 
ChunkFetchSuccess messages. As a
    + * result, it would leave no threads left to process other RPC messages, 
which takes much less
    + * time to process, and could lead to client timing out on either 
performing SASL authentication,
    + * registering executors, or waiting for response for an OpenBlocks 
messages.
    + */
    +public class ChunkFetchRequestHandler extends 
SimpleChannelInboundHandler<ChunkFetchRequest> {
    +  private static final Logger logger = 
LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
    +
    +  private final TransportClient client;
    +  private final StreamManager streamManager;
    +  /** The max number of chunks being transferred and not finished yet. */
    +  private final long maxChunksBeingTransferred;
    +
    +  public ChunkFetchRequestHandler(
    +      TransportClient client,
    +      StreamManager streamManager,
    +      Long maxChunksBeingTransferred) {
    +    this.client = client;
    +    this.streamManager = streamManager;
    +    this.maxChunksBeingTransferred = maxChunksBeingTransferred;
    +  }
    +
    +  @Override
    +  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
    +    logger.warn("Exception in connection from " + 
getRemoteAddress(ctx.channel()), cause);
    +    ctx.close();
    +  }
    +
    +  @Override
    +  protected void channelRead0(ChannelHandlerContext ctx, final 
ChunkFetchRequest msg)
    +    throws Exception {
    --- End diff --
    
    indent this 2 more spaces


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to