Shuffle FileNotFound Exception
Hey,

I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFoundException on shuffle .index files? It’s been cropping up with very large joins and aggregations, and causing all of our jobs to fail towards the end. The memory limit for the executors (we’re running on Mesos) is touching 60GB+ with ~10 cores per executor, which is way oversubscribed.

We’re running Spark inside containers, and have configured “spark.executorEnv.SPARK_LOCAL_DIRS” to point at a dedicated directory path inside the container for performance/disk reasons; the issue started to arise after that change. I’m wondering if there’s a bug in the way Spark looks for shuffle files, and one of the implementations isn’t obeying the path properly?

I don’t want to set “spark.local.dir” because that requires the driver to also have this directory set up, which is not the case.

Has anyone seen this issue before?

15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get block(s) from XXX:50777
java.lang.RuntimeException: java.io.FileNotFoundException: /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:146)
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
	at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
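For reference, here is a sketch of the configuration being described (the directory path is taken from the stack trace above; treat the exact layout as an assumption about this setup, not a recommendation). The point of using `spark.executorEnv.SPARK_LOCAL_DIRS` rather than `spark.local.dir` is that the environment variable is only set in the executor containers, so the driver machine does not need the path to exist:

```
# spark-defaults.conf (sketch)

# Scratch dir for shuffle/spill files, set per-executor via the container
# environment. Overrides spark.local.dir on the executors only.
spark.executorEnv.SPARK_LOCAL_DIRS  /mnt/mesos/sandbox/spark-tmp

# NOT used here: spark.local.dir would apply to the driver as well,
# and the driver host does not have this directory.
# spark.local.dir  /mnt/mesos/sandbox/spark-tmp
```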
Re: Shuffle FileNotFound Exception
Hi Romi,

Thanks! Could you give me an indication of how much to increase the partitions by? To take a stab in the dark: the input data is around 5M records (though each record is fairly small). We’ve had trouble with both DataFrames and RDDs.

Tom.

> On 18 Nov 2015, at 12:04, Romi Kuntsman <r...@totango.com> wrote:
>
> I had many issues with shuffles (but not this one exactly), and what
> eventually solved it was to repartition the input into more parts. Have you
> tried that?
>
> P.S. Not sure if related, but there's a memory leak in the shuffle mechanism:
> https://issues.apache.org/jira/browse/SPARK-11293
>
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com
>
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld <t...@duedil.com> wrote:
> Hey,
>
> I’m wondering if anyone has run into issues with Spark 1.5 and a
> FileNotFound exception with shuffle .index files? It’s been cropping up
> with very large joins and aggregations, and causing all of our jobs to
> fail towards the end. The memory limit for the executors (we’re running
> on Mesos) is touching 60GB+ with ~10 cores per executor, which is way
> oversubscribed.
>
> We’re running Spark inside containers, and have configured
> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the
> container for performance/disk reasons, and since then the issue started
> to arise. I’m wondering if there’s a bug with the way Spark looks for
> shuffle files, and one of the implementations isn’t obeying the path
> properly?
>
> I don’t want to set “spark.local.dir” because that requires the driver to
> also have this directory set up, which is not the case.
>
> Has anyone seen this issue before?
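Romi's repartitioning suggestion could be sketched as follows. This is a minimal Spark 1.5 (Scala) sketch, not a tested fix for this thread's problem; the input path, the `other` DataFrame, the `id` join column, and the partition count of 2000 are all placeholders to tune against the actual ~5M-record dataset:

```scala
import org.apache.spark.sql.SQLContext

// sqlContext: an existing SQLContext; paths and names are hypothetical.
val df    = sqlContext.read.parquet("/path/to/input")
val other = sqlContext.read.parquet("/path/to/other")

// Repartition the input before the wide join so each shuffle partition
// (and its spill files) stays smaller. 2000 is a placeholder value.
val joined = df.repartition(2000).join(other, "id")

// Alternatively, raise the shuffle parallelism for all DataFrame shuffles:
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

// The RDD equivalent:
// val repartitioned = rdd.repartition(2000)
```

More, smaller partitions reduce per-task memory pressure during the shuffle, which can matter on oversubscribed executors like the 60GB/10-core ones described above.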