[
https://issues.apache.org/jira/browse/MAPREDUCE-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678658#comment-17678658
]
ASF GitHub Bot commented on MAPREDUCE-7431:
-------------------------------------------
K0K0V0K commented on code in PR #5311:
URL: https://github.com/apache/hadoop/pull/5311#discussion_r1081143805
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URL;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.AUDITLOG;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.FETCH_RETRY_DELAY;
+import static org.apache.hadoop.mapred.ShuffleHandler.IGNORABLE_ERROR_MESSAGE;
+import static org.apache.hadoop.mapred.ShuffleHandler.RETRY_AFTER_HEADER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TOO_MANY_REQ_STATUS;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+/**
+ * ShuffleChannelHandler verifies the map request, then serves the attempts in an HTTP stream.
+ * Before each attempt a serialised ShuffleHeader object is written with the details.
+ *
+ * <pre>
+ * Example Request
+ * ===================
+ * GET /mapOutput?job=job_1111111111111_0001&reduce=0&
+ * map=attempt_1111111111111_0001_m_000001_0,
+ * attempt_1111111111111_0002_m_000002_0,
+ * attempt_1111111111111_0003_m_000003_0 HTTP/1.1
+ * name: mapreduce
+ * version: 1.0.0
+ * UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=
+ *
+ * Example Response
+ * ===================
+ * HTTP/1.1 200 OK
+ * ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
+ * name: mapreduce
+ * version: 1.0.0
+ * connection: close
+ * content-length: 138
+ *
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
+ * |00000020| 30 30 30 31 5f 30 05 0a 00                      |0001_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 61 61 61 61 61                                  |aaaaa           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
+ * |00000020| 30 30 30 32 5f 30 05 0a 00                      |0002_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 62 62 62 62 62                                  |bbbbb           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
+ * |00000020| 30 30 30 33 5f 30 05 0a 00                      |0003_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 63 63 63 63 63                                  |ccccc           |
+ * +--------+-------------------------------------------------+----------------+
+ * </pre>
+ */
+public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+ private final ShuffleChannelHandlerContext handlerCtx;
+
+ ShuffleChannelHandler(ShuffleChannelHandlerContext ctx) {
+ handlerCtx = ctx;
+ }
+
+ private List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx)
+ throws Exception {
+ LOG.debug("Executing channelActive; channel='{}'", ctx.channel().id());
+ int numConnections = handlerCtx.activeConnections.incrementAndGet();
+ if ((handlerCtx.maxShuffleConnections > 0) &&
+ (numConnections > handlerCtx.maxShuffleConnections)) {
+ LOG.info(String.format("Current number of shuffle connections (%d) is " +
+ "greater than the max allowed shuffle connections (%d)",
+ handlerCtx.allChannels.size(), handlerCtx.maxShuffleConnections));
+
+ Map<String, String> headers = new HashMap<>(1);
+ // notify fetchers to backoff for a while before closing the connection
+ // if the shuffle connection limit is hit. Fetchers are expected to
+ // handle this notification gracefully, that is, not treating this as a
+ // fetch failure.
+ headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+ sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
+ } else {
+ super.channelActive(ctx);
+ handlerCtx.allChannels.add(ctx.channel());
+ LOG.debug("Added channel: {}, channel id: {}. Accepted number of
connections={}",
+ ctx.channel(), ctx.channel().id(),
handlerCtx.activeConnections.get());
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ LOG.debug("Executing channelInactive; channel='{}'", ctx.channel().id());
+ super.channelInactive(ctx);
+ int noOfConnections = handlerCtx.activeConnections.decrementAndGet();
+ LOG.debug("New value of Accepted number of connections={}",
noOfConnections);
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+ Channel channel = ctx.channel();
+ LOG.debug("Received HTTP request: {}, channel='{}'", request,
channel.id());
+
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+ // Check whether the shuffle version is compatible
+ String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+ String httpHeaderName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
+ if (request.headers() != null) {
+ shuffleVersion = request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION);
+ httpHeaderName = request.headers().get(ShuffleHeader.HTTP_HEADER_NAME);
+ LOG.debug("Received from request header: ShuffleVersion={} header
name={}, channel id: {}",
+ shuffleVersion, httpHeaderName, channel.id());
+ }
+ if (request.headers() == null ||
+ !ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpHeaderName) ||
+ !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion)) {
+ sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+ return;
+ }
+ final Map<String, List<String>> q =
+ new QueryStringDecoder(request.uri()).parameters();
+
+ final List<String> keepAliveList = q.get("keepAlive");
+ boolean keepAliveParam = false;
+ if (keepAliveList != null && keepAliveList.size() == 1) {
+ keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+ if (LOG.isDebugEnabled()) {
Review Comment:
Why is this `if` required here?
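For context on the question above: with SLF4J-style parameterized logging the message is only formatted when DEBUG is enabled, so the guard mainly pays off when an argument is expensive to compute. A minimal sketch (not from the PR; expensiveSummary() is a hypothetical placeholder):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingExample {
  private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingExample.class);

  void log(Object cheapArg) {
    // Parameterized logging: the string is only built if DEBUG is enabled,
    // so a plain call is usually enough for cheap arguments.
    LOG.debug("KeepAliveParam: {}", cheapArg);

    // The isDebugEnabled() guard is worthwhile only when building the argument is itself costly.
    if (LOG.isDebugEnabled()) {
      LOG.debug("KeepAliveParam details: {}", expensiveSummary());
    }
  }

  private String expensiveSummary() {
    return "...";  // stands in for an expensive computation
  }
}
{code}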
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties:
##########
@@ -17,5 +17,5 @@ log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
-log4j.logger.io.netty=INFO
-log4j.logger.org.apache.hadoop.mapred=INFO
\ No newline at end of file
+log4j.logger.io.netty=TRACE
Review Comment:
I think this can stay at INFO, to reduce the build/test times.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
@@ -0,0 +1,715 @@
+ final Map<String, List<String>> q =
+ new QueryStringDecoder(request.uri()).parameters();
+
+ final List<String> keepAliveList = q.get("keepAlive");
+ boolean keepAliveParam = false;
+ if (keepAliveList != null && keepAliveList.size() == 1) {
+ keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
+ keepAliveList, keepAliveParam, channel.id());
+ }
+ }
+ final List<String> mapIds = splitMaps(q.get("map"));
+ final List<String> reduceQ = q.get("reduce");
+ final List<String> jobQ = q.get("job");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RECV: " + request.uri() +
+ "\n mapId: " + mapIds +
+ "\n reduceId: " + reduceQ +
+ "\n jobId: " + jobQ +
+ "\n keepAlive: " + keepAliveParam +
+ "\n channel id: " + channel.id());
+ }
+
+ if (mapIds == null || reduceQ == null || jobQ == null) {
+ sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
Review Comment:
Maybe an error log could be added here stating which parameter was null?
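One way the suggestion could look (an illustrative fragment inside channelRead0, not part of the PR; the message text and log level are only examples):
{code:java}
if (mapIds == null || reduceQ == null || jobQ == null) {
  // Log which of the required parameters was missing before rejecting the request.
  LOG.error("Missing required shuffle parameter(s): map={}, reduce={}, job={}; channel id: {}",
      mapIds, reduceQ, jobQ, channel.id());
  sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
  return;
}
{code}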
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java:
##########
@@ -0,0 +1,715 @@
+ if (mapIds == null || reduceQ == null || jobQ == null) {
+ sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+ return;
+ }
+ if (reduceQ.size() != 1 || jobQ.size() != 1) {
+ sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+ return;
+ }
+
+ int reduceId;
+ String jobId;
+ try {
+ reduceId = Integer.parseInt(reduceQ.get(0));
+ jobId = jobQ.get(0);
+ } catch (NumberFormatException e) {
+ sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+ return;
+ } catch (IllegalArgumentException e) {
+ sendError(ctx, "Bad job parameter", BAD_REQUEST);
+ return;
+ }
+ final String reqUri = request.uri();
+ if (null == reqUri) {
+ // TODO? add upstream?
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ try {
+ verifyRequest(jobId, ctx, request, response,
+ new URL("http", "", handlerCtx.port, reqUri));
+ } catch (IOException e) {
+ LOG.warn("Shuffle failure ", e);
+ sendError(ctx, e.getMessage(), UNAUTHORIZED);
+ return;
+ }
+
+ Map<String, MapOutputInfo> mapOutputInfoMap = new HashMap<>();
+ ChannelPipeline pipeline = channel.pipeline();
+ ShuffleHandler.TimeoutHandler timeoutHandler =
+ (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+ timeoutHandler.setEnabledTimeout(false);
+ String user = handlerCtx.userRsrc.get(jobId);
+
+ try {
+ populateHeaders(mapIds, jobId, user, reduceId,
+ response, keepAliveParam, mapOutputInfoMap);
+ } catch(IOException e) {
+ LOG.error("Shuffle error while populating headers. Channel id: " +
channel.id(), e);
+ sendError(ctx, getErrorMessage(e), INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ channel.write(response);
+
+ //Initialize one ReduceContext object per channelRead call
+ boolean keepAlive = keepAliveParam || handlerCtx.connectionKeepAliveEnabled;
+ ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+ user, mapOutputInfoMap, jobId, keepAlive);
+
+ sendMap(reduceContext);
+ }
+
+ /**
+ * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
+ * and increments it. This method is first called by messageReceived()
+ * maxSessionOpenFiles times and then on the completion of every
+ * sendMapOutput operation. This limits the number of open files on a node,
+ * which can get really large (exhausting file descriptors on the NM) if all
+ * sendMapOutputs are called in one go, as was done previous to this change.
+ * @param reduceContext used to call sendMapOutput with correct params.
+ */
+ public void sendMap(ReduceContext reduceContext) {
+ LOG.trace("Executing sendMap; channel='{}'",
reduceContext.ctx.channel().id());
+ if (reduceContext.getMapsToSend().get() <
+ reduceContext.getMapIds().size()) {
+ int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+ String mapId = reduceContext.getMapIds().get(nextIndex);
+
+ try {
+ MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+ if (info == null) {
+ info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+ reduceContext.getJobId(), reduceContext.getUser());
+ }
+ LOG.trace("Calling sendMapOutput; channel='{}'",
reduceContext.ctx.channel().id());
+ ChannelFuture nextMap = sendMapOutput(
+ reduceContext.getCtx().channel(),
+ reduceContext.getUser(), mapId,
+ reduceContext.getReduceId(), info);
+ nextMap.addListener(new ReduceMapFileCount(this, reduceContext));
+ } catch (IOException e) {
+ LOG.error("Shuffle error: {}; channel={}", e,
reduceContext.ctx.channel().id());
+
+ // It is not possible to sendError, the success HttpResponse has been
already sent
+ reduceContext.ctx.channel().close();
+ }
+ }
+ }
+
+ private String getErrorMessage(Throwable t) {
+ StringBuilder sb = new StringBuilder(t.getMessage());
+ while (t.getCause() != null) {
+ sb.append(t.getCause().getMessage());
+ t = t.getCause();
+ }
+ return sb.toString();
+ }
+
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user)
+ throws IOException {
+ ShuffleHandler.AttemptPathInfo pathInfo;
+ try {
+ ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(
+ jobId, user, mapId);
+ pathInfo = handlerCtx.pathCache.get(identifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved pathInfo for " + identifier +
+ " check for corresponding loaded messages to determine whether" +
+ " it was loaded or cached");
+ }
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ IndexRecord info =
+ handlerCtx.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+ ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+ pathInfo.indexPath);
+ LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}",
+ info.startOffset, info.partLength, info.rawLength);
+ }
+
+ return new MapOutputInfo(pathInfo.dataPath, info);
+ }
+
+ protected void populateHeaders(List<String> mapIds, String jobId,
+ String user, int reduce, HttpResponse response,
+ boolean keepAliveParam,
+ Map<String, MapOutputInfo> mapOutputInfoMap)
+ throws IOException {
+
+ long contentLength = 0;
+ for (String mapId : mapIds) {
+ MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+ if (mapOutputInfoMap.size() < handlerCtx.mapOutputMetaInfoCacheSize) {
+ mapOutputInfoMap.put(mapId, outputInfo);
+ }
+
+ ShuffleHeader header =
+ new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+ outputInfo.indexRecord.rawLength, reduce);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ contentLength += outputInfo.indexRecord.partLength;
+ contentLength += dob.getLength();
+
+ // verify file access to data file to send an actually correct http error
+ final File spillFile = new File(outputInfo.mapOutputFileName.toString());
+ RandomAccessFile r = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+ r.close();
+ }
+
+ // Now set the response headers.
+ setResponseHeaders(response, keepAliveParam, contentLength);
+
+ // this audit log is disabled by default,
+ // to turn it on please enable this audit log
+ // on log4j.properties by uncommenting the setting
+ if (AUDITLOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder("shuffle for ");
+ sb.append(jobId).append(" reducer ").append(reduce);
+ sb.append(" length ").append(contentLength);
+ if (AUDITLOG.isTraceEnabled()) {
+ // For trace level logging, append the list of mappers
+ sb.append(" mappers: ").append(mapIds);
+ AUDITLOG.trace(sb.toString());
+ } else {
+ AUDITLOG.debug(sb.toString());
+ }
+ }
+ }
+
+ protected void setResponseHeaders(HttpResponse response,
+ boolean keepAliveParam, long contentLength) {
+ if (!handlerCtx.connectionKeepAliveEnabled && !keepAliveParam) {
+ response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+ } else {
+ response.headers().set(HttpHeader.CONNECTION.asString(),
+ HttpHeader.KEEP_ALIVE.asString());
+ response.headers().set(HttpHeader.KEEP_ALIVE.asString(),
+ "timeout=" + handlerCtx.connectionKeepAliveTimeOut);
+ }
+
+ // Content length must be set (https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3)
+ HttpUtil.setContentLength(response, contentLength);
+ }
+
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ static class MapOutputInfo {
+ final Path mapOutputFileName;
+ final IndexRecord indexRecord;
+
+ MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+ this.mapOutputFileName = mapOutputFileName;
+ this.indexRecord = indexRecord;
+ }
+ }
+
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ SecretKey tokenSecret = handlerCtx.secretManager.retrieveTokenSecret(appid);
+ if (null == tokenSecret) {
+ LOG.info("Request for unknown token {}, channel id: {}", appid,
ctx.channel().id());
+ throw new IOException("Could not find jobid");
+ }
+ // encrypting URL
+ String encryptedURL = SecureShuffleUtils.buildMsgFrom(requestUri);
+ // hash from the fetcher
+ String urlHashStr =
+ request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ if (urlHashStr == null) {
+ LOG.info("Missing header hash for {}, channel id: {}", appid,
ctx.channel().id());
+ throw new IOException("fetcher cannot be authenticated");
+ }
+ if (LOG.isDebugEnabled()) {
+ int len = urlHashStr.length();
+ LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: " +
+ "{}", encryptedURL,
+ urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id());
+ }
+ // verify - throws exception
+ SecureShuffleUtils.verifyReply(urlHashStr, encryptedURL, tokenSecret);
+ // verification passed - encode the reply
+ String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
+ tokenSecret);
+ response.headers().set(
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ // Put shuffle version into http header
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ if (LOG.isDebugEnabled()) {
+ int len = reply.length();
+ LOG.debug("Fetcher request verified. " +
+ "encryptedURL: {}, reply: {}, channel id: {}",
+ encryptedURL, reply.substring(len - len / 2, len - 1),
+ ctx.channel().id());
+ }
+ }
+
+ public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOException {
+ final DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ return wrappedBuffer(dob.getData(), 0, dob.getLength());
+ }
+
+ protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo mapOutputInfo)
+ throws IOException {
+ final IndexRecord info = mapOutputInfo.indexRecord;
+ ch.write(shuffleHeaderToBytes(
+ new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce)));
+ final File spillFile =
+ new File(mapOutputInfo.mapOutputFileName.toString());
+ RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
Review Comment:
Shouldn't this be closed?
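If the RandomAccessFile is not handed off to a wrapper that closes it once the transfer is done (the quoted excerpt ends before the actual write), one hedged option is to close it when the write future completes. An illustrative fragment only, assuming `spill` is not closed elsewhere:
{code:java}
ChannelFuture writeFuture = ch.writeAndFlush(
    new DefaultFileRegion(spill.getChannel(), info.startOffset, info.partLength));
writeFuture.addListener((ChannelFutureListener) future -> {
  // Release the file descriptor once the map output has been transferred (or the write failed).
  spill.close();
});
{code}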
> ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
> -----------------------------------------------------------------------------
>
> Key: MAPREDUCE-7431
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
> Project: Hadoop Map/Reduce
> Issue Type: Improvement
> Affects Versions: 3.4.0
> Reporter: Tamas Domok
> Assignee: Tamas Domok
> Priority: Major
> Labels: pull-request-available
> Attachments: chunked-fileregion.txt, chunked.txt,
> normal-fileregion.txt, normal.txt, sendMapPipeline.png
>
>
> HADOOP-15327 introduced some regressions in the ShuffleHandler.
> h3. 1. a memory leak
> {code:java}
> ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not
> called before it's garbage-collected. See
> https://netty.io/wiki/reference-counted-objects.html for more information.
> {code}
>
> The Shuffle's channelRead didn't release the message properly, the fix would
> be this:
> {code:java}
> try {
> // ....
> } finally {
> ReferenceCountUtil.release(msg);
> }
> {code}
> Or even simpler:
> {code:java}
> extends SimpleChannelInboundHandler<FullHttpRequest>
> {code}
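> A minimal sketch of that approach (the class name is illustrative, not the actual patch): with
> SimpleChannelInboundHandler the inbound message is released automatically after channelRead0
> returns, so no manual ReferenceCountUtil.release() call is needed.
> {code:java}
> public class ExampleShuffleHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
>   @Override
>   protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
>     // Handle the request here; Netty releases `request` for us once this method returns.
>   }
> }
> {code}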
> h3. 2. a bug in SSL mode with more than 1 reducer
> It manifested in multiple errors:
> {code:java}
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.io.IOException: Broken pipe
> ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
> java.nio.channels.ClosedChannelException
> // if the reducer memory was not enough, then even this:
> Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in
> shuffle in fetcher#2
> at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
> at
> org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
> at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
> at
> org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
> {code}
> *Configuration* - mapred-site.xml
> {code:java}
> mapreduce.shuffle.ssl.enabled=true
> {code}
> An alternative is to build a custom jar where *FadvisedFileRegion* is replaced
> with *FadvisedChunkedFile* in {*}sendMapOutput{*}.
> *Reproduction*
> {code:java}
> hdfs dfs -rm -r -skipTrash /tmp/sort_input
> hdfs dfs -rm -r -skipTrash /tmp/sort_output
> yarn jar
> hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar
> randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000"
> /tmp/sort_input
> yarn jar
> hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar
> sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input
> /tmp/sort_output | tee sort_app_output.txt
> {code}
> h3. ShuffleHandler's protocol
> {code:java}
> // HTTP Request
> GET
> /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0
> HTTP/1.1
> + keep alive headers
> // HTTP Response Headers
> content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
> + keep alive headers
> // Response Data (transfer-encoding=chunked)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> serialised ShuffleHeader
> content of the MapOutput file (start offset - length)
> ...
> LastHttpContent
> // close socket if no keep-alive
> {code}
> h3. Issues
> - {*}setResponseHeaders{*}: did not always set the content-length; the
> transfer-encoding=chunked header was also missing (see the header sketch after this list).
> - {*}ReduceMapFileCount.operationComplete{*}: messed up the futures on the
> LastHttpContent
> - {*}ChannelGroup accepted{*}: is only used to close the channels, no need
> for that magic 5. See the details
> [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
> - {*}bossGroup{*}: should have only 1 thread for accepting connections.
> - {*}Shuffle{*}: is unnecessarily Sharable, the 3 async sendMap / channel
> (see below) caused future errors when using FadvisedChunkedFile
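> A hedged sketch of the header handling mentioned in the first point above (illustrative only,
> not the final patch), using Netty's HttpUtil helpers:
> {code:java}
> // Announce the exact body size when it is known up front...
> HttpUtil.setContentLength(response, contentLength);
> // ...or mark the response as chunked, so Netty emits Transfer-Encoding: chunked
> // instead of a Content-Length header.
> HttpUtil.setTransferEncodingChunked(response, true);
> {code}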
> h3. Max session open files is not an optimisation, it's actually wasting
> resources
> {code:java}
> // by default maxSessionOpenFiles = 3
> for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles,
> mapIds.size()); i++) {
> ChannelFuture nextMap = sendMap(reduceContext);
> if(nextMap == null) {
> return;
> }
> }
> {code}
> !sendMapPipeline.png!
> At the end of the day, we create an HTTP chunked stream, so there is no need to
> run 3 sendMap calls asynchronously; the futures will finish one-by-one sequentially. The
> osCache magic from the FAdvised classes won't happen either, because the
> first readChunk will be called only later.
> So this can be simplified a lot:
> {code:java}
> sendMap(reduceContext);
> {code}
> h3. My proposal
> Some refactoring: ShuffleHandler is split into multiple classes to make it
> possible to remove the sharable annotation.
> - ShuffleChannel
> - ShuffleChannelInitializer
> - ShuffleChannelHandlerContext
> - ShuffleChannelHandler
> TODO:
> - fix/drop/refactor the existing unit tests
> - add proper unit test that tests SSL/non-SSL mode where the response data
> is properly verified
> - documentation about the protocol
> WIP:
> [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]
> h3. Netty useful docs
> * [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
> * [New and noteworthy in
> 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
> * [Reference counted
> objects|https://netty.io/wiki/reference-counted-objects.html] (it will be
> changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
> * HttpStaticFileServer
> [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)