[ https://issues.apache.org/jira/browse/MAPREDUCE-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678706#comment-17678706 ]

ASF GitHub Bot commented on MAPREDUCE-7431:
-------------------------------------------

tomicooler commented on code in PR #5311:
URL: https://github.com/apache/hadoop/pull/5311#discussion_r1081262415


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URL;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.AUDITLOG;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.FETCH_RETRY_DELAY;
+import static org.apache.hadoop.mapred.ShuffleHandler.IGNORABLE_ERROR_MESSAGE;
+import static org.apache.hadoop.mapred.ShuffleHandler.RETRY_AFTER_HEADER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TOO_MANY_REQ_STATUS;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+/**
+ * ShuffleChannelHandler verifies the map request, then serves the attempts in an HTTP stream.
+ * Before each attempt, a serialised ShuffleHeader object is written with the details.
+ *
+ * <pre>
+ * Example Request
+ * ===================
+ * GET /mapOutput?job=job_1111111111111_0001&amp;reduce=0&amp;
+ *     map=attempt_1111111111111_0001_m_000001_0,
+ *     attempt_1111111111111_0002_m_000002_0,
+ *     attempt_1111111111111_0003_m_000003_0 HTTP/1.1
+ * name: mapreduce
+ * version: 1.0.0
+ * UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=
+ *
+ * Example Response
+ * ===================
+ * HTTP/1.1 200 OK
+ * ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
+ * name: mapreduce
+ * version: 1.0.0
+ * connection: close
+ * content-length: 138
+ *
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
+ * |00000020| 30 30 30 31 5f 30 05 0a 00                      |0001_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 61 61 61 61 61                                  |aaaaa           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
+ * |00000020| 30 30 30 32 5f 30 05 0a 00                      |0002_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 62 62 62 62 62                                  |bbbbb           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
+ * |00000020| 30 30 30 33 5f 30 05 0a 00                      |0003_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 63 63 63 63 63                                  |ccccc           |
+ * +--------+-------------------------------------------------+----------------+
+ * </pre>
+ */
+public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+  private final ShuffleChannelHandlerContext handlerCtx;
+
+  ShuffleChannelHandler(ShuffleChannelHandlerContext ctx) {
+    handlerCtx = ctx;
+  }
+
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx)
+      throws Exception {
+    LOG.debug("Executing channelActive; channel='{}'", ctx.channel().id());
+    int numConnections = handlerCtx.activeConnections.incrementAndGet();
+    if ((handlerCtx.maxShuffleConnections > 0) &&
+        (numConnections > handlerCtx.maxShuffleConnections)) {
+      LOG.info(String.format("Current number of shuffle connections (%d) is " +
+              "greater than the max allowed shuffle connections (%d)",
+          handlerCtx.allChannels.size(), handlerCtx.maxShuffleConnections));
+
+      Map<String, String> headers = new HashMap<>(1);
+      // notify fetchers to backoff for a while before closing the connection
+      // if the shuffle connection limit is hit. Fetchers are expected to
+      // handle this notification gracefully, that is, not treating this as a
+      // fetch failure.
+      headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+      sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
+    } else {
+      super.channelActive(ctx);
+      handlerCtx.allChannels.add(ctx.channel());
+      LOG.debug("Added channel: {}, channel id: {}. Accepted number of 
connections={}",
+          ctx.channel(), ctx.channel().id(), 
handlerCtx.activeConnections.get());
+    }
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    LOG.debug("Executing channelInactive; channel='{}'", ctx.channel().id());
+    super.channelInactive(ctx);
+    int noOfConnections = handlerCtx.activeConnections.decrementAndGet();
+    LOG.debug("New value of Accepted number of connections={}", 
noOfConnections);
+  }
+
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+    Channel channel = ctx.channel();
+    LOG.debug("Received HTTP request: {}, channel='{}'", request, channel.id());
+
+    if (request.method() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+    // Check whether the shuffle version is compatible
+    String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+    String httpHeaderName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
+    if (request.headers() != null) {
+      shuffleVersion = request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION);
+      httpHeaderName = request.headers().get(ShuffleHeader.HTTP_HEADER_NAME);
+      LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}",
+          shuffleVersion, httpHeaderName, channel.id());
+    }
+    if (request.headers() == null ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpHeaderName) ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion)) {
+      sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+      return;
+    }
+    final Map<String, List<String>> q =
+        new QueryStringDecoder(request.uri()).parameters();
+
+    final List<String> keepAliveList = q.get("keepAlive");
+    boolean keepAliveParam = false;
+    if (keepAliveList != null && keepAliveList.size() == 1) {
+      keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+      if (LOG.isDebugEnabled()) {

Review Comment:
   I guess it was used like this to perform better when DEBUG logging is not enabled (the log parameters are not evaluated). This is just code copied over from the original solution, no change here.
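
   A quick illustration of the guard (a generic SLF4J sketch, not code from the patch): the {} placeholders defer the toString() formatting, but argument expressions such as channel.id() are still evaluated eagerly, and the isDebugEnabled() check skips even that when DEBUG is off.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GuardedLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingSketch.class);

  void log(java.util.List<String> keepAliveList, boolean keepAliveParam, Object channelId) {
    // The guard ensures the argument expressions below are not evaluated
    // at all when DEBUG logging is disabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
          keepAliveList, keepAliveParam, channelId);
    }
  }
}
{code}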



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
+    final List<String> keepAliveList = q.get("keepAlive");
+    boolean keepAliveParam = false;
+    if (keepAliveList != null && keepAliveList.size() == 1) {
+      keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
+            keepAliveList, keepAliveParam, channel.id());
+      }
+    }
+    final List<String> mapIds = splitMaps(q.get("map"));
+    final List<String> reduceQ = q.get("reduce");
+    final List<String> jobQ = q.get("job");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RECV: " + request.uri() +
+          "\n  mapId: " + mapIds +
+          "\n  reduceId: " + reduceQ +
+          "\n  jobId: " + jobQ +
+          "\n  keepAlive: " + keepAliveParam +
+          "\n  channel id: " + channel.id());
+    }
+
+    if (mapIds == null || reduceQ == null || jobQ == null) {
+      sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+      return;
+    }
+    if (reduceQ.size() != 1 || jobQ.size() != 1) {
+      sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+      return;
+    }
+
+    int reduceId;
+    String jobId;
+    try {
+      reduceId = Integer.parseInt(reduceQ.get(0));
+      jobId = jobQ.get(0);
+    } catch (NumberFormatException e) {
+      sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+      return;
+    } catch (IllegalArgumentException e) {
+      sendError(ctx, "Bad job parameter", BAD_REQUEST);
+      return;
+    }
+    final String reqUri = request.uri();
+    if (null == reqUri) {
+      // TODO? add upstream?
+      sendError(ctx, FORBIDDEN);
+      return;
+    }
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    try {
+      verifyRequest(jobId, ctx, request, response,
+          new URL("http", "", handlerCtx.port, reqUri));
+    } catch (IOException e) {
+      LOG.warn("Shuffle failure ", e);
+      sendError(ctx, e.getMessage(), UNAUTHORIZED);
+      return;
+    }
+
+    Map<String, MapOutputInfo> mapOutputInfoMap = new HashMap<>();
+    ChannelPipeline pipeline = channel.pipeline();
+    ShuffleHandler.TimeoutHandler timeoutHandler =
+        (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+    timeoutHandler.setEnabledTimeout(false);
+    String user = handlerCtx.userRsrc.get(jobId);
+
+    try {
+      populateHeaders(mapIds, jobId, user, reduceId,
+          response, keepAliveParam, mapOutputInfoMap);
+    } catch(IOException e) {
+      LOG.error("Shuffle error while populating headers. Channel id: " + 
channel.id(), e);
+      sendError(ctx, getErrorMessage(e), INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    channel.write(response);
+
+    //Initialize one ReduceContext object per channelRead call
+    boolean keepAlive = keepAliveParam || handlerCtx.connectionKeepAliveEnabled;
+    ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+        user, mapOutputInfoMap, jobId, keepAlive);
+
+    sendMap(reduceContext);
+  }
+
+  /**
+   * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
+   * and increments it. This method is first called by messageReceived()
+   * maxSessionOpenFiles times and then on the completion of every
+   * sendMapOutput operation. This limits the number of open files on a node,
+   * which can get really large (exhausting file descriptors on the NM) if all
+   * sendMapOutputs are called in one go, as was done prior to this change.
+   * @param reduceContext used to call sendMapOutput with correct params.
+   */
+  public void sendMap(ReduceContext reduceContext) {
+    LOG.trace("Executing sendMap; channel='{}'", 
reduceContext.ctx.channel().id());
+    if (reduceContext.getMapsToSend().get() <
+        reduceContext.getMapIds().size()) {
+      int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+      String mapId = reduceContext.getMapIds().get(nextIndex);
+
+      try {
+        MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+        if (info == null) {
+          info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+              reduceContext.getJobId(), reduceContext.getUser());
+        }
+        LOG.trace("Calling sendMapOutput; channel='{}'", 
reduceContext.ctx.channel().id());
+        ChannelFuture nextMap = sendMapOutput(
+            reduceContext.getCtx().channel(),
+            reduceContext.getUser(), mapId,
+            reduceContext.getReduceId(), info);
+        nextMap.addListener(new ReduceMapFileCount(this, reduceContext));
+      } catch (IOException e) {
+        LOG.error("Shuffle error: {}; channel={}", e, 
reduceContext.ctx.channel().id());
+
+        // It is not possible to sendError, the success HttpResponse has been 
already sent
+        reduceContext.ctx.channel().close();
+      }
+    }
+  }
+
+  private String getErrorMessage(Throwable t) {
+    StringBuilder sb = new StringBuilder(t.getMessage());
+    while (t.getCause() != null) {
+      sb.append(t.getCause().getMessage());
+      t = t.getCause();
+    }
+    return sb.toString();
+  }
+
+  protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user)
+      throws IOException {
+    ShuffleHandler.AttemptPathInfo pathInfo;
+    try {
+      ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(
+          jobId, user, mapId);
+      pathInfo = handlerCtx.pathCache.get(identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Retrieved pathInfo for " + identifier +
+            " check for corresponding loaded messages to determine whether" +
+            " it was loaded or cached");
+      }
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    IndexRecord info =
+        handlerCtx.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+          ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+          pathInfo.indexPath);
+      LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}",
+          info.startOffset, info.partLength, info.rawLength);
+    }
+
+    return new MapOutputInfo(pathInfo.dataPath, info);
+  }
+
+  protected void populateHeaders(List<String> mapIds, String jobId,
+                                 String user, int reduce, HttpResponse response,
+                                 boolean keepAliveParam,
+                                 Map<String, MapOutputInfo> mapOutputInfoMap)
+      throws IOException {
+
+    long contentLength = 0;
+    for (String mapId : mapIds) {
+      MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+      if (mapOutputInfoMap.size() < handlerCtx.mapOutputMetaInfoCacheSize) {
+        mapOutputInfoMap.put(mapId, outputInfo);
+      }
+
+      ShuffleHeader header =
+          new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+              outputInfo.indexRecord.rawLength, reduce);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      contentLength += outputInfo.indexRecord.partLength;
+      contentLength += dob.getLength();
+
+      // verify file access to data file to send an actually correct http error
+      final File spillFile = new File(outputInfo.mapOutputFileName.toString());
+      RandomAccessFile r = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+      r.close();
+    }
+
+    // Now set the response headers.
+    setResponseHeaders(response, keepAliveParam, contentLength);
+
+    // this audit log is disabled by default,
+    // to turn it on please enable this audit log
+    // on log4j.properties by uncommenting the setting
+    if (AUDITLOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("shuffle for ");
+      sb.append(jobId).append(" reducer ").append(reduce);
+      sb.append(" length ").append(contentLength);
+      if (AUDITLOG.isTraceEnabled()) {
+        // For trace level logging, append the list of mappers
+        sb.append(" mappers: ").append(mapIds);
+        AUDITLOG.trace(sb.toString());
+      } else {
+        AUDITLOG.debug(sb.toString());
+      }
+    }
+  }
+
+  protected void setResponseHeaders(HttpResponse response,
+                                    boolean keepAliveParam, long contentLength) {
+    if (!handlerCtx.connectionKeepAliveEnabled && !keepAliveParam) {
+      response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+    } else {
+      response.headers().set(HttpHeader.CONNECTION.asString(),
+          HttpHeader.KEEP_ALIVE.asString());
+      response.headers().set(HttpHeader.KEEP_ALIVE.asString(),
+          "timeout=" + handlerCtx.connectionKeepAliveTimeOut);
+    }
+
+    // Content length must be set (https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3)
+    HttpUtil.setContentLength(response, contentLength);
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  static class MapOutputInfo {
+    final Path mapOutputFileName;
+    final IndexRecord indexRecord;
+
+    MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+      this.mapOutputFileName = mapOutputFileName;
+      this.indexRecord = indexRecord;
+    }
+  }
+
+  protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+                               HttpRequest request, HttpResponse response, URL requestUri)
+      throws IOException {
+    SecretKey tokenSecret = handlerCtx.secretManager.retrieveTokenSecret(appid);
+    if (null == tokenSecret) {
+      LOG.info("Request for unknown token {}, channel id: {}", appid, 
ctx.channel().id());
+      throw new IOException("Could not find jobid");
+    }
+    // encrypting URL
+    String encryptedURL = SecureShuffleUtils.buildMsgFrom(requestUri);
+    // hash from the fetcher
+    String urlHashStr =
+        request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+    if (urlHashStr == null) {
+      LOG.info("Missing header hash for {}, channel id: {}", appid, 
ctx.channel().id());
+      throw new IOException("fetcher cannot be authenticated");
+    }
+    if (LOG.isDebugEnabled()) {
+      int len = urlHashStr.length();
+      LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: " +
+              "{}", encryptedURL,
+          urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id());
+    }
+    // verify - throws exception
+    SecureShuffleUtils.verifyReply(urlHashStr, encryptedURL, tokenSecret);
+    // verification passed - encode the reply
+    String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
+        tokenSecret);
+    response.headers().set(
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+    // Put shuffle version into http header
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    if (LOG.isDebugEnabled()) {
+      int len = reply.length();
+      LOG.debug("Fetcher request verified. " +
+              "encryptedURL: {}, reply: {}, channel id: {}",
+          encryptedURL, reply.substring(len - len / 2, len - 1),
+          ctx.channel().id());
+    }
+  }
+
+  public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOException {
+    final DataOutputBuffer dob = new DataOutputBuffer();
+    header.write(dob);
+    return wrappedBuffer(dob.getData(), 0, dob.getLength());
+  }
+
+  protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce,
+                                        MapOutputInfo mapOutputInfo)
+      throws IOException {
+    final IndexRecord info = mapOutputInfo.indexRecord;
+    ch.write(shuffleHeaderToBytes(
+        new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce)));
+    final File spillFile =
+        new File(mapOutputInfo.mapOutputFileName.toString());
+    RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);

Review Comment:
   The FadvisedFileRegion/FadvisedChunkedFile closes the file.
   
   (The openForRandomRead() could be moved to each Fadvised constructor to make it a bit harder to misuse, but the current code can't throw an exception between openForRandomRead and handing the file to the Fadvised constructors.)
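
   For reference, a minimal self-contained sketch of that alternative, with a hypothetical Region type standing in for FadvisedFileRegion/FadvisedChunkedFile: opening the file inside the owning type's factory leaves no code path between open and ownership transfer where an exception could leak the descriptor.

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

final class Region implements AutoCloseable {
  private final RandomAccessFile file;

  // Open and transfer ownership in one step; if construction ever grew a
  // failure path, the catch below would still close the descriptor.
  static Region open(File spillFile) throws IOException {
    RandomAccessFile f = new RandomAccessFile(spillFile, "r");
    try {
      return new Region(f);
    } catch (RuntimeException e) {
      f.close();
      throw e;
    }
  }

  private Region(RandomAccessFile file) {
    this.file = file;
  }

  @Override
  public void close() throws IOException {
    file.close(); // mirrors the Fadvised classes closing the file when done
  }
}
{code}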



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties:
##########
@@ -17,5 +17,5 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
-log4j.logger.io.netty=INFO
-log4j.logger.org.apache.hadoop.mapred=INFO
\ No newline at end of file
+log4j.logger.io.netty=TRACE

Review Comment:
   It doesn't log that much at all.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
+    if (mapIds == null || reduceQ == null || jobQ == null) {
+      sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);

Review Comment:
   No change here from my side.





> ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
> -----------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7431
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>    Affects Versions: 3.4.0
>            Reporter: Tamas Domok
>            Assignee: Tamas Domok
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: chunked-fileregion.txt, chunked.txt, 
> normal-fileregion.txt, normal.txt, sendMapPipeline.png
>
>
> HADOOP-15327 introduced some regressions in the ShuffleHandler.
> h3. 1. A memory leak
> {code:java}
> ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not 
> called before it's garbage-collected. See 
> https://netty.io/wiki/reference-counted-objects.html for more information.
> {code}
>  
> The Shuffle's channelRead didn't release the message properly; the fix would 
> be this:
> {code:java}
>       try {
>         // ....
>       } finally {
>         ReferenceCountUtil.release(msg);
>       }
> {code}
> Or even simpler:
> {code:java}
> extends SimpleChannelInboundHandler<FullHttpRequest>
> {code}
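> A minimal sketch of the second option (assuming Netty 4.1): 
> SimpleChannelInboundHandler.channelRead() wraps channelRead0() in a try/finally 
> that releases the message, so the handler no longer has to do it by hand.
> {code:java}
> import io.netty.channel.ChannelHandlerContext;
> import io.netty.channel.SimpleChannelInboundHandler;
> import io.netty.handler.codec.http.FullHttpRequest;
>
> public class AutoReleasingHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
>   @Override
>   protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
>     // handle the request here; no ReferenceCountUtil.release(request) needed,
>     // the base class releases the message after this method returns
>   }
> }
> {code}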
> h3. 2. A bug in SSL mode with more than one reducer
> It manifested in multiple errors:
> {code:java}
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.io.IOException: Broken pipe
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.nio.channels.ClosedChannelException
> // if the reducer memory was not enough, then even this:
> Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in 
> shuffle in fetcher#2
>       at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
>       at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
> Caused by: java.lang.OutOfMemoryError: Java heap space
>       at 
> org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
>       at 
> org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
>       at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>       at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
>       at 
> org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
> {code}
> *Configuration* - mapred-site.xml
> {code:java}
> mapreduce.shuffle.ssl.enabled=true
> {code}
> An alternative is to build a custom jar where *FadvisedFileRegion* is replaced 
> with *FadvisedChunkedFile* in {*}sendMapOutput{*}.
> *Reproduction*
> {code:java}
> hdfs dfs -rm -r -skipTrash /tmp/sort_input
> hdfs dfs -rm -r -skipTrash /tmp/sort_output
> yarn jar 
> hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar
>  randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" 
> /tmp/sort_input
> yarn jar 
> hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar
>  sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input 
> /tmp/sort_output | tee sort_app_output.txt
> {code}
> h3. ShuffleHandler's protocol
> {code:java}
> // HTTP Request
> GET 
> /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0
>  HTTP/1.1
> + keep alive headers
> // HTTP Response Headers
> content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
> + keep alive headers
> // Response Data (transfer-encoding=chunked)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> ...
> LastHttpContent
> // close socket if no keep-alive
> {code}
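> The same exchange expressed as the write sequence on the Netty channel (a condensed 
> sketch; mapIds, header, partition and keepAlive are free variables standing for the 
> per-request state in the patch):
> {code:java}
> channel.write(response);                        // headers only, content-length precomputed
> for (String mapId : mapIds) {
>   channel.write(shuffleHeaderToBytes(header));  // serialised ShuffleHeader for this attempt
>   channel.write(partition);                     // FadvisedFileRegion or FadvisedChunkedFile
> }
> ChannelFuture last = channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
> if (!keepAlive) {
>   last.addListener(ChannelFutureListener.CLOSE); // close socket if no keep-alive
> }
> {code}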
> h3. Issues
>  - {*}setResponseHeaders{*}: did not always set the content-length, and the 
> transfer-encoding=chunked header was missing (see the sketch after this list).
>  - {*}ReduceMapFileCount.operationComplete{*}: messed up the futures on the 
> LastHttpContent
>  - {*}ChannelGroup accepted{*}: is only used to close the channels, no need 
> for that magic 5. See the details 
> [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
>  - {*}bossGroup{*}: should have only 1 thread for accepting connections.
>  - {*}Shuffle{*}: is unnecessarily Sharable, the 3 async sendMap / channel 
> (see below) caused future errors when using FadvisedChunkedFile
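> A sketch of what setResponseHeaders should guarantee (assuming Netty 4.1's HttpUtil 
> and HttpHeaderNames; not the exact patch code):
> {code:java}
> // Content length must always be set (RFC 7230 section 3.3.3), otherwise the
> // fetcher cannot tell where the shuffle stream ends on a keep-alive connection.
> HttpUtil.setContentLength(response, contentLength);
> if (keepAlive) {
>   response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
> } else {
>   response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
> }
> {code}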
> h3. Max session open files is not an optimisation, it's actually wasting 
> resources
> {code:java}
>     // by default maxSessionOpenFiles = 3
>     for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, 
> mapIds.size()); i++) {
>       ChannelFuture nextMap = sendMap(reduceContext);
>       if(nextMap == null) {
>         return;
>       }
>     }
> {code}
> !sendMapPipeline.png!
> At the end of the day, we create an HTTP chunked stream; there is no need to 
> run 3 sendMap calls asynchronously, the futures will finish one-by-one sequentially. The 
> osCache magic from the FAdvised classes won't happen either, because the 
> first readChunk will be called only later.
> So this can be simplified a lot:
> {code:java}
> sendMap(reduceContext);
> {code}
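> The sequential behaviour falls out of the listener chain anyway; a condensed, 
> hypothetical form of the ReduceMapFileCount logic:
> {code:java}
> ChannelFuture future = sendMapOutput(channel, user, mapId, reduceId, info);
> future.addListener(f -> {
>   if (f.isSuccess()) {
>     sendMap(reduceContext);  // write the next attempt only after this one completed
>   } else {
>     channel.close();         // cannot sendError anymore, the 200 OK is already out
>   }
> });
> {code}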
> h3. My proposal
> Some refactoring: ShuffleHandler is split into multiple classes to make it 
> possible to remove the sharable annotation.
>  - ShuffleChannel
>  - ShuffleChannelInitializer
>  - ShuffleChannelHandlerContext
>  - ShuffleChannelHandler
> TODO:
>  - fix/drop/refactor the existing unit tests
>  - add proper unit tests that cover both SSL and non-SSL mode and properly 
> verify the response data
>  - documentation about the protocol
> WIP: 
> [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]
> h3. Netty useful docs
>  * [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
>  * [New and noteworthy in 
> 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
>  * [Reference counted 
> objects|https://netty.io/wiki/reference-counted-objects.html]   (it will be 
> changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
>  * HttpStaticFileServer 
> [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]


