Re: Spark shuffle: FileNotFound exception

2016-12-04 Thread Evgenii Morozov
Swapnil, 

How large might the file that's not found be? For Spark 
versions below 2.0.0 there are known issues with shuffle blocks larger than 2 GB. 
Is the file actually on a file system?

I'd try increasing the default parallelism so that the partitions get smaller.
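For example, something along these lines (a rough sketch, not tested on your
job; the partition counts are made up and need tuning for your data volume):

    // Raise parallelism so that no shuffle block approaches the 2 GB limit.
    conf.set("spark.default.parallelism", "2000")     // RDD shuffles
    conf.set("spark.sql.shuffle.partitions", "2000")  // DataFrame shuffles
    // Or repartition explicitly right before the heavy stage:
    val repartitioned = df.repartition(2000)

Either way the goal is the same: more, smaller partitions.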

Hope this helps.

> On 04 Dec 2016, at 09:03, Swapnil Shinde  wrote:
> 
> Hello All
> I am facing a FileNotFoundException for a shuffle index file when running a job 
> with large data. The same job runs fine with smaller datasets. These are my 
> cluster specifications -
> 
> No of nodes - 19
> Total cores - 380
> Memory per executor - 32G
> Spark 1.6 mapr version
> spark.shuffle.service.enabled - false
> 
>  I am running the job with 28G memory, 50 executors, and 1 core per 
> executor. The job is failing at a stage with a DataFrame explode, where each 
> row gets multiplied into 6 rows. Here are the exception details -
> 
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: 
> /tmp/hadoop-mapr/nm-local-dir/usercache/sshinde/appcache/application_1480622725467_0071/blockmgr-3b2051f5-81c8-40a5-a332-9d32b4586a5d/38/shuffle_14_229_0.index
>  (No such file or directory)
> 
> I tried the configurations below, but nothing worked -
> conf.set("spark.io.compression.codec", "lz4")
> conf.set("spark.network.timeout", "1000s")
> conf.set("spark.sql.shuffle.partitions", "2500")
> spark.yarn.executor.memoryOverhead should already be high given 32g of executor 
> memory (the default is 10% of 32g, ~3.2g).
>   Increased the number of partitions up to 15000
>   I checked the YARN logs briefly and nothing stands out apart from the above 
> exception.
> 
> 
> Please let me know if there is something I am missing, or any alternatives to 
> make jobs with large data run. Thank you.
> 
> Thanks
> Swapnil
>



Spark shuffle: FileNotFound exception

2016-12-03 Thread Swapnil Shinde
Hello All
I am facing a FileNotFoundException for a shuffle index file when running a
job with large data. The same job runs fine with smaller datasets. These are my
cluster specifications -

No of nodes - 19
Total cores - 380
Memory per executor - 32G
Spark 1.6 mapr version
spark.shuffle.service.enabled - false

 I am running the job with 28G memory, 50 executors, and 1 core per
executor. The job is failing at a stage with a DataFrame explode, where each
row gets multiplied into 6 rows.
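The failing stage looks roughly like this (a hypothetical sketch; "items" is a
made-up column name standing in for the real array column):

    import org.apache.spark.sql.functions.explode
    // Each input row carries an array of ~6 elements; exploding it multiplies
    // the row count (and the shuffle volume) by ~6.
    val exploded = df.select(explode(df("items")).as("item"))

Here are the exception details -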

Caused by: java.lang.RuntimeException: java.io.FileNotFoundException:
/tmp/hadoop-mapr/nm-local-dir/usercache/sshinde/appcache/application_1480622725467_0071/blockmgr-3b2051f5-81c8-40a5-a332-9d32b4586a5d/38/shuffle_14_229_0.index
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:291)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

I tried the configurations below, but nothing worked -
conf.set("spark.io.compression.codec", "lz4")
conf.set("spark.network.timeout", "1000s")
conf.set("spark.sql.shuffle.partitions", "2500")
spark.yarn.executor.memoryOverhead should already be high given 32g of
executor memory (the default is 10% of 32g, ~3.2g); see the sketch after this list.
  Increased the number of partitions up to 15000
  I checked the YARN logs briefly and nothing stands out apart from the above
exception.
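If the default overhead is not enough, it can also be set explicitly; the value
here is hypothetical, just to illustrate the knob:

    // Illustrative only: request ~4g of overhead instead of the 10% default.
    conf.set("spark.yarn.executor.memoryOverhead", "4096")  // value is in MB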


Please let me know if there is something I am missing, or any alternatives to
make jobs with large data run. Thank you.

Thanks
Swapnil


Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hey,

I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
exception with shuffle.index files? It’s been cropping up with very large joins 
and aggregations, and causing all of our jobs to fail towards the end. The 
memory limit for the executors (we're running on Mesos) is touching 60GB+ with 
~10 cores per executor, which is way oversubscribed.

We're running Spark inside containers, and have pointed 
"spark.executorEnv.SPARK_LOCAL_DIRS" at a special directory path in the 
container for performance/disk reasons; since then the issue has started to 
arise. I'm wondering if there's a bug in the way Spark looks for shuffle 
files, and whether one of the implementations isn't obeying the path properly?

I don't want to set "spark.local.dir" because that requires the driver to also 
have this directory set up, which is not the case.
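Concretely, the relevant bit of our setup looks like this (a simplified sketch;
the path is the one from the trace below):

    // Point executor scratch/shuffle space at the container-local directory.
    conf.set("spark.executorEnv.SPARK_LOCAL_DIRS", "/mnt/mesos/sandbox/spark-tmp")
    // The alternative would be spark.local.dir, but that path would also have
    // to exist on the driver, which it does not for us:
    // conf.set("spark.local.dir", "/mnt/mesos/sandbox/spark-tmp")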

Has anyone seen this issue before?



15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
block(s) from XXX:50777
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
 (No such file or directory)
   at java.io.FileInputStream.open(Native Method)
   at java.io.FileInputStream.<init>(FileInputStream.java:146)
   at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
   at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
   at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
   at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
   at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
   at java.lang.Thread.run(Thread.java:745)

   at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)

Re: Shuffle FileNotFound Exception

2015-11-18 Thread Romi Kuntsman
Take the executor memory times spark.shuffle.memoryFraction,
and divide the data so that each partition is smaller than that.
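As a back-of-envelope example (assuming the Spark 1.5 default
spark.shuffle.memoryFraction = 0.2; check what your cluster actually uses):

    // 60 GB executor memory * 0.2 = ~12 GB of shuffle memory per executor,
    // shared by ~10 concurrent tasks => aim well under ~1.2 GB per partition.
    // For, say, 100 GB of shuffled data (a made-up figure), that means:
    val repartitioned = rdd.repartition(200)  // ~0.5 GB per partition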

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:09 PM, Tom Arnfeld  wrote:

> Hi Romi,
>
> Thanks! Could you give me an indication of how much to increase the
> partitions by? We'll take a stab in the dark; the input data is around 5M
> records (though each record is fairly small). We've had trouble both with
> DataFrames and RDDs.
>
> Tom.
>
> On 18 Nov 2015, at 12:04, Romi Kuntsman  wrote:
>
> I had many issues with shuffles (but not this one exactly), and what
> eventually solved it was to repartition the input into more parts. Have you
> tried that?
>
> P.S. not sure if related, but there's a memory leak in the shuffle
> mechanism
> https://issues.apache.org/jira/browse/SPARK-11293
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld  wrote:
>
>> Hey,
>>
>> I’m wondering if anyone has run into issues with Spark 1.5 and a
>> FileNotFound exception with shuffle.index files? It’s been cropping up with
>> very large joins and aggregations, and causing all of our jobs to fail
>> towards the end. The memory limit for the executors (we’re running on
>> mesos) is touching 60GB+ with ~10 cores per executor, which is way
>> oversubscribed.
>>
>> We’re running spark inside containers, and have configured
>> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the
>> container for performance/disk reasons, and since then the issue started to
>> arise. I'm wondering if there's a bug in the way Spark looks for shuffle
>> files, and whether one of the implementations isn't obeying the path properly?
>>
>> I don’t want to set "spark.local.dir” because that requires the driver
>> also have this directory set up, which is not the case.
>>
>> Has anyone seen this issue before?
>>
>> 
>>
>> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to
>> get block(s) from XXX:50777
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>> (No such file or directory)

Re: Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hi Romi,

Thanks! Could you give me an indication of how much to increase the partitions by? 
We'll take a stab in the dark; the input data is around 5M records (though each 
record is fairly small). We've had trouble both with DataFrames and RDDs.

Tom.

> On 18 Nov 2015, at 12:04, Romi Kuntsman  wrote:
> 
> I had many issues with shuffles (but not this one exactly), and what 
> eventually solved it was to repartition the input into more parts. Have you 
> tried that?
> 
> P.S. not sure if related, but there's a memory leak in the shuffle mechanism
> https://issues.apache.org/jira/browse/SPARK-11293 
> 
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com 
> 
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld  wrote:
> Hey,
> 
> I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
> exception with shuffle.index files? It’s been cropping up with very large 
> joins and aggregations, and causing all of our jobs to fail towards the end. 
> The memory limit for the executors (we’re running on mesos) is touching 60GB+ 
> with ~10 cores per executor, which is way oversubscribed.
> 
> We’re running spark inside containers, and have configured 
> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the 
> container for performance/disk reasons, and since then the issue started to 
> arise. I'm wondering if there's a bug in the way Spark looks for shuffle 
> files, and whether one of the implementations isn't obeying the path properly?
> 
> I don’t want to set "spark.local.dir” because that requires the driver also 
> have this directory set up, which is not the case.
> 
> Has anyone seen this issue before?
> 
> 
> 
> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
> block(s) from XXX:50777
> java.lang.RuntimeException: java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>  (No such file or directory)