You should try setting spark.shuffle.consolidateFiles (see http://spark.incubator.apache.org/docs/latest/configuration.html) to true.
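In case it helps, a rough sketch of where that setting would go, assuming an 0.8.x-style setup where Spark reads its config from Java system properties set before the SparkContext is constructed (the master URL and app name below are just placeholders):

import org.apache.spark.SparkContext

object ConsolidatedShuffleJob {
  def main(args: Array[String]) {
    // Sketch, assuming 0.8.x system-property configuration: consolidate the
    // per-reducer shuffle files into fewer, larger files so each executor
    // holds far fewer FDs open during the shuffle. The property has to be
    // set before the SparkContext is created.
    System.setProperty("spark.shuffle.consolidateFiles", "true")

    val sc = new SparkContext("spark://master:7077", "ConsolidatedShuffleJob")
    // ... build the RDDs and run distinct().count() as usual ...
    sc.stop()
  }
}

If you're launching through spark-shell or an existing driver, passing -Dspark.shuffle.consolidateFiles=true via SPARK_JAVA_OPTS before startup should have the same effect.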
On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton <[email protected]> wrote:
> I think I've shuffled this data before (I often join on it), and I
> know I was using distinct() in 0.7.3 for the same computation.
>
> What do people usually have in /proc/sys/fs/file-max? I'm real
> surprised that 13M isn't enough.
>
> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <[email protected]> wrote:
> > distinct() needs to do a shuffle -- which is resulting in the need to
> > materialize the map outputs as files. count() doesn't.
> >
> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <[email protected]> wrote:
> >> I'm able to open ~13M files. I expect the output of
> >> .distinct().count() to be under 100M, why do I need so many files
> >> open?
> >>
> >> rfcompton@node19 ~> cat /etc/redhat-release
> >> CentOS release 5.7 (Final)
> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
> >> 13069279
> >>
> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <[email protected]> wrote:
> >> > The "too many open files" error is due to running out of available
> >> > FDs, usually due to a limit set in the OS.
> >> >
> >> > The fix will depend on your specific OS, but under Linux it usually
> >> > involves the "fs.file-max" sysctl.
> >> >
> >> > On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton <[email protected]> wrote:
> >> >> When I try .distinct() my jobs fail. Possibly related:
> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo
> >> >>
> >> >> This works
> >> >>
> >> >> //get the node ids
> >> >> val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
> >> >> //count the nodes
> >> >> val numNodes = nodes.count()
> >> >> logWarning("numNodes:\t"+numNodes)
> >> >>
> >> >> this fails
> >> >>
> >> >> //get the node ids
> >> >> val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
> >> >> //count the nodes
> >> >> val numNodes = nodes.distinct().count()
> >> >> logWarning("numNodes:\t"+numNodes)
> >> >>
> >> >> with these stacktraces:
> >> >>
> >> >> 14/01/17 14:54:37 WARN scripts.ComputeNetworkStats: numEdges: 915189977
> >> >> 14/01/17 14:54:37 INFO rdd.MappedRDD: Removing RDD 1 from persistence list
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.IOException
> >> >> java.io.IOException: Filesystem closed
> >> >> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:299)
> >> >> at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:77)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:2317)
> >> >> at java.io.DataInputStream.read(DataInputStream.java:83)
> >> >> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:205)
> >> >> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:169)
> >> >> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
> >> >> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
> >> >> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
> >> >> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
> >> >> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >> >> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> >> >> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> >> >> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> >> >> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> >> >> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >> >> at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> >> >> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> >> >> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
> >> >> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:1419 as TID 1396 on executor 6: node25 (PROCESS_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException
> >> >> java.lang.IllegalStateException: Shutdown in progress
> >> >> at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:39)
> >> >> at java.lang.Runtime.addShutdownHook(Runtime.java:192)
> >> >> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1655)
> >> >> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1627)
> >> >> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
> >> >> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:183)
> >> >> at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:92)
> >> >> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:54)
> >> >> at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:93)
> >> >> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:83)
> >> >> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:51)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:29)
> >> >> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:69)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:1419 as TID 1403 on executor 6: node25 (PROCESS_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> >> >> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/27/shuffle_0_54_1140 (Too many open files)
> >> >> at java.io.FileOutputStream.openAppend(Native Method)
> >> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >> >> at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:54 as TID 1407 on executor 6: node25 (NODE_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> >> >> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/0b/shuffle_0_441_482 (Too many open files)
> >> >> at java.io.FileOutputStream.openAppend(Native Method)
> >> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >> >> at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:441 as TID 1415 on executor 6: node25 (PROCESS_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> >> >> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/0b/shuffle_0_238_365 (Too many open files)
> >> >> at java.io.FileOutputStream.openAppend(Native Method)
> >> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >> >> at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:238 as TID 1423 on executor 6: node25 (NODE_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> >> >> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/35/shuffle_0_37_144 (Too many open files)
> >> >> at java.io.FileOutputStream.openAppend(Native Method)
> >> >> at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
> >> >> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >> >> at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> >> >> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >> >> at java.lang.Thread.run(Thread.java:662)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:37 as TID 1424 on executor 6: node25 (NODE_LOCAL)
> >> >> --
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: Shutdown in progress [duplicate 19]
> >> >> 14/01/17 14:56:07 ERROR cluster.ClusterTaskSetManager: Task 2.0:1679 failed more than 4 times; aborting job
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterScheduler: Remove TaskSet 2.0 from pool
> >> >> --
> >> >> 14/01/17 14:56:07 INFO scheduler.DAGScheduler: Failed to run count at ComputeNetworkStats.scala:59
> >> >> Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 2.0:1679 failed more than 4 times
> >> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >> >> at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
> >> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >> >> at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >> >> 14/01/17 14:56:07 INFO cluster.ClusterScheduler: Ignoring update from TID 1418 because its task set is gone
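If consolidation alone doesn't do it, another thing worth trying is giving distinct() an explicit, smaller partition count: during the shuffle each map task writes (and holds open) roughly one file per reduce partition, so fewer reduce partitions means fewer simultaneously open files. A rough sketch against the snippet above, using the RDD.distinct(numPartitions) overload (1024 is only an illustrative value, tune it to your data):

//same as the failing snippet, but with an explicit partition count for the shuffle
val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
//distinct(numPartitions) shuffles into the given number of partitions,
//so each map task keeps fewer shuffle files open at once
val numNodes = nodes.distinct(1024).count()
logWarning("numNodes:\t" + numNodes)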
