Re: Memory compute-intensive tasks
This one turned out to be another problem with my app configuration, not with Spark. The compute task depended on the local filesystem, and configuration errors on 8 of the 10 nodes made them fail early. The Spark wrapper was not checking the process exit value, so it looked as if those nodes were simply producing very little data.

Ravi
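A minimal sketch of the fix implied above, assuming the wrapper shells out to the external tool via scala.sys.process; the tool path and arguments are hypothetical placeholders:

    import scala.sys.process._

    // Run the external tool and fail loudly on a nonzero exit code, so a
    // misconfigured node fails its task instead of silently emitting no data.
    def runToolChecked(inputPath: String): Unit = {
      val exitCode = Seq("/path/to/tool", inputPath).!  // blocks until the process exits
      if (exitCode != 0)
        sys.error(s"external tool failed on $inputPath with exit code $exitCode")
    }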
Re: Memory compute-intensive tasks
OK, I did figure this out. I was running the app (avocado) with spark-submit, when it was actually designed to take command-line arguments telling it how to connect to a Spark cluster. Since I didn't provide any such arguments, it started a nested local Spark cluster *inside* the YARN Spark executor, and so of course everything ran on one node. If I spin up a Spark cluster manually and provide the Spark master URI to avocado, it works fine.

Now I've tried running a reasonably sized job through it (400GB of data on 10 HDFS/Spark nodes), and the partitioning is strange. Eight nodes get almost nothing, and the other two nodes each get half the work. This happens whether I use coalesce with shuffle=true or false before the work stage. (And if I use shuffle=true, it creates 3000 tasks to do the shuffle, yet still ends up with this skewed distribution!) Any suggestions on how to figure out what's going on?

Thanks,

Ravi
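A hedged diagnostic sketch for skew like this: count the records in each partition after the coalesce so the imbalance becomes visible. numNodes = 10 is taken from the message; everything else is an assumption.

    // Count records per partition. it.size consumes the iterator, which is
    // fine here because only the count is returned.
    val numNodes = 10
    val sizes = rdd
      .coalesce(numNodes, shuffle = true)
      .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
      .collect()
    sizes.sortBy(-_._2).foreach { case (idx, n) => println(s"partition $idx: $n records") }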
Re: Memory compute-intensive tasks
Hi Matei,

Changing to coalesce(numNodes, true) still runs all partitions on a single node, which I verified by printing the hostname before exec'ing the external process.
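A minimal sketch of that hostname check, assuming rdd and numNodes as in the surrounding messages:

    import java.net.InetAddress

    // Log which worker each partition actually lands on before invoking the tool.
    rdd.coalesce(numNodes, shuffle = true).foreachPartition { part =>
      println(s"partition running on ${InetAddress.getLocalHost.getHostName}")
      // ... exec the external process here ...
    }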
Re: Memory compute-intensive tasks
I also tried increasing --num-executors to numNodes * coresPerNode and using coalesce(numNodes*10, true), and it still ran all the tasks on one node. It seems like it is placing all the executors on one node (though not always the same node, which indicates it is aware of more than one!).

I'm using spark-submit --master yarn --deploy-mode cluster with spark-1.0.1 built for hadoop 2.4, on HDP 2.1/Hadoop 2.4. There's clearly just something wrong with my Hadoop configuration, or with how I'm submitting my Spark job - any suggestions?

Thanks,

Ravi
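One hedged way to check where the executors actually registered, from the driver (assuming sc is the SparkContext; getExecutorMemoryStatus is keyed by "host:port", and the driver itself may appear in the list):

    // Distinct executor hosts show how many nodes YARN actually gave you.
    val executorHosts = sc.getExecutorMemoryStatus.keySet.map(_.split(":").head)
    println(s"executors on ${executorHosts.size} host(s): ${executorHosts.mkString(", ")}")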
Re: Memory compute-intensive tasks
Matei - I tried using coalesce(numNodes, true), but it then seemed to run too few SNAP tasks - only 2 or 3 when I had specified 46. The job failed, perhaps for unrelated reasons, with some odd exceptions in the log (at the end of this message). But I really don't want to force data movement between nodes. The input data is in HDFS and should already be somewhat balanced among the nodes. We've run this scenario using the simple hadoop jar runner and a custom input-format jar to break the input into 8-line chunks (paired FASTQ). Ideally I'd like Spark to do the minimum data movement needed to balance the work, feeding each task mostly from data local to that node.

Daniel - that's a good thought. I could invoke a small stub for each task that talks to a single local daemon process over a socket, which would serialize all the tasks on a given machine.

Thanks,

Ravi

P.S. Log exceptions:

14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
        at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)

...and later...

14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
        at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)
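A hypothetical sketch of that stub-plus-daemon idea: each task connects to a daemon assumed to be already listening on localhost. The port, protocol, and one-line-per-record framing are all assumptions, not part of this thread:

    import java.io.{BufferedReader, InputStreamReader, PrintWriter}
    import java.net.Socket

    // Send each record to a local daemon and read back one result line per
    // record. Because every task on a machine talks to the same daemon, the
    // daemon can serialize the expensive work however it likes.
    def throughLocalDaemon(records: Iterator[String]): Iterator[String] = {
      val socket = new Socket("localhost", 9999) // hypothetical daemon port
      val out = new PrintWriter(socket.getOutputStream, true)
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
      try {
        records.map { rec => out.println(rec); in.readLine() }.toVector.iterator
      } finally {
        socket.close() // safe: toVector materialized the results above
      }
    }

    // Usage: rdd.mapPartitions(throughLocalDaemon)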
Re: Memory compute-intensive tasks
Hi Ravi,

I have seen a similar issue before. You can try setting fs.hdfs.impl.disable.cache to true in your Hadoop configuration. For example, if your Hadoop configuration object is hadoopConf, you can use hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true). Let me know if that helps.

Best,
Liquan

On Wed, Jul 16, 2014 at 4:56 PM, rpandya <r...@iecommerce.com> wrote:
> Matei - I tried using coalesce(numNodes, true), but it then seemed to run
> too few SNAP tasks - only 2 or 3 when I had specified 46. [...]

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst
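A minimal sketch of that setting applied through the SparkContext's Hadoop configuration (assuming sc is the SparkContext). Disabling the FileSystem cache keeps one task's FileSystem.close() from closing the shared cached instance that other tasks - like the RecordReader in the "Filesystem closed" trace above - are still using:

    // Give each caller its own HDFS FileSystem instance instead of a shared one.
    sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)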
Memory compute-intensive tasks
I'm trying to run a job that includes an invocation of a memory- and compute-intensive multithreaded C++ program, so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores, so that it spreads the tasks out more evenly?

Thanks,

Ravi Pandya
Microsoft Research
Re: Memory compute-intensive tasks
I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes) keeps the data on the same nodes where it was. If you set shuffle=true, it should repartition and redistribute the data - but it uses the hash partitioner according to the ScalaDoc, and I don't know of any way to supply a custom partitioner.

On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya <r...@iecommerce.com> wrote:
> I'm trying to run a job that includes an invocation of a memory- and
> compute-intensive multithreaded C++ program, so I'd like to run one task
> per physical node. [...]

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
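A minimal sketch of the two behaviors described there, assuming rdd is the input and ten nodes:

    // Without shuffle, coalesce only merges co-located partitions, so the
    // scheduler may still run several of the resulting tasks on one machine.
    // With shuffle = true it redistributes everything via a hash-based shuffle.
    val numNodes = 10
    val merged = rdd.coalesce(numNodes)
    val spread = rdd.coalesce(numNodes, shuffle = true)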
Re: Memory compute-intensive tasks
I think coalesce with shuffle=true will force it to have one task per node. Without that, it might be that due to data locality it decides to launch multiple tasks on the same node, even though the total # of tasks is equal to the # of nodes.

If this is the *only* thing you run on the cluster, you could also configure the Workers to report only one core, by manually launching each org.apache.spark.deploy.worker.Worker process with that flag (see http://spark.apache.org/docs/latest/spark-standalone.html).

Matei

On Jul 14, 2014, at 1:59 PM, Daniel Siegmann <daniel.siegm...@velos.io> wrote:
> I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes)
> keeps the data on the same nodes where it was. [...]
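A hedged alternative to reconfiguring the standalone Workers, sketched with the spark.task.cpus property: asking the scheduler to reserve a whole node's worth of cores per task yields roughly one task per node. The property's availability in your Spark version and the 16-core figure are both assumptions to verify:

    import org.apache.spark.{SparkConf, SparkContext}

    // Reserve 16 cores per task so only one task fits on a 16-core worker.
    val conf = new SparkConf()
      .setAppName("one-task-per-node")
      .set("spark.task.cpus", "16")
    val sc = new SparkContext(conf)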
Re: Memory compute-intensive tasks
Depending on how your C++ program is designed, maybe you can feed the data from multiple partitions into the same process? Getting the results back might be tricky, but that may be the only way to guarantee you're only using one invocation per node.

On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> I think coalesce with shuffle=true will force it to have one task per node. [...]

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
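One concrete way to sketch one external invocation per partition is RDD.pipe, which streams each partition's records through an external program's stdin/stdout. The program path is hypothetical, and this still depends on the scheduler actually spreading the partitions across nodes, which is the problem discussed in this thread:

    // One partition per node (if the scheduler cooperates), one piped process each.
    val numNodes = 10 // assumption
    val results = rdd
      .coalesce(numNodes, shuffle = true)
      .pipe("/path/to/cxx_program")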