Re: Memory compute-intensive tasks

2014-08-04 Thread rpandya
This one turned out to be another problem with my app configuration, not with
Spark. The compute task depended on the local filesystem, and configuration
errors on 8 of the 10 nodes made it fail early on those machines. The Spark
wrapper was not checking the process exit value, so it merely looked as if
those tasks were producing very little data.

Ravi






Re: Memory compute-intensive tasks

2014-07-29 Thread rpandya
OK, I did figure this out. I was running the app (avocado) with spark-submit,
when it was actually designed to take command-line arguments to connect to a
Spark cluster. Since I didn't provide any such arguments, it started a nested
local Spark cluster *inside* the YARN Spark executor, so of course everything
ran on one node. If I spin up a Spark cluster manually and provide its master
URI to avocado, it works fine.
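
For reference, here is a minimal sketch of what "provide the master URI"
amounts to on the driver side - the object name and argument handling are
hypothetical, not avocado's actual interface:

import org.apache.spark.{SparkConf, SparkContext}

object RunOnCluster {
  def main(args: Array[String]): Unit = {
    // Take the master URI explicitly (e.g. spark://master-host:7077) instead
    // of letting the app fall back to a nested local master.
    val master = args.headOption.getOrElse(
      sys.error("usage: RunOnCluster <spark-master-uri> ..."))
    val conf = new SparkConf().setAppName("avocado-run").setMaster(master)
    val sc = new SparkContext(conf)
    try {
      // ... build the RDDs and run the pipeline here ...
    } finally {
      sc.stop()
    }
  }
}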

Now I've tried running a reasonably sized job (400 GB of data on 10
HDFS/Spark nodes), and the partitioning is strange. Eight nodes get almost
nothing, while the other two each get half the work. This happens whether I
call coalesce with shuffle=true or shuffle=false before the work stage.
(Though with shuffle=true it creates 3000 tasks to do the shuffle, and still
ends up with this skewed distribution!) Any suggestions on how to figure out
what's going on?
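
One way to see the skew directly is to count records per partition right
after the coalesce - a small diagnostic sketch, where "input" stands for the
RDD feeding the work stage:

// Count the records in each partition and print the distribution.
val perPartition = input
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
  .sortBy(_._1)
perPartition.foreach { case (idx, n) => println(s"partition $idx: $n records") }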

Thanks,

Ravi





Re: Memory compute-intensive tasks

2014-07-18 Thread rpandya
Hi Matei-

Changing to coalesce(numNodes, true) still runs all partitions on a single
node, which I verified by printing the hostname before I exec the external
process.
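
(That kind of check can be as simple as the sketch below; the external
invocation is left as a placeholder and rdd stands for the input RDD.)

rdd.foreachPartition { records =>
  // Log which machine this partition actually ran on before exec'ing anything.
  println("partition running on " + java.net.InetAddress.getLocalHost.getHostName)
  // ... exec the external process here and feed it the records ...
}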






Re: Memory compute-intensive tasks

2014-07-18 Thread rpandya
I also tried increasing --num-executors to numNodes * coresPerNode and using
coalesce(numNodes * 10, true), and it still ran all the tasks on one node. It
seems to be placing all the executors on one node (though not always the same
node, which indicates it is aware of more than one!). I'm using spark-submit
--master yarn --deploy-mode cluster with Spark 1.0.1 built for Hadoop 2.4, on
HDP 2.1/Hadoop 2.4.

There's clearly something wrong with my Hadoop configuration or with how I'm
submitting my Spark job - any suggestions?

Thanks,

Ravi





Re: Memory compute-intensive tasks

2014-07-16 Thread rpandya
Matei - I tried using coalesce(numNodes, true), but it then seemed to run too
few SNAP tasks - only 2 or 3 when I had specified 46. The job failed,
perhaps for unrelated reasons, with some odd exceptions in the log (at the
end of this message). But I really don't want to force data movement between
nodes. The input data is in HDFS and should already be somewhat balanced
among the nodes. We've run this scenario using the simple hadoop jar
runner and a custom format jar to break the input into 8-line chunks (paired
FASTQ). Ideally I'd like Spark to do the minimum data movement to balance
the work, feeding each task mostly from data local to that node.

Daniel - that's a good thought, I could invoke a small stub for each task
that talks to a single local daemon process over a socket, and serializes all
the tasks on a given machine.
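
A rough sketch of that stub, assuming a daemon is already listening on a
local port with a line-oriented protocol (the port number and protocol are
made up, and rdd stands for the input RDD):

import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.Socket

val results = rdd.mapPartitions { records =>
  // One connection per task; the daemon serializes work arriving on its node.
  val socket = new Socket("localhost", 9999)
  val out = new PrintWriter(socket.getOutputStream, true)
  val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
  // Send each record and read one reply line back; force evaluation with
  // toList so the socket is not closed before the iterator is consumed.
  val replies = records.map { rec => out.println(rec); in.readLine() }.toList
  out.close(); in.close(); socket.close()
  replies.iterator
}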

Thanks,

Ravi

P.S. Log exceptions:

14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve
SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
        at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)

...and later...

14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)






Re: Memory compute-intensive tasks

2014-07-16 Thread Liquan Pei
Hi Ravi,

I have seen a similar issue before. You can try setting
"fs.hdfs.impl.disable.cache" to true in your Hadoop configuration. For
example, if your Hadoop Configuration object is hadoopConf, you can use
hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true)
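
In Spark that can be set on the SparkContext's Hadoop configuration - a short
sketch, assuming the input RDD is created from sc afterwards:

// Disable the shared HDFS FileSystem cache for inputs read through this context.
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)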

Let me know if that helps.

Best,
Liquan


On Wed, Jul 16, 2014 at 4:56 PM, rpandya r...@iecommerce.com wrote:

 Matei - I tried using coalesce(numNodes, true), but it then seemed to run
 too
 few SNAP tasks - only 2 or 3 when I had specified 46. The job failed,
 perhaps for unrelated reasons, with some odd exceptions in the log (at the
 end of this message). But I really don't want to force data movement
 between
 nodes. The input data is in HDFS and should already be somewhat balanced
 among the nodes. We've run this scenario using the simple hadoop jar
 runner and a custom format jar to break the input into 8-line chunks
 (paired
 FASTQ). Ideally I'd like Spark to do the minimum data movement to balance
 the work, feeding each task mostly from data local to that node.

 Daniel - that's a good thought, I could invoke a small stub for each task
 that talks to a single local daemon process over a socket, and serializes
 all
 the tasks on a given machine.

 Thanks,

 Ravi

 P.S. Log exceptions:

 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve
 SparkContext in spite of waiting for 10, maxNumTries = 10
 Exception in thread "main" java.lang.NullPointerException
         at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
         at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)

 ...and later...

 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
 java.io.IOException: Filesystem closed
 at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
 at
 org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)








-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Memory compute-intensive tasks

2014-07-14 Thread Ravi Pandya
I'm trying to run a job that includes an invocation of a memory- and
compute-intensive multithreaded C++ program, and so I'd like to run one
task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
one task per core, and so runs out of memory on the node. Is there any way
to give the scheduler a hint that the task uses lots of memory and cores so
it spreads it out more evenly?
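
For reference, the pattern being attempted looks roughly like the sketch
below; numNodes and the command line are placeholders, and pipe() is just one
way to hand a partition to an external program:

// One partition per node, each partition piped through the external program
// via its stdin/stdout.
val numNodes = 10
val results = rdd
  .coalesce(numNodes)
  .pipe("/path/to/cpp-program --read-stdin")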

Thanks,

Ravi Pandya
Microsoft Research


Re: Memory compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
I don't have a solution for you (sorry), but do note that
rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
set shuffle=true then it should repartition and redistribute the data. But
it uses the hash partitioner according to the ScalaDoc - I don't know of
any way to supply a custom partitioner.
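
For what it's worth, a partitioner can be supplied if the records are keyed
first - a sketch, where rdd, numNodes, and the round-robin keying are
illustrative rather than anything coalesce itself supports:

import org.apache.spark.Partitioner

// Spread records evenly across numPartitions partitions by their index.
class RoundRobinPartitioner(val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int =
    (key.asInstanceOf[Long] % numPartitions).toInt
}

val numNodes = 10
val balanced = rdd
  .zipWithIndex()            // (record, index)
  .map(_.swap)               // (index, record)
  .partitionBy(new RoundRobinPartitioner(numNodes))
  .values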


On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote:

 I'm trying to run a job that includes an invocation of a memory- and
 compute-intensive multithreaded C++ program, and so I'd like to run one
 task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
 one task per core, and so runs out of memory on the node. Is there any way
 to give the scheduler a hint that the task uses lots of memory and cores so
 it spreads it out more evenly?

 Thanks,

 Ravi Pandya
 Microsoft Research




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Memory compute-intensive tasks

2014-07-14 Thread Matei Zaharia
I think coalesce with shuffle=true will force it to have one task per node. 
Without that, it might be that due to data locality it decides to launch 
multiple ones on the same node even though the total # of tasks is equal to the 
# of nodes.

If this is the *only* thing you run on the cluster, you could also configure 
the Workers to only report one core by manually launching the 
spark.deploy.worker.Worker process with that flag (see 
http://spark.apache.org/docs/latest/spark-standalone.html).

Matei

On Jul 14, 2014, at 1:59 PM, Daniel Siegmann daniel.siegm...@velos.io wrote:

 I don't have a solution for you (sorry), but do note that 
 rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set 
 shuffle=true then it should repartition and redistribute the data. But it 
 uses the hash partitioner according to the ScalaDoc - I don't know of any way 
 to supply a custom partitioner.
 
 
 On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote:
 I'm trying to run a job that includes an invocation of a memory- and
 compute-intensive multithreaded C++ program, and so I'd like to run one task 
 per physical node. Using rdd.coalesce(# nodes) seems to just allocate one 
 task per core, and so runs out of memory on the node. Is there any way to 
 give the scheduler a hint that the task uses lots of memory and cores so it 
 spreads it out more evenly?
 
 Thanks,
 
 Ravi Pandya
 Microsoft Research
 
 
 
 -- 
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning
 
 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io



Re: Memory compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
Depending on how your C++ program is designed, maybe you can feed the data
from multiple partitions into the same process? Getting the results back
might be tricky. But that may be the only way to guarantee you're only
using one invocation per node.


On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 I think coalesce with shuffle=true will force it to have one task per
 node. Without that, it might be that due to data locality it decides to
 launch multiple ones on the same node even though the total # of tasks is
 equal to the # of nodes.

 If this is the *only* thing you run on the cluster, you could also
 configure the Workers to only report one core by manually launching the
 spark.deploy.worker.Worker process with that flag (see
 http://spark.apache.org/docs/latest/spark-standalone.html).

 Matei

 On Jul 14, 2014, at 1:59 PM, Daniel Siegmann daniel.siegm...@velos.io
 wrote:

 I don't have a solution for you (sorry), but do note that
 rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
 set shuffle=true then it should repartition and redistribute the data.
 But it uses the hash partitioner according to the ScalaDoc - I don't know
 of any way to supply a custom partitioner.


 On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote:

 I'm trying to run a job that includes an invocation of a memory- and
 compute-intensive multithreaded C++ program, and so I'd like to run one
 task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
 one task per core, and so runs out of memory on the node. Is there any way
 to give the scheduler a hint that the task uses lots of memory and cores so
 it spreads it out more evenly?

 Thanks,

 Ravi Pandya
 Microsoft Research




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io