Spark with Cassandra - Shuffle opening too many files
Hello,

We are currently running our data pipeline on Spark, with Cassandra as the data source. We are facing an issue at the step where we create an RDD on the data in a Cassandra table and then run flatMapToPair to transform it: we keep running into "Too many open files" errors. I have already raised the file limit to 65k on all the worker and master nodes by editing /etc/security/limits.conf, but that did not help. Is there some setting we can use to restrict the number of files the shuffle opens? Also, even though we have a log4j.properties in the conf folder, these logs are not being emitted.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure: Lost task 20.3 in stage 1.0 (TID 51, ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException: /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Thanks & Regards
Ankur
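To make the failing pattern concrete, below is a minimal sketch of an RDD over a Cassandra table followed by flatMapToPair, assuming the spark-cassandra-connector Java API of that era; the keyspace, table, and column names, and the reduceByKey that follows, are placeholders rather than the actual pipeline code. The flatMapToPair itself is a narrow transformation; it is the keyed stage after it that triggers the shuffle, and with the hash shuffle manager each map task opens one output file per reduce partition, so open file handles grow as map tasks x reduce partitions.

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import com.datastax.spark.connector.japi.CassandraRow;

    import scala.Tuple2;

    public class CassandraPairSketch {
        public static void main(String[] args) {
            JavaSparkContext sc =
                    new JavaSparkContext(new SparkConf().setAppName("cassandra-pipeline"));

            // Read the source table; keyspace/table names are placeholders.
            JavaRDD<CassandraRow> rows =
                    javaFunctions(sc).cassandraTable("my_keyspace", "my_table");

            // Spark 1.x Java API: the pair function returns an Iterable
            // (from Spark 2.0 on it returns an Iterator instead).
            JavaPairRDD<String, Long> pairs = rows.flatMapToPair(row ->
                    Arrays.asList(new Tuple2<>(row.getString("id"), 1L)));

            // The keyed stage below is what forces the shuffle.
            pairs.reduceByKey((a, b) -> a + b).count();
            sc.stop();
        }
    }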
Re: Spark with Cassandra - Shuffle opening too many files
General ideas regarding "too many open files":

- Make sure ulimit is actually being set, especially if you're on Mesos (because of https://issues.apache.org/jira/browse/MESOS-123). Find the pid of the executor process and cat /proc/<pid>/limits.
- Set spark.shuffle.consolidateFiles = true.
- Try spark.shuffle.manager = sort.

On Wed, Jan 7, 2015 at 3:06 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote:
> [quoted message above]
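A minimal sketch of where those two shuffle settings would go if set programmatically; they can equally be placed in conf/spark-defaults.conf or passed via --conf on spark-submit. The class and app name are placeholders, not from this thread.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ShuffleConfSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("cassandra-pipeline") // placeholder
                    // Hash shuffle: reuse one file group per core instead of
                    // opening a fresh file per (map task, reduce partition) pair.
                    .set("spark.shuffle.consolidateFiles", "true")
                    // Sort-based shuffle: each map task writes a single sorted
                    // data file plus an index, independent of the number of
                    // reduce partitions.
                    .set("spark.shuffle.manager", "sort");

            JavaSparkContext sc = new JavaSparkContext(conf);
            // ... job as before ...
            sc.stop();
        }
    }

For context, spark.shuffle.consolidateFiles only applies to the hash shuffle manager, and the sort-based shuffle became the default in Spark 1.2; the HashShuffleWriter frames in the stack trace above indicate this job is still running the hash shuffle.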
Re: Spark with Cassandra - Shuffle opening too many files
Thank you Cody!! I am going to try the two settings you mentioned. We are currently running with the Spark standalone cluster manager.

Thanks
Ankur

On Wed, Jan 7, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote:
> [quoted message above]