Re: data localisation in spark
Tasks are scheduled on executors based on data locality. Things work as you would expect in the example you brought up. Through dynamic allocation, the number of executors can change throughout the life time of an application. 10 executors (or 5 executors with 2 cores each) are not needed for a reducebyKey with parallelism = 10. If there are fewer slots to run tasks than tasks, the tasks will just be run serially. -Sandy On Tue, Jun 2, 2015 at 11:24 AM, Shushant Arora shushantaror...@gmail.com wrote: So in spark is after acquiring executors from ClusterManeger, does tasks are scheduled on executors based on datalocality ?I Mean if in an application there are 2 jobs and output of 1 job is used as input of another job. And in job1 I did persist on some RDD, then while running job2 will it use the same executor where job1's output was persisted or it acquire executor again and data movement happens? And is it true no of execuotrs in an application are fixed and acquired at start of application and remains same throught application? If yes, how does it takes cares of explicit no of reducers in some of apis say rddd.reduceByKey(func,10); does at converting DAG to stages it calculates executors required and then acquire executors/worker nodes ? On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run. Although dynamic allocation improves that last part. -Sandy On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote: Is it possible in JavaSparkContext ? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDDStringlines = jsc.textFile(args[0]); If yes , does its programmer's responsibilty to first calculate splits locations and then instantiate spark context with preferred locations? How does its achieved in MR2 with yarn, there is Application Master specifies split locations to ResourceManager before acquiring the node managers ? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant
Re: data localisation in spark
So in spark is after acquiring executors from ClusterManeger, does tasks are scheduled on executors based on datalocality ?I Mean if in an application there are 2 jobs and output of 1 job is used as input of another job. And in job1 I did persist on some RDD, then while running job2 will it use the same executor where job1's output was persisted or it acquire executor again and data movement happens? And is it true no of execuotrs in an application are fixed and acquired at start of application and remains same throught application? If yes, how does it takes cares of explicit no of reducers in some of apis say rddd.reduceByKey(func,10); does at converting DAG to stages it calculates executors required and then acquire executors/worker nodes ? On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run. Although dynamic allocation improves that last part. -Sandy On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote: Is it possible in JavaSparkContext ? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDDStringlines = jsc.textFile(args[0]); If yes , does its programmer's responsibilty to first calculate splits locations and then instantiate spark context with preferred locations? How does its achieved in MR2 with yarn, there is Application Master specifies split locations to ResourceManager before acquiring the node managers ? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant
Re: data localisation in spark
Is it possible in JavaSparkContext ? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDDStringlines = jsc.textFile(args[0]); If yes , does its programmer's responsibilty to first calculate splits locations and then instantiate spark context with preferred locations? How does its achieved in MR2 with yarn, there is Application Master specifies split locations to ResourceManager before acquiring the node managers ? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant
Re: data localisation in spark
It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run. Although dynamic allocation improves that last part. -Sandy On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote: Is it possible in JavaSparkContext ? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDDStringlines = jsc.textFile(args[0]); If yes , does its programmer's responsibilty to first calculate splits locations and then instantiate spark context with preferred locations? How does its achieved in MR2 with yarn, there is Application Master specifies split locations to ResourceManager before acquiring the node managers ? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant
Re: data localisation in spark
Hi Shushant, Spark currently makes no effort to request executors based on data locality (although it does try to schedule tasks within executors based on data locality). We're working on adding this capability at SPARK-4352 https://issues.apache.org/jira/browse/SPARK-4352. -Sandy On Sun, May 31, 2015 at 7:24 AM, Shushant Arora shushantaror...@gmail.com wrote: I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant
data localisation in spark
I want to understand how spark takes care of data localisation in cluster mode when run on YARN. 1.Driver program asks ResourceManager for executors. Does it tell yarn's RM to check HDFS blocks of input data and then allocate executors to it. And executors remain fixed throughout application or driver program asks for new executors when it submits another job in same application , since in spark new job is created for each action . If executors are fixed then for second job achieving data localisation is impossible? 2.When executors are done with their processing, does they are marked as free in ResourceManager's resoruce queue and executors directly tell this to Rm instead of via driver's ? Thanks Shushant