Hi,
I see strange behaviour in my job and can’t understand what is wrong:
the stage that takes shuffle data as its input fails a number of times because
of org.apache.spark.shuffle.FetchFailedException, as seen in the Spark UI:
FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=50192,
reduceId=12698, message=
FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=3,
reduceId=12699, message=
Digging through the logs, I found the following sequence leading to the task failure:
1. Some shuffle-server-X-Y (important note: the external shuffle service is OFF)
reports 'Broken pipe' at 2017-09-26T05:40:26.484Z:
java.io.IOException: Broken pipe
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
    at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:770)
and "chunk send" errors:
Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/data/1/yarn/nm/usercache/hdfs/appcache/application_1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-33a89928f5b9/04/shuffle_1_3_0.data, offset=40858881, length=3208}} to /someClientIP:somePort; closing connection
with exceptions:
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
2. Then the client of this shuffle server complains with:
Connection to some-hostname/someIP:port has been quiet for 240000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
and then
Still have 3386 requests outstanding when connection from some-hostname/someIP:11431 is closed
and then
java.io.IOException: Connection from shuffleServerHostname/shuffleServerIP:port closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
This fails the tasks and the stage several times, and finally the job fails.
I also see retry messages: Failed to fetch block shuffle_1_3_12682, and will
not retry (5 retries).
Such failures occur on different hosts.
I can’t say that we were experiencing any network connectivity issues, node
failures or anything similar. It seems that the connection was dropped by some
Spark internal mechanism.
Any guesses as to what caused this are appreciated!
Spark 2.2.0
Spark config:
--num-executors 39 \
\
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
\
--conf spark.driver.cores=1 \
--conf spark.driver.memory=8g \
--conf spark.yarn.driver.memoryOverhead=2048 \
--conf spark.driver.maxResultSize=2g \
\
--conf spark.executor.cores=2 \
--conf spark.executor.memory=6g \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-ResizePLAB -XX:ConcGCThreads=6 -XX:ParallelGCThreads=8 -XX:InitiatingHeapOccupancyPercent=30" \
--conf spark.yarn.executor.memoryOverhead=4096 \
\
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.0 \
\
--conf spark.eventLog.compress=true \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.overwrite=true \
\
--conf spark.logConf=true \
--conf spark.logLineage=true \
\
--conf spark.yarn.historyServer.address=address1 \
--conf spark.eventLog.dir=address2 \
\
--conf spark.reducer.maxReqsInFlight=10 \
--conf spark.shuffle.io.maxRetries=5 \
--conf spark.network.timeout=240
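To relate these settings to the log messages above, here is a minimal Python sketch of the time budgets they imply. The helper to_millis is hypothetical (it is not Spark's own parser) and assumes that a bare number for spark.network.timeout is read as seconds, which matches the "quiet for 240000 ms" message. spark.shuffle.io.retryWait is not set in my config, so the sketch assumes it is left at its documented default of 5s.

```python
import re

# Hypothetical helper, not Spark's parser: converts a duration string to ms.
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000}

def to_millis(value, default_unit="s"):
    """Convert '240', '5s', '120000ms' etc. to milliseconds."""
    match = re.fullmatch(r"(\d+)(ms|s|m|h)?", value.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    number, unit = match.groups()
    return int(number) * UNIT_MS[unit or default_unit]

# spark.network.timeout=240 (no unit given; assumed to mean seconds)
network_timeout_ms = to_millis("240")

# spark.shuffle.io.maxRetries=5 from the config above
max_retries = 5
# Assumption: spark.shuffle.io.retryWait left at its default of 5s
retry_wait_ms = to_millis("5s")

# Maximum extra delay spent waiting between fetch retries
max_retry_delay_ms = max_retries * retry_wait_ms

print(network_timeout_ms, max_retry_delay_ms)  # prints: 240000 25000
```

So a connection has to be silent for 4 minutes before the client gives up on it, while the waits between the 5 retries add at most about 25 seconds on top, under the assumptions above.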
Input shuffle size: 2.6 TB
Partitions in stage: 20480, of which 12768 completed successfully.
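As a rough sanity check on these numbers, here is a back-of-the-envelope Python sketch of the average shuffle block size. It rests on two assumptions that are not confirmed anywhere above: that mapId in the FetchFailed message is a 0-based map partition index (so mapId=50192 means at least 50193 map partitions), and that "2.6 TB" is meant in binary units. Because the map partition count is only a lower bound, the result is an upper bound on the average.

```python
TB = 1024 ** 4  # assumption: "2.6 TB" is in binary units

shuffle_bytes = 2.6 * TB
reduce_partitions = 20480  # "Partitions in stage" from this post

# Assumption: mapId is a 0-based map partition index, so the highest
# mapId seen in the logs (50192) gives a lower bound on map partitions.
map_partitions_lower_bound = 50192 + 1

def avg_block_bytes(total_bytes, n_map, n_reduce):
    """Average size of one (map, reduce) shuffle block in bytes."""
    return total_bytes / (n_map * n_reduce)

blocks = map_partitions_lower_bound * reduce_partitions
avg = avg_block_bytes(shuffle_bytes, map_partitions_lower_bound, reduce_partitions)

print(f"{blocks:,} blocks, at most about {avg:.0f} bytes each on average")
```

Under these assumptions the shuffle consists of roughly a billion blocks of a few KB each, which is the same order of magnitude as the length=3208 seen in the "Error sending result" message.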
--
Ilya Karpov
Developer
CleverDATA
make your data clever