Re: data localisation in spark

2015-06-03 Thread Sandy Ryza
Tasks are scheduled on executors based on data locality.  Things work as
you would expect in the example you brought up.
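To make the locality point concrete, here is a minimal Java sketch of how a scheduler could rank executors for a single task. An executor that already caches the partition (for example, one persisted by job1) is PROCESS_LOCAL. One on a host that stores the input block is NODE_LOCAL. Anything else is ANY. The class and field names are hypothetical. This is a simplification under those assumptions, not Spark's TaskSetManager, and it ignores delay scheduling and free-slot accounting.

```java
import java.util.*;

// Simplified locality ranking (hypothetical names; not Spark's TaskSetManager).
class LocalityPlacement {
    // Better levels come first, so ordinal order == preference order.
    enum Level { PROCESS_LOCAL, NODE_LOCAL, ANY }

    // Hypothetical executor description: id, host, and ids of partitions cached in its memory.
    static final class ExecutorInfo {
        final String id;
        final String host;
        final Set<Integer> cachedPartitions;

        ExecutorInfo(String id, String host, Set<Integer> cachedPartitions) {
            this.id = id;
            this.host = host;
            this.cachedPartitions = cachedPartitions;
        }
    }

    // Locality an executor offers for a partition whose input blocks live on preferredHosts.
    static Level levelFor(ExecutorInfo e, int partition, Set<String> preferredHosts) {
        if (e.cachedPartitions.contains(partition)) return Level.PROCESS_LOCAL; // persisted earlier
        if (preferredHosts.contains(e.host)) return Level.NODE_LOCAL;           // input block on same host
        return Level.ANY;                                                       // data must move
    }

    // Pick the executor with the best level; the first one in list order wins ties.
    static ExecutorInfo choose(List<ExecutorInfo> executors, int partition, Set<String> preferredHosts) {
        ExecutorInfo best = null;
        Level bestLevel = Level.ANY;
        for (ExecutorInfo e : executors) {
            Level level = levelFor(e, partition, preferredHosts);
            if (best == null || level.compareTo(bestLevel) < 0) {
                best = e;
                bestLevel = level;
            }
        }
        return best; // null only if there are no executors
    }
}
```

In this model, a task for a partition persisted by job1 goes back to the executor holding it. A task for an uncached partition prefers an executor on a host that stores its input block.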

Through dynamic allocation, the number of executors can change throughout
the lifetime of an application.  10 executors (or 5 executors with 2 cores
each) are not needed for a reduceByKey with parallelism = 10.  If there are
fewer slots to run tasks than tasks, the tasks will just run in successive
rounds as slots free up.
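Here is a rough back-of-the-envelope sketch of that last point. It assumes every task needs one core (the default spark.task.cpus = 1) and that all tasks take about the same time. Under those assumptions, a stage with N tasks on E executors of C cores each needs ceil(N / (E * C)) rounds. The class and method names are hypothetical, not a Spark API.

```java
// Rough model of how many rounds of tasks a stage needs (hypothetical helper, not a Spark API).
class TaskWaves {
    // Concurrent task slots, assuming each task uses one core (spark.task.cpus = 1).
    static int slots(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    // Rounds needed to run numTasks equally long tasks: ceil(numTasks / slots).
    static int waves(int numTasks, int executors, int coresPerExecutor) {
        int s = slots(executors, coresPerExecutor);
        if (s <= 0) {
            throw new IllegalArgumentException("need at least one task slot");
        }
        return (numTasks + s - 1) / s;
    }
}
```

For rdd.reduceByKey(func, 10), the reduce stage has 10 tasks. Either 10 single-core executors or 5 two-core executors run it in one round. With 2 two-core executors it takes 3 rounds (4 + 4 + 2 tasks). In practice a task starts as soon as any slot frees up, so the rounds overlap rather than run as strict waves.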

-Sandy




Re: data localisation in spark

2015-06-02 Thread Shushant Arora
So in Spark, after acquiring executors from the cluster manager, are tasks
scheduled on executors based on data locality? I mean: an application has
2 jobs, and the output of job1 is used as the input of job2.
In job1 I persisted some RDD. While running job2, will Spark use the same
executors where job1's output was persisted, or will it acquire executors
again, so that data movement happens?

And is it true that the number of executors in an application is fixed,
acquired at the start of the application, and stays the same throughout the
application? If yes, how does it take care of an explicit number of
reducers in some APIs, say rdd.reduceByKey(func, 10)?

When converting the DAG to stages, does it calculate the executors required
and then acquire executors/worker nodes?





Re: data localisation in spark

2015-06-02 Thread Shushant Arora
Is it possible with JavaSparkContext?

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);

If yes, is it the programmer's responsibility to first calculate the split
locations and then instantiate the Spark context with preferred locations?

How is this achieved in MR2 with YARN? Does the ApplicationMaster specify
split locations to the ResourceManager before acquiring the node managers?



On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that tries
 to honor the data locality in YARN mode.

   /**
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
    *
    * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
    * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    * from a list of input files or InputFormats for the application.
    */
   @DeveloperApi
   def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
     this(config)
     this.preferredNodeLocationData = preferredNodeLocationData
   }

 --
 bit1...@163.com
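The constructor quoted above takes a Map[String, Set[SplitInfo]], a map from host name to the input splits stored on that host. The following Java sketch only illustrates that data structure. It inverts a split-to-hosts list into a host-to-splits map. SplitSketch is a hypothetical stand-in for Spark's SplitInfo, and this is not how InputFormatInfo.computePreferredLocations is implemented. Note also that Sandy's reply in this thread says this constructor currently has no effect.

```java
import java.util.*;

// Hypothetical stand-in for an input split: a byte range of a file plus the hosts that store it.
final class SplitSketch {
    final String path;
    final long start;
    final long length;
    final List<String> hosts;

    SplitSketch(String path, long start, long length, List<String> hosts) {
        this.path = path;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }

    @Override
    public String toString() {
        return path + "[" + start + "," + (start + length) + ")";
    }
}

class PreferredLocationsSketch {
    // Invert split -> hosts into host -> splits, the shape of Map[String, Set[SplitInfo]].
    static Map<String, Set<SplitSketch>> byHost(List<SplitSketch> splits) {
        Map<String, Set<SplitSketch>> result = new TreeMap<>(); // sorted by host name
        for (SplitSketch split : splits) {
            for (String host : split.hosts) {
                result.computeIfAbsent(host, h -> new LinkedHashSet<>()).add(split);
            }
        }
        return result;
    }
}
```

A split replicated on several hosts appears under each of them. A resource request built from such a map could ask for containers on the hosts with the most splits.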




Re: data localisation in spark

2015-06-02 Thread Sandy Ryza
It is not possible with JavaSparkContext either.  The API mentioned below
currently does not have any effect (we should document this).

The primary difference between MR and Spark here is that MR runs each task
in its own YARN container, while Spark runs multiple tasks within an
executor, which needs to be requested before Spark knows what tasks it will
run.  Dynamic allocation improves that last part, though.
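With dynamic allocation, the executor count follows the task backlog instead of being fixed at startup. The following Java sketch approximates the sizing rule, assuming it works roughly like Spark's ExecutorAllocationManager. The target is enough executors to run every pending and running task at once, where each executor holds spark.executor.cores / spark.task.cpus tasks. That target is then clamped to spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors. The class and method names are hypothetical. Spark's real manager also ramps its requests up over several rounds and releases executors that stay idle past a timeout.

```java
// Rough sizing rule for dynamic allocation (hypothetical names; an approximation, not Spark's code).
class AllocationTarget {
    // Executors needed to run every pending and running task at the same time.
    static int maxExecutorsNeeded(int pendingTasks, int runningTasks, int executorCores, int taskCpus) {
        if (taskCpus <= 0) {
            throw new IllegalArgumentException("taskCpus must be positive");
        }
        int tasksPerExecutor = executorCores / taskCpus; // like spark.executor.cores / spark.task.cpus
        if (tasksPerExecutor <= 0) {
            throw new IllegalArgumentException("executorCores must be >= taskCpus");
        }
        int tasks = pendingTasks + runningTasks;
        return (tasks + tasksPerExecutor - 1) / tasksPerExecutor; // ceiling division
    }

    // Clamp to [minExecutors, maxExecutors], like the dynamicAllocation min/max settings.
    static int target(int needed, int minExecutors, int maxExecutors) {
        return Math.max(minExecutors, Math.min(needed, maxExecutors));
    }
}
```

Take rdd.reduceByKey(func, 10) with 2-core executors. A backlog of 10 tasks calls for 5 executors. With a maximum of 3, the target is capped at 3, and the remaining tasks wait for free slots.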

-Sandy




Re: data localisation in spark

2015-05-31 Thread Sandy Ryza
Hi Shushant,

Spark currently makes no effort to request executors based on data locality
(although it does try to schedule tasks within executors based on data
locality).  We're working on adding this capability in SPARK-4352
(https://issues.apache.org/jira/browse/SPARK-4352).
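Task scheduling within already-running executors uses delay scheduling. The scheduler waits a while for a slot at the best locality level before accepting a worse one. The wait is controlled by spark.locality.wait, which defaults to 3s. The following Java sketch models only that relaxation step. It assumes one wait value for every level and omits Spark's NO_PREF level. The class and method names are hypothetical.

```java
// Simplified delay-scheduling model (hypothetical names; Spark's NO_PREF level omitted).
class DelayScheduling {
    // Ordered from most to least local.
    enum Level { PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY }

    // Least local level the scheduler will accept after waiting waitedMs without launching a task,
    // assuming the same wait (like a single spark.locality.wait value) at every level.
    static Level allowedLevel(long waitedMs, long localityWaitMs) {
        Level[] levels = Level.values();
        if (localityWaitMs <= 0) {
            return Level.ANY; // no waiting: take any free slot immediately
        }
        long steps = Math.max(0, waitedMs) / localityWaitMs;
        return levels[(int) Math.min(steps, levels.length - 1)];
    }
}
```

Take a 3000 ms wait. A task that has waited 7 seconds without launching may be placed at RACK_LOCAL. After 9 seconds, it may be placed anywhere. Setting the wait to 0 trades locality for immediate launches.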

-Sandy



data localisation in spark

2015-05-31 Thread Shushant Arora
I want to understand how Spark takes care of data localisation in cluster
mode when running on YARN.

1. The driver program asks the ResourceManager for executors. Does it tell
YARN's RM to check the HDFS blocks of the input data and then allocate
executors accordingly?
Also, do executors remain fixed throughout the application, or does the
driver program ask for new executors when it submits another job in the
same application (since in Spark a new job is created for each action)? If
executors are fixed, is achieving data localisation for the second job
impossible?

2. When executors are done with their processing, are they marked as free
in the ResourceManager's resource queue, and do the executors tell the RM
this directly instead of via the driver?

Thanks
Shushant