Tamas Domok created MAPREDUCE-7431:
--------------------------------------

             Summary: ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade
                 Key: MAPREDUCE-7431
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
    Affects Versions: 3.4.0
            Reporter: Tamas Domok
         Attachments: sendMapPipeline.png

HADOOP-15327 introduced some regressions in the ShuffleHandler.

h3. 1. a memory leak

{code:java}
ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
{code}

Shuffle's channelRead did not release the message properly. The fix would be this:

{code:java}
try {
  // ....
} finally {
  // always drop the reference, even if handling the message throws
  ReferenceCountUtil.release(msg);
}
{code}

Or even simpler, because SimpleChannelInboundHandler releases the message automatically after channelRead0 returns:

{code:java}
extends SimpleChannelInboundHandler<FullHttpRequest>
{code}

h3. 2. a bug in SSL mode with more than one reducer

It manifested in multiple errors:

{code:java}
ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.io.IOException: Broken pipe

ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause:
java.nio.channels.ClosedChannelException

// if the reducer memory was not enough, then even this:
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#2
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210)
    at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91)
{code}

*Configuration* - mapred-site.xml

{code:java}
mapreduce.shuffle.ssl.enabled=true
{code}

Alternatively, build a custom jar in which *FadvisedFileRegion* is replaced with *FadvisedChunkedFile* in {*}sendMapOutput{*}.

*Reproduction*

{code:java}
hdfs dfs -rm -r -skipTrash /tmp/sort_input
hdfs dfs -rm -r -skipTrash /tmp/sort_output
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" /tmp/sort_input
yarn jar hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input /tmp/sort_output | tee sort_app_output.txt
{code}

h3. ShuffleHandler's protocol

{code:java}
// HTTP Request
GET /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 HTTP/1.1
+ keep alive headers

// HTTP Response Headers
content-length=sum(serialised ShuffleHeader in bytes + MapOutput size)
+ keep alive headers

// Response Data (transfer-encoding=chunked)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
serialised ShuffleHeader
content of the MapOutput file (start offset - length)
...
LastHttpContent
// close socket if no keep-alive
{code}

h3. Issues

- {*}setResponseHeaders{*}: did not always set the content-length; the transfer-encoding=chunked header was also missing.
- {*}ReduceMapFileCount.operationComplete{*}: messed up the futures on the LastHttpContent.
- {*}ChannelGroup accepted{*}: is only used to close the channels, so there is no need for that magic 5. See the details [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html].
- {*}bossGroup{*}: should have only 1 thread for accepting connections.
- {*}Shuffle{*}: is unnecessarily Sharable; the 3 async sendMap calls per channel (see below) caused future errors when using FadvisedChunkedFile.

h3. Max session open files is not an optimisation, it actually wastes resources

{code:java}
// by default maxSessionOpenFiles = 3
for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
{code}

!sendMapPipeline.png!

In the end we create a single HTTP chunked stream, so there is no need to start 3 asynchronous sendMap calls: the futures will finish one by one, sequentially. The osCache magic from the Fadvised classes won't happen either, because the first readChunk will only be called later. So this can be simplified a lot:

{code:java}
sendMap(reduceContext);
{code}

h3. My proposal

Some refactoring: ShuffleHandler is split into multiple classes to make it possible to remove the Sharable annotation:
- ShuffleChannel
- ShuffleChannelInitializer
- ShuffleChannelHandlerContext
- ShuffleChannelHandler

TODO:
- fix/drop/refactor the existing unit tests
- add a proper unit test that covers both SSL and non-SSL mode and verifies the response data
- documentation about the protocol

WIP: [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e]

h3. Useful Netty docs

* [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html]
* [New and noteworthy in 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
* [Reference counted objects|https://netty.io/wiki/reference-counted-objects.html] (it will be changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html])
* HttpStaticFileServer [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java]
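
For illustration only, the content-length rule from the protocol description (the sum, over all requested map outputs, of the serialised ShuffleHeader size plus the map output size) could be sketched in plain Java as below. This is not ShuffleHandler code: {{ShuffleContentLength}}, {{Segment}}, {{headerBytes}}, {{partLength}} and {{contentLength}} are hypothetical names, and the header size is taken as an input instead of being computed from the real ShuffleHeader serialisation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the content-length rule; not part of ShuffleHandler. */
final class ShuffleContentLength {

    /** One response segment: a serialised header followed by a slice of a map output file. */
    static final class Segment {
        final String mapId;      // placeholder map attempt id
        final int headerBytes;   // assumed size of the serialised ShuffleHeader
        final long partLength;   // bytes of map output sent for this reducer

        Segment(String mapId, int headerBytes, long partLength) {
            this.mapId = mapId;
            this.headerBytes = headerBytes;
            this.partLength = partLength;
        }
    }

    /** content-length = sum(serialised ShuffleHeader in bytes + MapOutput size). */
    static long contentLength(List<Segment> segments) {
        long total = 0L;
        for (Segment s : segments) {
            total += s.headerBytes + s.partLength;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment("m_000000", 40, 1000L));
        segments.add(new Segment("m_000001", 42, 2500L));
        // The header value has to cover every segment of the response,
        // not only the first map output.
        System.out.println("content-length=" + contentLength(segments));
    }
}
```

The point of the sketch is that the value has to be known for all requested map outputs before the first byte of the body is written, which is why setResponseHeaders has to set it consistently.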