Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Ankur Srivastava
Hello,

We are currently running our data pipeline on Spark, which uses Cassandra as
the data source.

We are currently facing an issue with the step where we create an RDD on data
in a Cassandra table and then run flatMapToPair to transform the data, but we
are running into "Too many open files" errors. I have already increased the
file limits on all the worker and master nodes by changing
/etc/system/limits.conf to 65K, but that did not help.
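
For reference, a minimal sketch of the kind of step being described, assuming
the DataStax spark-cassandra-connector Java API and Spark 1.x signatures; the
keyspace, table, and column names here are made up for illustration:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import com.datastax.spark.connector.japi.CassandraRow;

import scala.Tuple2;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class CassandraFlatMapSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("cassandra-flatmap-sketch");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Read the Cassandra table as an RDD of rows (keyspace/table are hypothetical).
    JavaPairRDD<String, Integer> pairs = javaFunctions(sc)
        .cassandraTable("my_keyspace", "events")
        .flatMapToPair(new PairFlatMapFunction<CassandraRow, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(CassandraRow row) {
            // Emit zero or more key/value pairs per row; the shuffle itself
            // comes from whatever *ByKey operation follows this step.
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            String key = row.getString("user_id");  // hypothetical column
            if (key != null) {
              out.add(new Tuple2<String, Integer>(key, 1));
            }
            return out;
          }
        });

    // The groupByKey (or any other wide transformation) is what writes the
    // per-reducer shuffle files that can exhaust the open-file limit.
    System.out.println(pairs.groupByKey().count());
    sc.stop();
  }
}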

Is there some setting we can use to restrict the shuffle?

Also, when we use the log4j.properties in the conf folder, these logs are not
emitted.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure:
Lost task 20.3 in stage 1.0 (TID 51,
ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException:
/tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open
files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Thanks & Regards
Ankur


Re: Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Cody Koeninger
General ideas regarding too many open files:

Make sure ulimit is actually being set, especially if you're on Mesos
(because of https://issues.apache.org/jira/browse/MESOS-123). Find the pid
of the executor process, and cat /proc/<pid>/limits

set spark.shuffle.consolidateFiles = true

try spark.shuffle.manager = sort
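
For concreteness, a minimal sketch of setting those two properties
programmatically when the SparkContext is created; they can equally go in
conf/spark-defaults.conf. The class and app names are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ShuffleConfSketch {
  public static void main(String[] args) {
    // Consolidate hash-shuffle output files and/or switch to the sort-based
    // shuffle manager so each map task opens far fewer files.
    SparkConf conf = new SparkConf()
        .setAppName("shuffle-conf-sketch")
        .set("spark.shuffle.consolidateFiles", "true")
        .set("spark.shuffle.manager", "sort");

    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... build the Cassandra RDD and run the job as before ...
    sc.stop();
  }
}

With spark.shuffle.manager = sort, each map task writes a single sorted output
file (plus an index) instead of one file per reducer, which is usually the
bigger win for the open-file count.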





Re: Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Ankur Srivastava
Thank you Cody!!

I am going to try the two settings you mentioned.

We are currently running with Spark standalone cluster manager.

Thanks
Ankur
