Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hey,

I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
exception with shuffle.index files? It’s been cropping up with very large joins 
and aggregations, and causing all of our jobs to fail towards the end. The 
memory limit for the executors (we’re running on mesos) is touching 60GB+ with 
~10 cores per executor, which is way oversubscribed.

We’re running spark inside containers, and have configured 
“spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the 
container for performance/disk reasons, and since then the issue started to 
arise. I’m wondering if there’s a bug with the way spark looks for shuffle 
files, and one of the implementations isn’t obeying the path properly?

I don’t want to set "spark.local.dir” because that requires the driver also 
have this directory set up, which is not the case.

Has anyone seen this issue before?



15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
block(s) from XXX:50777
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
 (No such file or directory)
   at java.io.FileInputStream.open(Native Method)
   at java.io.FileInputStream.(FileInputStream.java:146)
   at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
   at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
   at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
   at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
   at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
   at java.lang.Thread.run(Thread.java:745)

   at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
   at 

Re: Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hi Romi,

Thanks! Could you give me an indication of how much increase the partitions by? 
We’ll take a stab in the dark, the input data is around 5M records (though each 
record is fairly small). We’ve had trouble both with DataFrames and RDDs.

Tom.

> On 18 Nov 2015, at 12:04, Romi Kuntsman <r...@totango.com> wrote:
> 
> I had many issues with shuffles (but not this one exactly), and what 
> eventually solved it was to repartition to input into more parts. Have you 
> tried that?
> 
> P.S. not sure if related, but there's a memory leak in the shuffle mechanism
> https://issues.apache.org/jira/browse/SPARK-11293 
> <https://issues.apache.org/jira/browse/SPARK-11293>
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com <http://www.totango.com/>
> 
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld <t...@duedil.com 
> <mailto:t...@duedil.com>> wrote:
> Hey,
> 
> I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
> exception with shuffle.index files? It’s been cropping up with very large 
> joins and aggregations, and causing all of our jobs to fail towards the end. 
> The memory limit for the executors (we’re running on mesos) is touching 60GB+ 
> with ~10 cores per executor, which is way oversubscribed.
> 
> We’re running spark inside containers, and have configured 
> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the 
> container for performance/disk reasons, and since then the issue started to 
> arise. I’m wondering if there’s a bug with the way spark looks for shuffle 
> files, and one of the implementations isn’t obeying the path properly?
> 
> I don’t want to set "spark.local.dir” because that requires the driver also 
> have this directory set up, which is not the case.
> 
> Has anyone seen this issue before?
> 
> 
> 
> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
> block(s) from XXX:50777
> java.lang.RuntimeException: java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>  (No such file or directory)
>at java.io.FileInputStream.open(Native Method)
>at java.io.FileInputStream.(FileInputStream.java:146)
>at 
> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel