[ https://issues.apache.org/jira/browse/MAPREDUCE-7431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shilun Fan updated MAPREDUCE-7431: ---------------------------------- Component/s: mrv2 > ShuffleHandler is not working correctly in SSL mode after the Netty 4 upgrade > ----------------------------------------------------------------------------- > > Key: MAPREDUCE-7431 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-7431 > Project: Hadoop Map/Reduce > Issue Type: Improvement > Components: mrv2 > Affects Versions: 3.4.0 > Reporter: Tamas Domok > Assignee: Tamas Domok > Priority: Major > Labels: pull-request-available > Fix For: 3.4.0 > > Attachments: chunked-fileregion.txt, chunked.txt, > normal-fileregion.txt, normal.txt, sendMapPipeline.png > > > HADOOP-15327 introduced some regressions in the ShuffleHandler. > h3. 1. a memory leak > {code:java} > ERROR io.netty.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not > called before it's garbage-collected. See > https://netty.io/wiki/reference-counted-objects.html for more information. > {code} > > The Shuffle's channelRead didn't release the message properly, the fix would > be this: > {code:java} > try { > // .... > } finally { > ReferenceCountUtil.release(msg); > } > {code} > Or even simpler: > {code:java} > extends SimpleChannelInboundHandler<FullHttpRequest> > {code} > h3. 1. a bug in SSL mode with more than 1 reducers > It manifested in multiple errors: > {code:java} > ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause: > java.io.IOException: Broken pipe > ERROR org.apache.hadoop.mapred.ShuffleHandler: Future is unsuccessful. Cause: > java.nio.channels.ClosedChannelException > // if the reducer memory was not enough, then even this: > Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in > shuffle in fetcher#2 > at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:136) > at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377) > at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) > at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168) > Caused by: java.lang.OutOfMemoryError: Java heap space > at > org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123) > at > org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98) > at > org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105) > at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:210) > at > org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.doShuffle(InMemoryMapOutput.java:91) > {code} > *Configuration* - mapred-site.xml > {code:java} > mapreduce.shuffle.ssl.enabled=true > {code} > Alternative is to build a custom jar where *FadvisedFileRegion* is replaced > with *FadvisedChunkedFile* in {*}sendMapOutput{*}. > *Reproduction* > {code:java} > hdfs dfs -rm -r -skipTrash /tmp/sort_input > hdfs dfs -rm -r -skipTrash /tmp/sort_output > yarn jar > hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar > randomwriter "-Dmapreduce.randomwriter.totalbytes=10000000000" > /tmp/sort_input > yarn jar > hadoop-3.4.0-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0-SNAPSHOT.jar > sort -Dmapreduce.job.reduce.slowstart.completedmaps=1 -r 40 /tmp/sort_input > /tmp/sort_output | tee sort_app_output.txt > {code} > h3. ShuffleHandler's protocol > {code:java} > // HTTP Request > GET > /mapOutput?job=job_1672901779104_0001&reduce=0&map=attempt_1672901779104_0001_m_000003_0,attempt_1672901779104_0001_m_000002_0,attempt_1672901779104_0001_m_000001_0,attempt_1672901779104_0001_m_000000_0,attempt_1672901779104_0001_m_000005_0,attempt_1672901779104_0001_m_000012_0,attempt_1672901779104_0001_m_000009_0,attempt_1672901779104_0001_m_000010_0,attempt_1672901779104_0001_m_000007_0,attempt_1672901779104_0001_m_000011_0,attempt_1672901779104_0001_m_000008_0,attempt_1672901779104_0001_m_000013_0,attempt_1672901779104_0001_m_000014_0,attempt_1672901779104_0001_m_000015_0,attempt_1672901779104_0001_m_000019_0,attempt_1672901779104_0001_m_000018_0,attempt_1672901779104_0001_m_000016_0,attempt_1672901779104_0001_m_000017_0,attempt_1672901779104_0001_m_000020_0,attempt_1672901779104_0001_m_000023_0 > HTTP/1.1 > + keep alive headers > // HTTP Response Headers > content-length=sum(serialised ShuffleHeader in bytes + MapOutput size) > + keep alive headers > // Response Data (transfer-encoding=chunked) > serialised ShuffleHeader > content of the MapOutput file (start offset - length) > serialised ShuffleHeader > content of the MapOutput file (start offset - length) > serialised ShuffleHeader > content of the MapOutput file (start offset - length) > serialised ShuffleHeader > content of the MapOutput file (start offset - length) > ... > LastHttpContent > // close socket if no keep-alive > {code} > h3. Issues > - {*}setResponseHeaders{*}: did not always set the the content-length, also > the transfer-encoding=chunked header was missing. > - {*}ReduceMapFileCount.operationComplete{*}: messed up the futures on the > LastHttpContent > - {*}ChannelGroup accepted{*}: is only used to close the channels, no need > for that magic 5. See the details > [here|https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html]. > - {*}bossGroup{*}: should have only 1 thread for accepting connections. > - {*}Shuffle{*}: is unnecessarily Sharable, the 3 async sendMap / channel > (see below) caused future errors when using FadvisedChunkedFile > h3. Max session open files is not an optimisation, it's actually wasting > resources > {code:java} > // by default maxSessionOpenFiles = 3 > for (int i = 0; i < Math.min(handlerCtx.maxSessionOpenFiles, > mapIds.size()); i++) { > ChannelFuture nextMap = sendMap(reduceContext); > if(nextMap == null) { > return; > } > } > {code} > !sendMapPipeline.png! > At the end of the day, we create a http chunked stream, there is no need to > run 3 sendMap async, the futures will finish one-by-one sequentially. The > osCache magic from the FAdvised classes won't happen either, because the > first readChunk will be called only later. > So this can be simplified a lot: > {code:java} > sendMap(reduceContext); > {code} > h3. My proposal > Some refactoring: ShuffleHandler is split into multiple classes to make it > possible to remove the sharable annotation. > - ShuffleChannel > - ShuffleChannelInitializer > - ShuffleChannelHandlerContext > - ShuffleChannelHandler > TODO: > - fix/drop/refactor the existing unit tests > - add proper unit test that tests SSL/non-SSL mode where the response data > is properly verified > - documentation about the protocol > WIP: > [github.com/tomicooler/hadoop|https://github.com/tomicooler/hadoop/commit/3bc027598aea4a3b02a1997fe5d485b9a6e5c41e] > h3. Netty useful docs > * [User guide for 4.x|https://netty.io/wiki/user-guide-for-4.x.html] > * [New and noteworthy in > 4.0|https://netty.io/wiki/new-and-noteworthy-in-4.0.html] > * [Reference counted > objects|https://netty.io/wiki/reference-counted-objects.html] (it will be > changed in [Netty 5|https://netty.io/wiki/new-and-noteworthy-in-5.0.html]) > * HttpStaticFileServer > [example|https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java] -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org