Looks like there's slowness in sending the shuffle files; maybe one executor gets overwhelmed by all the other executors trying to pull data from it? Try lifting `spark.network.timeout` further. We ourselves had to push it from the default 120s up to 600s.
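For reference, a minimal sketch of how that change looks on the spark-submit command line (600s is simply the value that worked for us, not a tuned recommendation; the setting also accepts a bare number, which Spark reads as seconds):

    spark-submit \
      ... \
      --conf spark.network.timeout=600s \
      ...

spark.shuffle.io.retryWait (5s by default) controls the wait between fetch retries and is a related knob, but I'd start with the timeout alone.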
On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov <i.kar...@cleverdata.ru> wrote:
> Hi,
> I see strange behaviour in my job and can't understand what is wrong: the
> stage that uses shuffle data as its input fails a number of times with
> org.apache.spark.shuffle.FetchFailedException, seen in the Spark UI as:
>
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=50192, reduceId=12698, message=
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=3, reduceId=12699, message=
>
> Digging in the logs I found this scenario of task failure:
>
> 1. Some shuffle-server-X-Y (important note: the external shuffle service is
> OFF) reports 'Broken pipe' at 2017-09-26T05:40:26.484Z:
>
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>     at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
>     at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
>     at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
>     at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
>     at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
>     at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
>     at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:770)
>
> and "chunk send" errors:
>
> Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185, chunkIndex=0},
> buffer=FileSegmentManagedBuffer{file=/data/1/yarn/nm/usercache/hdfs/appcache/application_1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-33a89928f5b9/04/shuffle_1_3_0.data,
> offset=40858881, length=3208}} to /someClientIP:somePort; closing connection
>
> with the exception:
>
> java.nio.channels.ClosedChannelException
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
>
> 2. The client of this shuffle server then complains:
>
> Connection to some-hostname/someIP:port has been quiet for 240000 ms while there are outstanding requests.
> Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
>
> and then
>
> Still have 3386 requests outstanding when connection from some-hostname/someIP:11431 is closed
>
> and then
>
> java.io.IOException: Connection from shuffleServerHostname/shuffleServerIP:port closed
>     at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
>     at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>     at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
>     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
>
> This fails the tasks and the stage several times, and finally the job fails.
>
> I also see retry messages: Failed to fetch block shuffle_1_3_12682, and will not retry (5 retries).
>
> Such failures occur on different hosts.
>
> I can't say that we were experiencing any network connectivity issues, node
> failures or anything similar. It seems the connection was dropped by some
> Spark-internal mechanism.
>
> Any guesses about the reason are appreciated!
>
> Spark 2.2.0
>
> Spark config:
> --num-executors 39 \
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.shuffle.service.enabled=false \
> --conf spark.driver.cores=1
> --conf spark.driver.memory=8g
> --conf spark.yarn.driver.memoryOverhead=2048
> --conf spark.driver.maxResultSize=2g \
> --conf spark.executor.cores=2
> --conf spark.executor.memory=6g
> --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-ResizePLAB -XX:ConcGCThreads=6 -XX:ParallelGCThreads=8 -XX:InitiatingHeapOccupancyPercent=30"
> --conf spark.yarn.executor.memoryOverhead=4096 \
> --conf spark.memory.fraction=0.8
> --conf spark.memory.storageFraction=0.0 \
> --conf spark.eventLog.compress=true
> --conf spark.eventLog.enabled=true
> --conf spark.eventLog.overwrite=true \
> --conf spark.logConf=true
> --conf spark.logLineage=true \
> --conf spark.yarn.historyServer.address=address1
> --conf spark.eventLog.dir=address2 \
> --conf spark.reducer.maxReqsInFlight=10
> --conf spark.shuffle.io.maxRetries=5
> --conf spark.network.timeout=240
>
> Input shuffle size: 2.6 TB
> Partitions in the stage: 20480; 12768 of them completed successfully.
>
> --
> Ilya Karpov
> Developer
>
> CleverDATA
> make your data clever
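P.S. If the theory about one serving executor being overwhelmed holds, the fetch-side throttles you already set are the other lever. A sketch of the relevant flags side by side, with illustrative values only (not numbers measured against your workload):

    --conf spark.network.timeout=600s \
    --conf spark.reducer.maxReqsInFlight=10 \
    --conf spark.reducer.maxSizeInFlight=24m \
    --conf spark.shuffle.io.maxRetries=10 \
    --conf spark.shuffle.io.retryWait=30s

Lower in-flight limits reduce how much each reducer pulls at once, which in aggregate eases the load on any single serving executor at the price of slower shuffle reads; the retry settings just give a busy server more time before the client declares the connection dead.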