[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.
[ https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374841#comment-16374841 ] Xuefu Zhang commented on SPARK-5377:

[~shay_elbaz] I think the issue was closed purely because no one was working on it, based on my private communication with [~sowen]. However, it can surely be reopened if someone would like to work on it.

> Dynamically add jar into Spark Driver's classpath.
> --
>
> Key: SPARK-5377
> URL: https://issues.apache.org/jira/browse/SPARK-5377
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.2.0
> Reporter: Chengxiang Li
> Priority: Major
>
> Spark supports dynamically adding a jar to the executor classpath through
> SparkContext::addJar(), but it does not support dynamically adding a jar to
> the driver classpath. In most cases (if not all), a user dynamically adds a
> jar with SparkContext::addJar() because classes from that jar will be
> referred to in an upcoming Spark job, which means the classes need to be
> loadable on the driver side too, e.g. during serialization. I think it makes
> sense to add an API that adds a jar to the driver classpath, or simply to
> make SparkContext::addJar() do so. HIVE-9410 is a real case from Hive on
> Spark.

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
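The asymmetry described in the ticket can be modeled with a toy sketch (this is not Spark's actual implementation; class and method names are purely illustrative):

```python
# Toy model of the SPARK-5377 asymmetry: addJar() makes classes visible to
# executors but not to the driver, so driver-side (de)serialization of those
# classes fails. Names here are illustrative, not Spark APIs.
class ToyContext:
    def __init__(self):
        self.executor_classpath = set()
        self.driver_classpath = set()

    def add_jar(self, jar):
        # Mirrors today's SparkContext.addJar: only executors see the jar.
        self.executor_classpath.add(jar)

    def add_driver_jar(self, jar):
        # The behavior the ticket asks for: extend the driver classpath too.
        self.executor_classpath.add(jar)
        self.driver_classpath.add(jar)

    def driver_can_load(self, jar):
        return jar in self.driver_classpath


ctx = ToyContext()
ctx.add_jar("udf.jar")
broken = ctx.driver_can_load("udf.jar")       # False: driver can't resolve classes
ctx.add_driver_jar("udf.jar")
fixed = ctx.driver_can_load("udf.jar")        # True with the proposed API
```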
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358759#comment-16358759 ] Xuefu Zhang commented on SPARK-22683:

On a side note, besides the name of the configuration being subject to change, I think (and mentioned previously) that the value doesn't have to be an integer, to allow finer control.

> DynamicAllocation wastes resources by allocating containers that will barely
> be used
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Priority: Major
> Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation,
> I've noticed almost a doubling (+114% exactly) of the resource consumption of
> Spark w.r.t. MR, for a wall clock time gain of 43%.
> About the context:
> - resource usage stands for the vcore-hours allocation for the whole job, as
> seen by YARN
> - I'm talking about a series of jobs because we provide our users with a way
> to define experiments (via UI / DSL) that automatically get translated to
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 of such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a
> lot between jobs; as such, finding an efficient number of executors for each
> job is difficult to get right, which is the reason I took the path of
> dynamic allocation.
> - some of the tests have been scheduled on an idle queue, some on a full
> queue
> - experiments have been conducted with spark.executor.cores = 5 and 10; only
> results for 5 cores have been reported because efficiency was overall better
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs
> (about 600 jobs), ranging from tens to thousands of splits in the data
> partitioning and between 400 and 9000 seconds of wall clock time
> - executor idle timeout is set to 30s
>
> Definitions:
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots,
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of dynamic allocation is to allocate enough
> containers to have one taskSlot per task, which minimizes latency but wastes
> resources when tasks are small, because of executor allocation and idling
> overhead.
> The results of using the proposal (described below) over the job sample (600
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% reduction (against a +114%
> increase) in resource usage, for a 37% (against 43%) reduction in wall clock
> time for Spark w.r.t. MR
> - by trying to minimize the average resource consumption, I ended up with 6
> tasks per taskSlot, with a 30% resource usage reduction, for a similar wall
> clock time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few
> points mentioned in the comments)?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for
> each job (tens to thousands of splits can occur), and would essentially
> defeat the point of using dynamic allocation.
> - use dynamicAllocation.schedulerBacklogTimeout:
>   - setting this parameter right to avoid creating unused executors is very
> dependent on wall clock time. One basically needs to solve the exponential
> ramp-up for the target time, so this is not an option for my use case, where
> I don't want per-job tuning.
>   - I've still done a series of experiments, details in the comments. The
> result is that after manual tuning, the best I could get was a similar
> resource consumption at the expense of 20% more wall clock time, or a similar
> wall clock time at the expense of 60% more resource consumption than what I
> got using my proposal @ 6 tasks per slot (this value being optimized over a
> much larger range of jobs, as already stated).
> - as mentioned in another comment, tampering with the exponential ramp-up
> might yield task imbalance, and old executors could become contention points
> for other executors trying to remotely access blocks on them (not witnessed
> in the jobs I'm talking about, but we did see this behavior in other jobs).
> Proposal:
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to
> specify how many tasks a single taskSlot should ideally execute, to mitigate
> the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881
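The sizing rule in the proposal can be sketched as follows. This is a toy model of the arithmetic, not the PR's code; `tasks_per_slot` stands in for the proposed tasksPerExecutorSlot parameter, whose final name was still under discussion:

```python
import math

def target_executors(pending_tasks, executor_cores=5, task_cpus=1,
                     tasks_per_slot=1):
    """Illustrative sketch of the proposed sizing rule.

    Each executor has executor_cores / task_cpus task slots. Dynamic
    allocation today effectively requests one slot per pending task
    (tasks_per_slot=1); the proposal divides the demand by tasks_per_slot
    so each slot is expected to run several tasks back to back.
    """
    slots_per_executor = executor_cores // task_cpus
    return math.ceil(pending_tasks / (slots_per_executor * tasks_per_slot))

# With 1000 pending tasks and 5-core executors:
#   default behavior asks for 200 executors,
#   tasks_per_slot=6 (the value the reporter settled on) asks for 34.
default_demand = target_executors(1000)                    # 200
proposed_demand = target_executors(1000, tasks_per_slot=6)  # 34
```

Whether 34 executors each running ~30 tasks beats 200 executors each running 5 is exactly the latency-vs-efficiency trade-off the thread is debating.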
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355634#comment-16355634 ] Xuefu Zhang commented on SPARK-22683:

+1 on the idea of including this. Also, +1 on renaming the config. Thanks.
[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300926#comment-16300926 ] Xuefu Zhang edited comment on SPARK-22765 at 12/22/17 4:43 AM:

Did some benchmarking with a set of 20 queries, comparing upfront allocation against exponential ramp-up. No clear trend is seen: upfront allocation offers better efficiency for some of the queries, similar efficiency for others, and worse for the rest. These variations might just be noise, which is abundant in our production cluster. Thus, I tend to agree that upfront allocation offers limited benefit for efficiency, if any. (On the other hand, it seems to benefit performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily pick a core that's available on an executor that's already running other tasks. I speculate that efficiency improves if busy executors are favored for a new task, so that other, idle executors can idle out. (To be tested.)

Making idleTime=0 valid is a good thing to have. I created a separate ticket (SPARK-22870) for that.

was (Author: xuefuz):
Did some benchmarking with a set of 20 queries, comparing upfront allocation against exponential ramp-up. No clear trend is seen: upfront allocation offers better efficiency for some of the queries, similar efficiency for others, and worse for the rest. These variations might just be noise, which is abundant in our production cluster. Thus, I tend to agree that upfront allocation offers limited benefit for efficiency, if any. (On the other hand, it seems to benefit performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily pick a core that's available on an executor that's already running other tasks. I speculate that efficiency improves if busy executors are favored for a new task, so that other, idle executors can idle out. (To be tested.)

Making idleTime=0 valid is a good thing to have. I will create a separate ticket for that.
> Create a new executor allocation scheme based on that of MR
> --
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.6.0
> Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant
> resource consumption hike (e.g., SPARK-22683). While this might not be a
> concern for users that are more performance-centric, for others conscious
> about cost, such a hike creates a migration obstacle. This situation can get
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a
> multi-tenant environment. With its performance-centric design, its
> inefficiency has also unfortunately shown up, especially when compared with
> MR. Thus, it's believed that an MR-styled scheduler still has its merit.
> Based on our research, the inefficiency associated with dynamic allocation
> comes from many aspects, such as executors idling out, bigger executors, and
> many stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here
> is to add a new, efficiency-centric scheduling scheme based on that of MR.
> Such an MR-based scheme can be further enhanced and better adapted to the
> Spark execution model. This alternative is expected to offer a good
> performance improvement compared to MR, while still having similar or even
> better efficiency than MR.
> Inputs are greatly welcome!

-- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
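The busy-executor preference speculated about in the edited comment above could be sketched like this (a toy model, not Spark's scheduler API; the data shapes are illustrative):

```python
def pick_executor(executors):
    """Among executors with a free task slot, prefer the busiest one.

    Packing new tasks onto already-busy executors lets lightly loaded
    executors drain and hit the idle timeout, which is the efficiency
    effect the comment speculates about. `executors` maps an executor id
    to (running_tasks, total_slots); illustrative only.
    """
    candidates = [(eid, running)
                  for eid, (running, slots) in executors.items()
                  if running < slots]
    if not candidates:
        return None  # every executor is saturated; caller must wait or scale up
    # Highest running-task count first.
    return max(candidates, key=lambda c: c[1])[0]


fleet = {"exec-a": (4, 5), "exec-b": (1, 5), "exec-c": (5, 5)}
chosen = pick_executor(fleet)   # "exec-a": busiest executor with a free slot
```

Under this policy "exec-b" stays untouched and can idle out, whereas a spread-out policy would keep all three executors alive.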
[jira] [Created] (SPARK-22870) Dynamic allocation should allow 0 idle time
Xuefu Zhang created SPARK-22870:

Summary: Dynamic allocation should allow 0 idle time
Key: SPARK-22870
URL: https://issues.apache.org/jira/browse/SPARK-22870
Project: Spark
Issue Type: Improvement
Components: Scheduler
Affects Versions: 1.6.0
Reporter: Xuefu Zhang
Priority: Minor

As discussed in SPARK-22765, with SPARK-21656 an executor will not idle out while there are pending tasks to run. When there is no task to run, an executor will idle out after {{spark.dynamicAllocation.executorIdleTimeout}}, which is currently required to be greater than zero. However, for efficiency, a user should be able to specify that an executor can be removed immediately, without being required to stay idle for at least 1s. This ticket is to make {{0}} a valid value for {{spark.dynamicAllocation.executorIdleTimeout}}; special handling for such a case might be needed.
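A minimal sketch of the requested semantics, assuming the SPARK-21656 guard that keeps executors alive while tasks are pending (illustrative only, not the actual ExecutorAllocationManager code):

```python
def should_remove(idle_seconds, idle_timeout, has_pending_tasks):
    """Sketch of the idle-out check SPARK-22870 asks for.

    With the SPARK-21656 behavior, an executor is never idled out while
    tasks are pending. Once nothing is pending, an idle_timeout of 0
    should mean immediate removal instead of being rejected as an
    invalid configuration value.
    """
    if idle_timeout < 0:
        raise ValueError("idle timeout must be >= 0")
    if has_pending_tasks:
        return False
    return idle_seconds >= idle_timeout


# idle_timeout=0: remove as soon as the executor has nothing to do...
immediate = should_remove(0, 0, has_pending_tasks=False)   # True
# ...but never while tasks are still pending.
kept = should_remove(0, 0, has_pending_tasks=True)         # False
```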
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300926#comment-16300926 ] Xuefu Zhang commented on SPARK-22765:

Did some benchmarking with a set of 20 queries, comparing upfront allocation against exponential ramp-up. No clear trend is seen: upfront allocation offers better efficiency for some of the queries, similar efficiency for others, and worse for the rest. These variations might just be noise, which is abundant in our production cluster. Thus, I tend to agree that upfront allocation offers limited benefit for efficiency, if any. (On the other hand, it seems to benefit performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily pick a core that's available on an executor that's already running other tasks. I speculate that efficiency improves if busy executors are favored for a new task, so that other, idle executors can idle out. (To be tested.)

Making idleTime=0 valid is a good thing to have. I will create a separate ticket for that.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298634#comment-16298634 ] Xuefu Zhang commented on SPARK-22765:

bq. at least based on this one experiment up front allocation didn't help.

I'm not sure how that conclusion is drawn. With #5, which differs from #3 only in the additional upfront allocation, we see a resource usage decrease from 2X to 1.4X, which doesn't seem to support the claim. I will do more testing on this to make my observation more evident.

For idle=0, I think it's a valid case and should be helpful as well.
[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297914#comment-16297914 ] Xuefu Zhang edited comment on SPARK-22765 at 12/20/17 5:45 AM:

Alright, I tested upfront allocation and its combinations with other improvement ideas, and here is what I found:
1. The query: just one query, fairly complicated, represented by a main Spark job like this:
{code}
Status: Running (Hive on Spark job[4])
------------------------------------------------------------------------------------
          STAGES   ATTEMPT   STATUS     TOTAL  COMPLETED  RUNNING  PENDING  FAILED
------------------------------------------------------------------------------------
Stage-10 ...             0   FINISHED     339        339        0        0       0
Stage-11 ...             0   FINISHED     201        201        0        0       0
Stage-12 ...             0   FINISHED     191        191        0        0       0
Stage-13 ...             0   FINISHED     178        178        0        0       0
Stage-14 ...             0   FINISHED     115        115        0        0       0
Stage-15 ...             0   FINISHED     105        105        0        0       0
Stage-16 ...             0   FINISHED     592        592        0        0       0
Stage-17 ...             0   FINISHED     191        191        0        0       0
Stage-4                  0   FINISHED     178        178        0        0       0
Stage-5                  0   FINISHED     115        115        0        0       0
Stage-6                  0   FINISHED     105        105        0        0       0
Stage-7                  0   FINISHED     339        339        0        0       0
Stage-8                  0   FINISHED     201        201        0        0       0
Stage-9                  0   FINISHED     191        191        0        0       0
------------------------------------------------------------------------------------
{code}
2. Without any improvement, with the default 60s idleTime, Spark uses more than 3X the resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X the resources.
4. Same as #3, but with idleTime=1s: Spark uses 1.4X the resources.
5. Same as #3, but with additional upfront allocation: Spark also uses 1.4X the resources.
6. Same as #5, but with the additional improvement in SPARK-22683 (factor = 2): Spark uses 1.2X the resources.
7. Same as #6, but with factor=3: Spark uses 0.8X the resources.
While this is just one query, far from conclusive, we can really see how much those considerations might impact efficiency. I'm sure the mileage varies, but this at least shows there is a lot of room for Spark to improve resource utilization efficiency w.r.t. scheduling.

was (Author: xuefuz):
Alright, I tested upfront allocation and its combinations with other improvement ideas and here is what I found:
1. The query: just one query, fairly complicated, represented by one of the main Spark jobs:
{code}
Status: Running (Hive on Spark job[4])
------------------------------------------------------------------------------------
          STAGES   ATTEMPT   STATUS     TOTAL  COMPLETED  RUNNING  PENDING  FAILED
------------------------------------------------------------------------------------
Stage-10 ...             0   FINISHED     339        339        0        0       0
Stage-11 ...             0   FINISHED     201        201        0        0       0
Stage-12 ...             0   FINISHED     191        191        0        0       0
Stage-13 ...             0   FINISHED     178        178        0        0       0
Stage-14 ...             0   FINISHED     115        115        0        0       0
Stage-15 ...             0   FINISHED     105        105        0        0       0
Stage-16 ...             0   FINISHED     592        592        0        0       0
Stage-17 ...             0   FINISHED     191        191        0        0       0
Stage-4                  0   FINISHED     178        178        0        0       0
Stage-5                  0   FINISHED     115        115        0        0       0
Stage-6                  0   FINISHED     105        105        0        0       0
Stage-7                  0   FINISHED     339        339        0        0       0
Stage-8                  0   FINISHED     201        201        0        0       0
Stage-9                  0   FINISHED     191        191        0        0       0
------------------------------------------------------------------------------------
{code}
2. Without any improvement, with the default 60s idleTime, Spark uses more than 3X the resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X the resources.
4. Same as #3, but with
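The resource figures in this thread are YARN vcore-hours ratios against MR. As a hypothetical illustration of the metric (the numbers below are made up, not from these experiments):

```python
def vcore_hours(allocations):
    """Sum of (vcores * seconds allocated) over all containers, in hours.

    This mirrors the "resource usage ... as seen by YARN" definition used
    in this thread. `allocations` is a list of (vcores, seconds) pairs,
    one per container; all values here are illustrative.
    """
    return sum(cores * seconds for cores, seconds in allocations) / 3600.0


# Hypothetical: 10 executors of 5 vcores held for 360s each.
spark_usage = vcore_hours([(5, 360)] * 10)   # 5.0 vcore-hours
mr_usage = vcore_hours([(1, 9000)] * 2)      # 5.0 vcore-hours
ratio = spark_usage / mr_usage               # 1.0 means parity with MR
```

The "3X", "1.4X", "0.8X" figures in the comment above are this ratio measured for the same query under different settings.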
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297914#comment-16297914 ] Xuefu Zhang commented on SPARK-22765:

Alright, I tested upfront allocation and its combinations with other improvement ideas, and here is what I found:
1. The query: just one query, fairly complicated, represented by one of the main Spark jobs:
{code}
Status: Running (Hive on Spark job[4])
------------------------------------------------------------------------------------
          STAGES   ATTEMPT   STATUS     TOTAL  COMPLETED  RUNNING  PENDING  FAILED
------------------------------------------------------------------------------------
Stage-10 ...             0   FINISHED     339        339        0        0       0
Stage-11 ...             0   FINISHED     201        201        0        0       0
Stage-12 ...             0   FINISHED     191        191        0        0       0
Stage-13 ...             0   FINISHED     178        178        0        0       0
Stage-14 ...             0   FINISHED     115        115        0        0       0
Stage-15 ...             0   FINISHED     105        105        0        0       0
Stage-16 ...             0   FINISHED     592        592        0        0       0
Stage-17 ...             0   FINISHED     191        191        0        0       0
Stage-4                  0   FINISHED     178        178        0        0       0
Stage-5                  0   FINISHED     115        115        0        0       0
Stage-6                  0   FINISHED     105        105        0        0       0
Stage-7                  0   FINISHED     339        339        0        0       0
Stage-8                  0   FINISHED     201        201        0        0       0
Stage-9                  0   FINISHED     191        191        0        0       0
------------------------------------------------------------------------------------
{code}
2. Without any improvement, with the default 60s idleTime, Spark uses more than 3X the resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X the resources.
4. Same as #3, but with idleTime=1s: Spark uses 1.4X the resources.
5. Same as #3, but with additional upfront allocation: Spark also uses 1.4X the resources.
6. Same as #4, but with the additional improvement in SPARK-22683 (factor = 2): Spark uses 1.2X the resources.
7. Same as #6, but with factor=3: Spark uses 0.8X the resources.
While this is just one query, far from conclusive, we can really see how much those considerations might impact efficiency. I'm sure the mileage varies, but this at least shows there is a lot of room for Spark to improve resource utilization efficiency w.r.t. scheduling.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297645#comment-16297645 ] Xuefu Zhang commented on SPARK-22765:

Haven't got a chance to try upfront allocation. Tried one query (runs for a couple of minutes) with 1s idle time. The resource usage is further cut down, by as much as half, very close to that of MR in this case. I think we should allow 0s idle time, if only for completeness. I will try upfront allocation and update.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297529#comment-16297529 ] Xuefu Zhang commented on SPARK-22765:

{quote}
SPARK-21656 and the dynamic allocation should handle that; the target number in the dynamic allocation manager is supposed to be based on all running stages, for pending and running tasks. Are you saying that is not true?
{quote}

I verified, and it does seem that SPARK-21656 covers concurrent stages. My job ran too fast, so I had to limit maxExecutors to a smaller number to observe more closely. It's no issue, after all. This is great!
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297346#comment-16297346 ] Xuefu Zhang commented on SPARK-22765: - I'm not 100% positive, but that seems to be what I saw. I will verify further.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297316#comment-16297316 ] Xuefu Zhang commented on SPARK-22765: - Actually, I meant across parallel stages (those connected by a union transformation), not serial stages, which I care much less about. The following is what I meant (in a Hive on Spark context):
{code}
Status: Running (Hive on Spark job[4])
----------------------------------------------------------------------------------
STAGES        ATTEMPT  STATUS   TOTAL  COMPLETED  RUNNING  PENDING  FAILED
----------------------------------------------------------------------------------
Stage-10 ..      0     RUNNING    340        126      214        0       0
Stage-11         0     PENDING    201          0        0      201       0
Stage-12         0     PENDING    191          0        0      191       0
Stage-13 .       0     RUNNING    178         33      145        0       0
Stage-14         0     PENDING    115          0        0      115       0
Stage-15         0     PENDING    105          0        0      105       0
Stage-16         0     PENDING    592          0        0      592       0
Stage-17         0     PENDING    191          0        0      191       0
Stage-4 ...      0     RUNNING    178        157       21        0       4
Stage-5          0     PENDING    115          0        0      115       0
Stage-6          0     PENDING    105          0        0      105       0
Stage-7 .        0     RUNNING    340        232      108        0       0
Stage-8          0     PENDING    201          0        0      201       0
Stage-9          0     PENDING    191          0        0      191       0
----------------------------------------------------------------------------------
{code}
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297276#comment-16297276 ] Xuefu Zhang commented on SPARK-22765: - Hi [~CodingCat], yes, we adopted both #1 and #2, but we still needed 60s as the minimum w/o SPARK-21656.
Hi @Tomas Graves, idle=1 seems to work, but idle=0 is rejected:
{code}
org.apache.spark.SparkException: spark.dynamicAllocation.executorIdleTimeout must be > 0!
{code}
Now that I think about it, idle=0 should be permitted, because a user might not want any idle executors at all.
{quote} with SPARK-21656 executors shouldn't timeout unless you are between stages {quote}
This seems to function correctly. However, I think this needs to be extended across concurrent stages. Based on what I'm seeing, executors are not reused across such stages: an executor that worked for a completed stage should be recycled if there are other running stages that have pending tasks.
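The reuse rule suggested above can be sketched as a small model (illustrative Python with assumed names; this is not Spark's actual allocation-manager code, and the stage numbers are hypothetical): an executor freed by a completed stage is kept only while some concurrently running stage still has pending tasks.

```python
# Illustrative model only; Stage and should_keep_executor are hypothetical
# names, not Spark APIs.
from dataclasses import dataclass

@dataclass
class Stage:
    name: str
    running: bool
    pending_tasks: int

def should_keep_executor(stages):
    """Keep an executor freed by a finished stage if any concurrently
    running stage still has pending tasks; otherwise release it."""
    return any(s.running and s.pending_tasks > 0 for s in stages)

stages = [
    Stage("Stage-4", running=True, pending_tasks=0),     # just finished its work
    Stage("Stage-10", running=True, pending_tasks=214),  # concurrent stage, still busy
]
print(should_keep_executor(stages))  # True: reuse for Stage-10 instead of idling out
```

Under this rule an executor only idles out when no running stage has a backlog, rather than whenever its own stage completes.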
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295814#comment-16295814 ] Xuefu Zhang commented on SPARK-22765: - As an update, I managed to backport SPARK-21656, among a few others, into our codebase and measured the efficiency improvement with a smaller idle time (reduced from 60s to 5s). Our tests show that the efficiency gain is significant (a consistent 2X) for small jobs, especially those with many stages. For large, long-running jobs, the gain is less significant (about 10% better than with 60s).
I'd like to point out that even with a 2X improvement in efficiency, Spark is still behind for small jobs. With a 60s idle time, MR uses only about 35% of the resources used by Spark. With 5s, MR uses about 70% of that used by Spark. I suspect that the additional overhead comes from: 1. exponential ramp-up allocation; 2. bigger containers (I have 4 cores per container).
The desired allocation scheme optimized for efficiency now seems clearer to me:
1. Upfront allocation instead of exponential ramp-up.
2. Zero idle time (reuse containers if there are pending tasks, or kill them right away if there are none).
3. Optimizations for smaller containers (like 1 core per container).
In combination with the executor conservation factor from SPARK-22683, the new scheme, which diverges widely from dynamic allocation, should offer better resource efficiency.
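The contrast between upfront allocation and exponential ramp-up can be sketched with a toy model (illustrative Python; the double-per-interval behavior is a simplification of dynamic allocation's ramp-up policy, not a reimplementation of it):

```python
def rampup_rounds(target, start=1):
    """Intervals needed to reach the target executor count when the
    request doubles each backlog interval (a simplified model of
    dynamic allocation's exponential ramp-up)."""
    n, rounds = start, 0
    while n < target:
        n *= 2
        rounds += 1
    return rounds

# Upfront allocation reaches the target immediately; the doubling model
# needs 8 backlog intervals to go from 1 to 256 executors.
print(rampup_rounds(256))  # 8
```

For a short job, those ramp-up intervals can be a large fraction of the job's lifetime, which is consistent with the observation above that small jobs suffer most.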
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295770#comment-16295770 ] Xuefu Zhang commented on SPARK-22683: - I did some tests with the proposed change here and saw a general efficiency improvement with increasing tasksPerSlot. A number between 2 and 4 usually gives the best trade-off between performance and efficiency, because the efficiency gain is big while the performance impact is small. As the number gets larger, the efficiency return diminishes while the latency degradation becomes unreasonable.
The resource saving here plays best when the tasks are small. In that case tasks finish quickly and don't each require a dedicated core, and because the tasks are small, the latency impact is also limited. On the other hand, if the tasks are long-running, keeping them waiting increases latency while the resource saving is limited.
Based on what I observed, this knob, whatever it is called, is practically useful for efficiency-conscious users. It doesn't seem to hurt efficiency in any way. For users that care about latency or don't care about cost, the default value will do. Thus, I favor the inclusion of such a knob. Nevertheless, I do agree that this knob is a little confusing as it is, because the concept of an execution slot doesn't exist in Spark. I think it might be better to define it as the ratio of the number of executors to be requested to the number of executors required to run all waiting tasks. So, a name like {{spark.executor.conservation.factor}} might do. And it doesn't have to be an integer.
With this included, I think the next thing to look at is the exponential ramp-up in scheduling. I'd think up-front full allocation, in combination with the conservation factor here, will have a big impact on resource efficiency.
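A minimal sketch of how the suggested {{spark.executor.conservation.factor}} (a name proposed in this comment, not an existing Spark configuration) could scale down the executor request; the function name and exact semantics are assumptions for illustration:

```python
import math

def target_executors(pending_tasks, cores_per_executor, cpus_per_task=1,
                     conservation_factor=1.0):
    """Executors needed for one task per slot, scaled down by a
    (possibly fractional) conservation factor. Illustrative only."""
    slots_per_executor = cores_per_executor // cpus_per_task
    full = math.ceil(pending_tasks / slots_per_executor)
    return max(1, math.ceil(full / conservation_factor))

# 600 pending tasks on 5-core executors: 120 executors at factor 1.0
# (one slot per task), 40 at factor 3.0 (each slot runs ~3 tasks in turn).
print(target_executors(600, 5))                           # 120
print(target_executors(600, 5, conservation_factor=3.0))  # 40
```

Because the factor divides a count rather than multiplying slots, a non-integer value such as 2.5 is meaningful, which matches the point above that the knob doesn't have to be an integer.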
> DynamicAllocation wastes resources by allocating containers that will barely > be used > > > Key: SPARK-22683 > URL: https://issues.apache.org/jira/browse/SPARK-22683 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Julien Cuquemelle > Labels: pull-request-available > > While migrating a series of jobs from MR to Spark using dynamicAllocation, > I've noticed almost a doubling (+114% exactly) of resource consumption of > Spark w.r.t MR, for a wall clock time gain of 43% > About the context: > - resource usage stands for vcore-hours allocation for the whole job, as seen > by YARN > - I'm talking about a series of jobs because we provide our users with a way > to define experiments (via UI / DSL) that automatically get translated to > Spark / MR jobs and submitted on the cluster > - we submit around 500 of such jobs each day > - these jobs are usually one shot, and the amount of processing can vary a > lot between jobs, and as such finding an efficient number of executors for > each job is difficult to get right, which is the reason I took the path of > dynamic allocation. > - Some of the tests have been scheduled on an idle queue, some on a full > queue. > - experiments have been conducted with spark.executor-cores = 5 and 10, only > results for 5 cores have been reported because efficiency was overall better > than with 10 cores > - the figures I give are averaged over a representative sample of those jobs > (about 600 jobs) ranging from tens to thousands splits in the data > partitioning and between 400 to 9000 seconds of wall clock time. > - executor idle timeout is set to 30s; > > Definition: > - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, > which represent the max number of tasks an executor will process in parallel. 
> - the current behaviour of the dynamic allocation is to allocate enough > containers to have one taskSlot per task, which minimizes latency, but wastes > resources when tasks are small regarding executor allocation and idling > overhead. > The results using the proposal (described below) over the job sample (600 > jobs): > - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in > resource usage, for a 37% (against 43%) reduction in wall clock time for > Spark w.r.t MR > - by trying to minimize the average resource consumption, I ended up with 6 > tasks per core, with a 30% resource usage reduction, for a similar wall clock > time w.r.t. MR > What did I try to solve the issue with existing parameters (summing up a few > points mentioned in the comments) ? > - change dynamicAllocation.maxExecutors: this would need to be adapted for > each job (tens to thousands splits can occur), and essentially remove the > interest of using the dynamic allocation. > - use dynamicAllocation.backlogTimeout:
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292048#comment-16292048 ] Xuefu Zhang commented on SPARK-22765: - [~tgraves], I think it would help if SPARK-21656 could make a close-to-zero idle time work. This is one source of inefficiency. Our version is too old to backport the fix, but we will try this out when we upgrade.
The second source of inefficiency comes from the fact that Spark favors bigger containers. A 4-core container might be running one task while wasting the other cores/memory. The executor cannot die as long as there is one task running. One might argue that a user could configure 1-core containers under dynamic allocation, but this is probably not optimal in other respects.
The third reason one might favor MR-styled scheduling is its simplicity and efficiency. We frequently found that for heavy workloads the scheduler cannot really keep up with the task ups and downs, especially when tasks finish fast.
For cost-conscious users, cluster-level resource efficiency is probably what's looked at. My suspicion is that an enhanced MR-styled scheduler, simple and performant, will improve resource efficiency significantly over a typical use of dynamic allocation, without sacrificing much performance. As a starting point, we will first benchmark with SPARK-21656 when possible.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289726#comment-16289726 ] Xuefu Zhang commented on SPARK-22765: - Yes, we are using Hive on Spark. Our Spark version is 1.6.1, which is old. Obviously it doesn't have the fix in SPARK-21656. As commented in SPARK-22683, our comparison was made, for individual queries, between all jobs for MR vs. all jobs (usually just 1) for Spark.
[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289689#comment-16289689 ] Xuefu Zhang commented on SPARK-22683: - [~tgraves], I can speak on our use case, where the same queries run on MR vs. Spark via Hive. Because Spark gets rid of MR's intermediate HDFS reads/writes, we expected better efficiency in addition to performance gains. While our expectation is met for some of our queries, usually long-running ones with many stages, the resource usage is much worse for other queries, especially short-running ones. I believe that efficiency can be substantially enhanced in both cases.

> DynamicAllocation wastes resources by allocating containers that will barely
> be used
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Julien Cuquemelle
> Labels: pull-request-available
>
> let's say an executor has spark.executor.cores / spark.task.cpus taskSlots
> The current dynamic allocation policy allocates enough executors
> to have each taskSlot execute a single task, which minimizes latency,
> but wastes resources when tasks are small regarding executor allocation
> and idling overhead.
> By adding the tasksPerExecutorSlot, it is made possible to specify how many
> tasks a single slot should ideally execute to mitigate the overhead of
> executor allocation.
> PR: https://github.com/apache/spark/pull/19881
[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289676#comment-16289676 ] Xuefu Zhang edited comment on SPARK-22765 at 12/13/17 6:30 PM: ---
Hi [~tgraves], Thanks for your input. In our busy, heavily loaded cluster environment, we have found that any idle time less than 60s is a problem. 30s works for small jobs, but starts causing problems for bigger jobs. The symptom is that newly allocated executors are idled out before completing a single task! I suspected that this is caused by a busy scheduler. As a result, we have to keep 60s as a minimum. Having said that, however, I'm not against container reuse. Also, I used the word "enhanced" to qualify MR scheduling.
Reusing is good, but in my opinion the speculation factor in dynamic allocation goes against efficiency. That is, you set an idle time just in case a new task comes within that period of time. When that doesn't happen, you waste your executor for 1 minute. (This is good for performance.) Please note that this happens a lot at the end of each stage, because no tasks from the next stage will be scheduled until the current stage finishes. If we can remove the speculation aspect of the scheduling, the efficiency should improve significantly, with some compromise on performance. This would be a good starting point, which is the main purpose of my proposal of an enhanced MR-style scheduling, which is open to many other possible improvements.

was (Author: xuefuz):
Hi [~tgraves], Thanks for your input. In our busy, heavily loaded cluster environment, we have found that any idle time less than 60s is a problem. 30s works for small jobs, but starts causing problems for bigger jobs. The symptom is that newly allocated executors are idled out before completing a single task! I suspected that this is caused by a busy scheduler. As a result, we have to keep 60s as a minimum. Having said that, however, I'm not against container reuse. Also, I used the word "enhanced" to improve on MR scheduling.
Reusing is good, but in my opinion the speculation factor in dynamic allocation goes against efficiency. That is, you set an idle time just in case a new task comes within that period of time. When that doesn't happen, you waste your executor for 1 minute. (This is good for performance.) Please note that this happens a lot at the end of each stage, because no tasks from the next stage will be scheduled until the current stage finishes. If we can remove the speculation aspect of the scheduling, the efficiency should improve significantly, with some compromise on performance. This would be a good starting point, which is the main purpose of my proposal of an enhanced MR-style scheduling, which is open to many other possible improvements.
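The waste from the speculative idle window described in the comment above can be put into rough numbers (illustrative arithmetic with assumed figures, not measured data):

```python
def idle_waste_vcore_seconds(executors, cores_per_executor, idle_timeout_s):
    """Vcore-seconds burned when executors sit idle for the full timeout
    at a stage boundary before being released. Illustrative model that
    assumes every executor waits out the whole window."""
    return executors * cores_per_executor * idle_timeout_s

# Hypothetical figures: 200 executors with 4 cores each, idling out over a
# 60s timeout, burn 48,000 vcore-seconds at a single stage boundary.
print(idle_waste_vcore_seconds(200, 4, 60))  # 48000
```

Since this cost recurs at the end of every stage, a job with many stages pays it repeatedly, which is why removing the speculative window trades a little performance for a large efficiency gain.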
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289676#comment-16289676 ] Xuefu Zhang commented on SPARK-22765: - Hi [~tgraves], Thanks for your input. In our busy, heavily loaded cluster environment, we have found that any idle time less than 60s is a problem. 30s works for small jobs, but starts causing problems for bigger jobs. The symptom is that newly allocated executors are idled out before completing a single task! I suspected that this is caused by a busy scheduler. As a result, we have to keep 60s as a minimum. Having said that, however, I'm not against container reuse. Also, I used the word "enhanced" to improve on MR scheduling.
Reusing is good, but in my opinion the speculation factor in dynamic allocation goes against efficiency. That is, you set an idle time just in case a new task comes within that period of time. When that doesn't happen, you waste your executor for 1 minute. (This is good for performance.) Please note that this happens a lot at the end of each stage, because no tasks from the next stage will be scheduled until the current stage finishes. If we can remove the speculation aspect of the scheduling, the efficiency should improve significantly, with some compromise on performance. This would be a good starting point, which is the main purpose of my proposal of an enhanced MR-style scheduling, which is open to many other possible improvements.
[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR
[ https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288414#comment-16288414 ] Xuefu Zhang commented on SPARK-22765: - I wouldn't say that MR is static, at lease not static in Spark's sense. MR allocates an executor for each map or reduce task and the executor exits when the running task completes. This would avoid the inefficiency in dynamic allocation where executor has to slowly die out (0 idle time doesn't really work). Secondly, this proposal doesn't really intend to go back to the MR paradigm. Instead, I propose a scheduling scheme similar to MR but enhanced to fit into Spark's DAG execution model. To be clear, the proposal here is not to replace dynamic allocation. Rather, it provides an alternative that's more efficiency-centric than dynamic allocation. I understand there are a lot of details lacking, but I'd like to start a discussion now and hopefully something concrete will come out soon. > Create a new executor allocation scheme based on that of MR > --- > > Key: SPARK-22765 > URL: https://issues.apache.org/jira/browse/SPARK-22765 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang > > Many users migrating their workload from MR to Spark find a significant > resource consumption hike (i.e, SPARK-22683). While this might not be a > concern for users that are more performance centric, for others conscious > about cost, such hike creates a migration obstacle. This situation can get > worse as more users are moving to cloud. > Dynamic allocation make it possible for Spark to be deployed in multi-tenant > environment. With its performance-centric design, its inefficiency has also > unfortunately shown up, especially when compared with MR. Thus, it's believed > that MR-styled scheduler still has its merit. 
Based on our research, the > inefficiency associated with dynamic allocation comes from many sources, such as > executors idling out, bigger executors, and the many stages (rather than only 2 stages > in MR) in a Spark job. > Rather than fine-tuning dynamic allocation for efficiency, the proposal here > is to add a new, efficiency-centric scheduling scheme based on that of MR. > Such an MR-based scheme can be further enhanced and better adapted to Spark's > execution model. This alternative is expected to offer a good performance > improvement (compared to MR) with similar or even better efficiency > than MR. > Inputs are greatly welcome!
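The efficiency gap discussed above can be made concrete with a toy cost model. This is a hedged illustration only: the task runtime, executor size, and idle timeout are invented numbers, and the accounting (busy vcore-seconds plus an idle tail per executor) is a simplification of what YARN actually bills — not a measurement from either JIRA.

```python
import math

def mr_vcore_seconds(num_tasks, task_secs, cores_per_container=1):
    # MR style: one container per task; the container exits when its task
    # ends, so no idle time is billed.
    return num_tasks * task_secs * cores_per_container

def dynalloc_vcore_seconds(num_tasks, task_secs, cores_per_executor=4,
                           idle_timeout_secs=60):
    # Dynamic allocation style: enough executors for the task backlog are
    # requested; each lingers for idle_timeout_secs before being released.
    executors = math.ceil(num_tasks / cores_per_executor)
    busy = num_tasks * task_secs                            # useful work
    idle = executors * cores_per_executor * idle_timeout_secs  # idle tail
    return busy + idle

print(mr_vcore_seconds(100, 30))        # 3000 vcore-seconds, all useful
print(dynalloc_vcore_seconds(100, 30))  # 9000: 3000 busy + 6000 idle
```

Even this crude model shows how idle-timeout overhead alone can multiply resource consumption for short jobs, which is the shape of the hike reported in SPARK-22683.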
[jira] [Created] (SPARK-22765) Create a new executor allocation scheme based on that of MR
Xuefu Zhang created SPARK-22765: --- Summary: Create a new executor allocation scheme based on that of MR Key: SPARK-22765 URL: https://issues.apache.org/jira/browse/SPARK-22765 Project: Spark Issue Type: Improvement Components: Scheduler Affects Versions: 1.6.0 Reporter: Xuefu Zhang Many users migrating their workloads from MR to Spark find a significant resource consumption hike (e.g., SPARK-22683). While this might not be a concern for users that are more performance-centric, for others conscious about cost, such a hike creates a migration obstacle. This situation can get worse as more users move to the cloud. Dynamic allocation makes it possible for Spark to be deployed in a multi-tenant environment. With its performance-centric design, its inefficiency has also unfortunately shown up, especially when compared with MR. Thus, it's believed that an MR-styled scheduler still has its merits. Based on our research, the inefficiency associated with dynamic allocation comes from many sources, such as executors idling out, bigger executors, and the many stages (rather than only 2 stages in MR) in a Spark job. Rather than fine-tuning dynamic allocation for efficiency, the proposal here is to add a new, efficiency-centric scheduling scheme based on that of MR. Such an MR-based scheme can be further enhanced and better adapted to Spark's execution model. This alternative is expected to offer a good performance improvement (compared to MR) with similar or even better efficiency than MR. Inputs are greatly welcome!
[jira] [Commented] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number
[ https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288262#comment-16288262 ] Xuefu Zhang commented on SPARK-22683: - Hi [~jcuquemelle], Thanks for working on this and bringing up the efficiency problem associated with dynamic allocation. A significant resource consumption increase was also experienced in our company when workloads were migrated from MR to Spark (via Hive). Thus, I believe there is a strong need to improve Spark's efficiency in addition to its performance. While your proposal has its merit, I largely concur with Sean that it might not be universally applicable, addressing a particular workload rather than a whole class of problems. Take MR as an example, which also allocates as many mappers/reducers as there are map or reduce tasks, yet offers higher efficiency than Spark in many cases. The inefficiency associated with dynamic allocation comes from many sources, such as executors idling out, bigger executors, and the many stages (rather than only 2 stages in MR) in a Spark job. As there is a class of users conscious about resource consumption, especially as many are moving their workloads to the cloud, a more generic solution is needed for such users. I have been thinking about a proposal that introduces an MR-based resource allocation in parallel with dynamic allocation. Such an allocation mechanism is based on the MR style, but can be further enhanced to beat MR and be better adapted to Spark's execution model. This would be a great alternative to dynamic allocation. While dynamic allocation is certainly performance-centric, the new allocation scheme can still offer a good performance improvement (compared to MR) while being efficiency-centric. As a starting point, I'm going to create a JIRA and move the discussion along this proposal over there. You're welcome to share your thoughts and/or contribute. Thanks. 
> Allow tuning the number of dynamically allocated executors wrt task number > -- > > Key: SPARK-22683 > URL: https://issues.apache.org/jira/browse/SPARK-22683 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 >Reporter: Julien Cuquemelle > Labels: pull-request-available > > let's say an executor has spark.executor.cores / spark.task.cpus taskSlots > The current dynamic allocation policy allocates enough executors > to have each taskSlot execute a single task, which minimizes latency, > but wastes resources when tasks are small regarding executor allocation > overhead. > By adding the tasksPerExecutorSlot, it is made possible to specify how many > tasks > a single slot should ideally execute to mitigate the overhead of executor > allocation. > PR: https://github.com/apache/spark/pull/19881 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
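The policy described above can be sketched as a small target-size calculation. The parameter name tasksPerExecutorSlot comes from the issue description; the arithmetic below is an illustration of the idea, not the actual code from the PR.

```python
import math

def target_executors(pending_tasks, executor_cores=4, task_cpus=1,
                     tasks_per_slot=1):
    """Number of executors to request for a backlog of pending tasks."""
    slots_per_executor = executor_cores // task_cpus
    # Default policy (tasks_per_slot=1): one slot per pending task, which
    # minimizes latency. Larger values deliberately under-provision so each
    # slot runs several tasks back to back, amortizing allocation overhead.
    return math.ceil(pending_tasks / (slots_per_executor * tasks_per_slot))

print(target_executors(1000))                    # 250: latency-oriented default
print(target_executors(1000, tasks_per_slot=5))  # 50: efficiency-oriented
```

The trade-off is exactly the one debated in the thread: fewer executors means longer wall-clock time but far fewer wasted vcore-hours on short tasks.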
[jira] [Commented] (SPARK-20640) Make rpc timeout and retry for shuffle registration configurable
[ https://issues.apache.org/jira/browse/SPARK-20640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286807#comment-16286807 ] Xuefu Zhang commented on SPARK-20640: - [~lyc], thanks for fixing this. I'm wondering if you have any recommendation on the values of the new configurations in a heavy workload environment. Thanks. > Make rpc timeout and retry for shuffle registration configurable > > > Key: SPARK-20640 > URL: https://issues.apache.org/jira/browse/SPARK-20640 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.0.2 >Reporter: Sital Kedia >Assignee: Li Yichao > Fix For: 2.3.0 > > > Currently the shuffle service registration timeout and retry has been > hardcoded (see > https://github.com/sitalkedia/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java#L144 > and > https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L197). > This works well for small workloads but under heavy workload when the > shuffle service is busy transferring large amount of data we see significant > delay in responding to the registration request, as a result we often see the > executors fail to register with the shuffle service, eventually failing the > job. We need to make these two parameters configurable. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
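For reference, the fix shipped in Spark 2.3.0 exposed the two hardcoded values as configuration properties. If I recall the names correctly they are the following; the defaults shown should be double-checked against the 2.3.0 documentation before relying on them:

```properties
# Timeout for registering with the external shuffle service (default 5000 ms);
# raise it in heavy-workload environments where the shuffle service is busy.
spark.shuffle.registration.timeout=30000
# Number of registration attempts before failing the executor (default 3).
spark.shuffle.registration.maxAttempts=5
```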
[jira] [Commented] (SPARK-20662) Block jobs that have greater than a configured number of tasks
[ https://issues.apache.org/jira/browse/SPARK-20662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035519#comment-16035519 ] Xuefu Zhang commented on SPARK-20662: - I can understand the counter argument here if Spark is targeted for single user cases. For multiple users in an enterprise deployment, it's good to provide admin knobs. In this case, an admin just wanted to block bad jobs. I don't think RM meets that goal. This is actually implemented in Hive on Spark. However, I thought this is generic and may be desirable for others as well. In addition, blocking a job at submission is better than killing it after it started to run. If Spark doesn't think this is useful, then very well. > Block jobs that have greater than a configured number of tasks > -- > > Key: SPARK-20662 > URL: https://issues.apache.org/jira/browse/SPARK-20662 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.0, 2.0.0 >Reporter: Xuefu Zhang > > In a shared cluster, it's desirable for an admin to block large Spark jobs. > While there might not be a single metrics defining the size of a job, the > number of tasks is usually a good indicator. Thus, it would be useful for > Spark scheduler to block a job whose number of tasks reaches a configured > limit. By default, the limit could be just infinite, to retain the existing > behavior. > MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce to be > configured, which blocks a MR job at job submission time. > The proposed configuration is spark.job.max.tasks with a default value -1 > (infinite). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20662) Block jobs that have greater than a configured number of tasks
[ https://issues.apache.org/jira/browse/SPARK-20662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035462#comment-16035462 ] Xuefu Zhang commented on SPARK-20662: - [~lyc] I'm talking about mapreduce.job.max.map, which is the maximum number of map tasks that an MR job may have. If a submitted MR job contains more map tasks than that, it will be rejected. Similar to mapreduce.job.max.reduce. [~sowen], [~vanzin], I don't think blocking a (perhaps ridiculously) large job is equivalent to letting it run slowly and forever. The use case I have is: while a YARN queue can be used to limit how many resources are used, a queue can be shared by a team or multiple applications. It's probably not a good idea to let one job take all the resources while starving others. Secondly, many of those users who submit ridiculously large jobs have no idea what they are doing and don't even realize that their jobs are huge. Lastly and more importantly, our application environment has a global timeout, beyond which a job will be killed. If a large job gets killed this way, significant resources are wasted. Thus, blocking such a job at submission time helps preserve resources. BTW, if these scenarios don't apply to a user, there is nothing for him/her to worry about because the default should keep them happy. In addition to spark.job.max.tasks, I'd also propose spark.stage.max.tasks, which limits the number of tasks any stage of a job may contain. The rationale behind this is that spark.job.max.tasks tends to favor jobs with a small number of stages. With both, we can not only cover MR's mapreduce.job.max.map and mapreduce.job.max.reduce, but also control the overall size of a job. 
> Block jobs that have greater than a configured number of tasks > -- > > Key: SPARK-20662 > URL: https://issues.apache.org/jira/browse/SPARK-20662 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.6.0, 2.0.0 >Reporter: Xuefu Zhang > > In a shared cluster, it's desirable for an admin to block large Spark jobs. > While there might not be a single metrics defining the size of a job, the > number of tasks is usually a good indicator. Thus, it would be useful for > Spark scheduler to block a job whose number of tasks reaches a configured > limit. By default, the limit could be just infinite, to retain the existing > behavior. > MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce to be > configured, which blocks a MR job at job submission time. > The proposed configuration is spark.job.max.tasks with a default value -1 > (infinite). -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
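A minimal sketch of the submission-time check being proposed, covering both limits. Only the configuration names spark.job.max.tasks / spark.stage.max.tasks and the -1 (unlimited) default come from the proposal; the guard function and exception below are hypothetical illustrations, not Spark scheduler code.

```python
class JobRejectedError(Exception):
    """Raised when a job exceeds a configured task limit at submission."""

def check_job(stage_task_counts, max_job_tasks=-1, max_stage_tasks=-1):
    # -1, the proposed default, means unlimited and keeps today's behavior.
    total = sum(stage_task_counts)
    if max_job_tasks >= 0 and total > max_job_tasks:
        raise JobRejectedError(
            f"{total} tasks exceeds spark.job.max.tasks={max_job_tasks}")
    biggest = max(stage_task_counts)
    if max_stage_tasks >= 0 and biggest > max_stage_tasks:
        raise JobRejectedError(
            f"stage with {biggest} tasks exceeds "
            f"spark.stage.max.tasks={max_stage_tasks}")
    return total

print(check_job([200, 50]))                      # unlimited: accepted, 250
print(check_job([200, 50], max_job_tasks=1000))  # under the limit: accepted
```

Rejecting at submission, as the comment argues, is cheaper than killing the job later via a global timeout, because no cluster resources are consumed first.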
[jira] [Created] (SPARK-20662) Block jobs that have greater than a configured number of tasks
Xuefu Zhang created SPARK-20662: --- Summary: Block jobs that have greater than a configured number of tasks Key: SPARK-20662 URL: https://issues.apache.org/jira/browse/SPARK-20662 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.0.0, 1.6.0 Reporter: Xuefu Zhang In a shared cluster, it's desirable for an admin to block large Spark jobs. While there might not be a single metric defining the size of a job, the number of tasks is usually a good indicator. Thus, it would be useful for the Spark scheduler to block a job whose number of tasks reaches a configured limit. By default, the limit could be just infinite, to retain the existing behavior. MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce to be configured, which block an MR job at job submission time. The proposed configuration is spark.job.max.tasks with a default value of -1 (infinite).
[jira] [Commented] (SPARK-18769) Spark to be smarter about what the upper bound is and to restrict number of executor when dynamic allocation is enabled
[ https://issues.apache.org/jira/browse/SPARK-18769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891610#comment-15891610 ] Xuefu Zhang commented on SPARK-18769: - Just as an FYI, the problem is real and happens because allocation attempts are made as long as there are pending tasks (which is in line with dynamic allocation). However, it's pointless when all containers are already taken and further attempts are very likely in vain, which adversely creates pressure on event processing in the Spark driver and may also have an impact on the YARN RM. I don't know what the best solution is; maybe Spark can just try to allocate all it needs upfront and update (tune down) the allocation request as the job progresses, when necessary. As a workaround, we have to set an artificial upper limit (something like 2500), which helps a lot. > Spark to be smarter about what the upper bound is and to restrict number of > executor when dynamic allocation is enabled > > > Key: SPARK-18769 > URL: https://issues.apache.org/jira/browse/SPARK-18769 > Project: Spark > Issue Type: New Feature >Reporter: Neerja Khattar > > Currently, when dynamic allocation is enabled, max.executor is infinite and > Spark creates so many executors that it can even exceed the YARN NodeManager memory > limit and vcores. > It should have a check to not exceed the YARN resource limit.
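The workaround mentioned above (an artificial upper limit) amounts to clamping the executor request. A sketch with made-up numbers; the cap plays the role of a spark.dynamicAllocation.maxExecutors-style setting:

```python
def executors_to_request(pending_tasks, tasks_per_executor, max_executors):
    # How many executors dynamic allocation would ask for, clamped by the
    # configured cap so requests stop once the backlog is absurdly large.
    wanted = -(-pending_tasks // tasks_per_executor)  # ceiling division
    return min(wanted, max_executors)

print(executors_to_request(1_000_000, 4, 2500))  # huge job: capped at 2500
print(executors_to_request(100, 4, 2500))        # small job: 25, cap unused
```

The cap does not fix the futile-retry behavior itself, but it bounds the request volume hitting the driver's event loop and the YARN RM.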
[jira] [Commented] (SPARK-2421) Spark should treat writable as serializable for keys
[ https://issues.apache.org/jira/browse/SPARK-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148684#comment-15148684 ] Xuefu Zhang commented on SPARK-2421: [~sowen], I saw you had closed this without giving any explanation. Do you mind sharing? > Spark should treat writable as serializable for keys > > > Key: SPARK-2421 > URL: https://issues.apache.org/jira/browse/SPARK-2421 > Project: Spark > Issue Type: Improvement > Components: Input/Output, Java API >Affects Versions: 1.0.0 >Reporter: Xuefu Zhang > > It seems that Spark requires the key be serializable (class implement > Serializable interface). In Hadoop world, Writable interface is used for the > same purpose. A lot of existing classes, while writable, are not considered > by Spark as Serializable. It would be nice if Spark can treate Writable as > serializable and automatically serialize and de-serialize these classes using > writable interface. > This is identified in HIVE-7279, but its benefits are seen global. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.
[ https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148675#comment-15148675 ] Xuefu Zhang commented on SPARK-5377: [~sowen], I saw you had closed this without giving any explanation. Do you mind sharing? > Dynamically add jar into Spark Driver's classpath. > -- > > Key: SPARK-5377 > URL: https://issues.apache.org/jira/browse/SPARK-5377 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.2.0 >Reporter: Chengxiang Li > > Spark supports dynamically adding a jar to the executor classpath through > SparkContext::addJar(), but it does not support dynamically adding a jar to the > driver classpath. In most cases (if not all), a user dynamically adds a jar > with SparkContext::addJar() because some classes from the jar will be > referenced in an upcoming Spark job, which means the classes need to be loaded on the > Spark driver side as well, e.g., during serialization. I think it makes sense to > add an API to add a jar to the driver classpath, or just make that part of > SparkContext::addJar(). HIVE-9410 is a real case from Hive on Spark.
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanelfocusedCommentId=14343977#comment-14343977 ] Xuefu Zhang commented on SPARK-3621: {quote} you can go a step further if you wanted to and just read the data directly from HDFS into a (singleton) cache on the executor. {quote} Yeah. Sharing a cache in an executor is something we wanted to do, but it seems a little complicated due to concurrency and the lack of job boundaries at the executor. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side join, it would be beneficial to allow the client program to broadcast RDDs rather than just variables made of these RDDs. Broadcasting a variable made of RDDs requires that all RDD data be collected to the driver and that the variable be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and used the corresponding data in jobs (such as building hashmaps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next, which doesn't require driver-side processing.
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343582#comment-14343582 ] Xuefu Zhang commented on SPARK-3621: For Hive's map join, we create a hash table out of a small table (HadoopRDD in spark's term) after some transformations. We want to broadcast the hash table (which is written to HDFS) such that each executor will be able to access it to do the join. We thought of spark's broadcast variable for this purpose. However, Spark's broadcast variable will ship the data to the driver and then broadcast to every executor. We wanted to avoid this extra trip since the hash table is already in HDFS. Thus, we wanted a mechanism to broadcast the dataset and make the dataset available (even better if in memory) at each executor, w/o shipping the dataset back to the driver. Referring this dataset as an RDD might have caused the confusion at the first place. Currently, we worked around the problem by calling SparkContext.addFile() at the driver and accessing it using SparkFiles.get() at the executor. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side join, it would be benefcial to allow client program to broadcast RDDs rather than just variables made of these RDDs. Broadcasting a variable made of RDDs requires all RDD data be collected to the driver and that the variable be shipped to the cluster after being made. It would be more performing if driver just broadcasts the RDDs and uses the corresponding data in jobs (such building hashmaps at executors). Tez has a broadcast edge which can ship data from previous stage to the next stage, which doesn't require driver side processing. 
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343655#comment-14343655 ] Xuefu Zhang commented on SPARK-3621: addFile() can take an HDFS file, in which case no file is shipped from the driver. And SparkFiles.get() guarantees that the file is only copied once to the local node per executor. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side join, it would be beneficial to allow the client program to broadcast RDDs rather than just variables made of these RDDs. Broadcasting a variable made of RDDs requires that all RDD data be collected to the driver and that the variable be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and used the corresponding data in jobs (such as building hashmaps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next, which doesn't require driver-side processing.
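The guarantee referred to above — the file is copied to a node only once per executor, no matter how many tasks read it — can be pictured with a toy cache. This is purely illustrative: real Spark performs the fetch inside SparkFiles, and the class and file name below are hypothetical.

```python
class ExecutorFileCache:
    """Mimics 'fetch once per executor': later tasks reuse the local copy."""
    def __init__(self):
        self._local = {}   # filename -> local copy
        self.fetches = 0   # how many times we actually pulled from HDFS

    def get(self, filename, fetch):
        if filename not in self._local:
            self._local[filename] = fetch(filename)  # one copy per executor
            self.fetches += 1
        return self._local[filename]

cache = ExecutorFileCache()
for _ in range(10):  # ten tasks scheduled on the same executor
    path = cache.get("hashtable.bin", lambda f: f"local:{f}")
print(cache.fetches)  # 1 — the file was pulled only once
```

This is why the addFile()/SparkFiles.get() workaround mentioned earlier avoids the extra driver round trip that a broadcast variable would incur.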
[jira] [Commented] (SPARK-3691) Provide a mini cluster for testing system built on Spark
[ https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14340950#comment-14340950 ] Xuefu Zhang commented on SPARK-3691: Hive is using spark.master=local-cluster for unit tests. It also built a mini YARN cluster to test Hive on Spark. Thus, we don't need this anymore. Provide a mini cluster for testing system built on Spark Key: SPARK-3691 URL: https://issues.apache.org/jira/browse/SPARK-3691 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang Most Hadoop components, such as MR, DFS, Tez, and YARN, provide a mini cluster that can be used to test the external systems that rely on those frameworks, such as Pig and Hive. While Spark's local mode can be used to do such testing and is friendly for debugging, it's too far from a real Spark cluster, and a lot of problems cannot be discovered. Thus, an equivalent of the Hadoop MR mini cluster in Spark would be very helpful in testing systems such as Hive/Pig on Spark. Spark's local-cluster was considered for this purpose, but it doesn't fit well because it requires a Spark installation on the box where the tests run. Also, local-cluster isn't exposed.
[jira] [Closed] (SPARK-3691) Provide a mini cluster for testing system built on Spark
[ https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang closed SPARK-3691. -- Resolution: Won't Fix Provide a mini cluster for testing system built on Spark Key: SPARK-3691 URL: https://issues.apache.org/jira/browse/SPARK-3691 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang Most Hadoop components such MR, DFS, Tez, and Yarn provide a mini cluster that can be used to test the external systems that rely on those frameworks, such as Pig and Hive. While Spark's local mode can be used to do such testing and is friendly for debugging, it's too far from a real Spark cluster and a lot of problems cannot be discovered. Thus, an equivalent of Hadoop MR mini cluster in Spark would be very helpful in testing system such as Hive/Pig on Spark. Spark's local-cluster is considered for this purpose but it doesn't fit well because it requires a Spark installation on the box where the tests run. Also, local-cluster isn't exposed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5439) Expose yarn app id for yarn mode
[ https://issues.apache.org/jira/browse/SPARK-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14294406#comment-14294406 ] Xuefu Zhang commented on SPARK-5439: Yeah. This is certainly something desirable in Hive on Spark. As far as I know, Hive isn't reporting the application ID to the user at present. cc: [~chengxiang li] Expose yarn app id for yarn mode Key: SPARK-5439 URL: https://issues.apache.org/jira/browse/SPARK-5439 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.0.0 Reporter: bc Wong When submitting Spark apps on YARN, the caller should be able to get back the YARN app ID programmatically.
[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently
[ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292448#comment-14292448 ] Xuefu Zhang commented on SPARK-2688: Yeah. We don't need syntactic sugar, but a transformation that just does one pass over the input RDD. This has performance implications for Hive's multi-insert use cases. Need a way to run multiple data pipeline concurrently - Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. Tez already realized the importance of this (TEZ-391), so I think Spark should provide this too. This is required for Hive to support multi-insert queries. HIVE-7292.
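The recomputation problem can be demonstrated with a toy DAG whose nodes count how often they are evaluated. Everything here is a stand-in (no real RDDs, no Spark API); it only shows that a shared parent runs once per triggered output unless it is cached.

```python
class Node:
    """A stand-in for an RDD: lazily computed, no caching."""
    def __init__(self, fn, parent=None):
        self.fn, self.parent, self.computed = fn, parent, 0

    def compute(self):
        self.computed += 1
        source = self.parent.compute() if self.parent else []
        return self.fn(source)

rdd1 = Node(lambda _: list(range(5)))
rdd2 = Node(lambda xs: [x * 2 for x in xs], rdd1)    # shared intermediate
outputs = [Node(sum, rdd2) for _ in range(3)]        # rdd3, rdd4, rdd5
for out in outputs:
    out.compute()   # one separate action per output pipeline
print(rdd2.computed)  # 3 — the shared parent ran once per output
```

Caching rdd2 would make the parent run once, but each output still needs its own pass; the feature requested here (and in TEZ-391) would produce all outputs in a single pass over the shared input.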
[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently
[ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292415#comment-14292415 ] Xuefu Zhang commented on SPARK-2688: #1 above is exactly what Hive needs badly. Need a way to run multiple data pipeline concurrently - Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing: {code} rdd1 - rdd2 - rdd3 | - rdd4 | - rdd5 \ - rdd6 {code} where - represents a transformation. rdd3 to rrdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only trigger pipeline rdd1 - rdd2 - rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution the whole graph and reuse rdd2, but there doesn't seem to be a way doing so. Tez already realized the importance of this (TEZ-391), so I think Spark should provide this too. This is required for Hive to support multi-insert queries. HIVE-7292. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14291145#comment-14291145 ] Xuefu Zhang commented on SPARK-3621: I'm not sure I agree that this is not a problem. To broadcast is to make a certain dataset available to all nodes in the cluster. Existing broadcast functionality is limited to broadcasting data from the driver, while this improvement requests that datasets which already exist in the cluster be broadcast to all nodes, without shipping the dataset from the cluster to the driver and then back to all nodes in the cluster again. An improvement is never a problem unless we are not open to it. If for some reason this cannot be done, we need to understand the reason. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side join, it would be beneficial to allow the client program to broadcast RDDs rather than just variables made of these RDDs. Broadcasting a variable made of RDDs requires that all RDD data be collected to the driver and that the variable be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and used the corresponding data in jobs (such as building hashmaps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next, which doesn't require driver-side processing.
[jira] [Reopened] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang reopened SPARK-3621: Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side join, it would be beneficial to allow a client program to broadcast RDDs rather than just variables made of these RDDs. Broadcasting a variable made of RDDs requires all RDD data be collected to the driver and the variable be shipped to the cluster after being built. It would perform better if the driver just broadcast the RDDs and the corresponding data were used in jobs (such as building hashmaps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next stage, without requiring driver-side processing. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently
[ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14291134#comment-14291134 ] Xuefu Zhang commented on SPARK-2688: I think SPARK-3622 is related to this JIRA but not exactly the same. This JIRA essentially asks for the capability to execute an arbitrary DAG built of RDDs, while SPARK-3622 requests a custom transformation that can take one input RDD and generate multiple output RDDs. The Hive on Spark project needs this because Hive's map-side or reduce-side processing (which is translated to Spark's map functions) generates multiple outputs in general. On this ground, SPARK-3622 is more important than SPARK-2688. It's worth mentioning that such a custom transformation can be used in building an arbitrary DAG. Need a way to run multiple data pipeline concurrently - Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          |--> rdd4
          |--> rdd5
          \--> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. Tez has already realized the importance of this (TEZ-391), so I think Spark should provide this too. This is required for Hive to support multi-insert queries (HIVE-7292). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14287901#comment-14287901 ] Xuefu Zhang commented on SPARK-3622: Unfortunately no. In Hive the problem is more general than just writing to different files. What Hive needs is some transformation that can take multiple map functions and output multiple RDDs from an input RDD. These result RDDs may be further processed rather than simply being written to files. Provide a custom transformation that can output multiple RDDs - Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return at most one RDD, even those which take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs. For instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
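The difference between the requested transformation and today's workaround can be shown in plain Java (illustrative, not a Spark API): one pass over the input routes each record to one of several outputs, whereas the workaround filters the same input once per output, scanning (and in Spark possibly recomputing) it each time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiOutputSketch {
    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);

        // One pass, several outputs — what the proposed transformation would do.
        List<Integer> evens = new ArrayList<>();
        List<Integer> odds  = new ArrayList<>();
        for (int x : input) {
            (x % 2 == 0 ? evens : odds).add(x);   // route each record; no re-scan
        }
        System.out.println(evens);   // [2, 4, 6]
        System.out.println(odds);    // [1, 3, 5]

        // Today's workaround would be input.filter(even) and input.filter(odd):
        // two separate scans of the same input.
    }
}
```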
[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.
[ https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14288866#comment-14288866 ] Xuefu Zhang commented on SPARK-5377: cc [~sandyr]. Dynamically add jar into Spark Driver's classpath. -- Key: SPARK-5377 URL: https://issues.apache.org/jira/browse/SPARK-5377 Project: Spark Issue Type: Improvement Affects Versions: 1.2.0 Reporter: Chengxiang Li Spark supports dynamically adding a jar to the executor classpath through SparkContext::addJar(), but it does not support dynamically adding a jar to the driver classpath. In most cases (if not all), users dynamically add a jar with SparkContext::addJar() because some classes from the jar will be referenced in an upcoming Spark job, which means the classes need to be loaded on the Spark driver side as well, e.g. during serialization. I think it makes sense to add an API that adds a jar to the driver classpath, or just to make this available in SparkContext::addJar(). HIVE-9410 is a real case from Hive on Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
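For context on the static workaround: the driver classpath can be extended, but only before the driver JVM starts, which is exactly the limitation the issue wants lifted. A sketch using Spark's real configuration keys (paths are placeholders):

```
# spark-defaults.conf — these entries must be known before the driver JVM
# starts; there is no dynamic equivalent of SparkContext::addJar() for the
# driver, which is what this issue requests.
spark.driver.extraClassPath   /path/to/extra.jar
spark.executor.extraClassPath /path/to/extra.jar
```

The same effect is available at submission time via `spark-submit --driver-class-path /path/to/extra.jar`.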
[jira] [Commented] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't
[ https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14281168#comment-14281168 ] Xuefu Zhang commented on SPARK-1021: This problem also occurred in Hive on Spark (HIVE-9370). Could we take this forward? sortByKey() launches a cluster job when it shouldn't Key: SPARK-1021 URL: https://issues.apache.org/jira/browse/SPARK-1021 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0 Reporter: Andrew Ash Assignee: Erik Erlandson Labels: starter The sortByKey() method is listed as a transformation, not an action, in the documentation. But it launches a cluster job regardless. http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html Some discussion on the mailing list suggested that this is a problem with the rdd.count() call inside Partitioner.scala's rangeBounds method. https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102 Josh Rosen suggests that rangeBounds should be made into a lazy variable: {quote} I wonder whether making RangePartitioner.rangeBounds into a lazy val would fix this (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95). We'd need to make sure that rangeBounds() is never called before an action is performed. This could be tricky because it's called in the RangePartitioner.equals() method. Maybe it's sufficient to just compare the number of partitions, the ids of the RDDs used to create the RangePartitioner, and the sort ordering. This still supports the case where I range-partition one RDD and pass the same partitioner to a different RDD.
It breaks support for the case where two range partitioners created on different RDDs happened to have the same rangeBounds(), but it seems unlikely that this would really harm performance since it's probably unlikely that the range partitioners are equal by chance. {quote} Can we please make this happen? I'll send a PR on GitHub to start the discussion and testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
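Josh Rosen's lazy-val suggestion can be modeled in plain Java (illustrative, not the actual Partitioner code): make the bounds a lazily computed, cached field, so that merely constructing the partitioner — which is what sortByKey() does — no longer triggers the expensive sampling job; only the first real use does.

```java
public class LazyBoundsDemo {
    static int sampleJobs = 0;   // counts runs of the "expensive sampling job"

    static class RangePartitionerModel {
        private int[] bounds;    // computed lazily, cached afterwards

        int[] rangeBounds() {
            if (bounds == null) {
                sampleJobs++;                    // stands in for the rdd.count()/sampling job
                bounds = new int[]{10, 20, 30};  // placeholder bounds
            }
            return bounds;
        }
    }

    public static void main(String[] args) {
        RangePartitionerModel p = new RangePartitionerModel();
        System.out.println(sampleJobs);   // 0 — constructing the partitioner ran no job
        p.rangeBounds();                  // first access triggers the job
        p.rangeBounds();                  // cached; no second job
        System.out.println(sampleJobs);   // 1
    }
}
```

As the quoted discussion notes, the tricky part in the real code is that equals() also touches rangeBounds(), which this toy model does not capture.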
[jira] [Commented] (SPARK-5080) Expose more cluster resource information to user
[ https://issues.apache.org/jira/browse/SPARK-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268786#comment-14268786 ] Xuefu Zhang commented on SPARK-5080: cc: [~sandyr] Expose more cluster resource information to user Key: SPARK-5080 URL: https://issues.apache.org/jira/browse/SPARK-5080 Project: Spark Issue Type: Improvement Reporter: Rui Li It'll be useful if users can get detailed cluster resource info, e.g. granted/allocated executors, memory and CPU. Such information is available via the WebUI, but SparkContext doesn't seem to expose these APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
[ https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14261436#comment-14261436 ] Xuefu Zhang commented on SPARK-4921: Some will be NODE_LOCAL, but others will be NO_PREF. Returning PROCESS_LOCAL seems at least confusing. As to the performance implication, maybe [~lirui] can further confirm. Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks - Key: SPARK-4921 URL: https://issues.apache.org/jira/browse/SPARK-4921 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang Attachments: NO_PREF.patch During research for HIVE-9153, we found that TaskSetManager returns PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. Changing the return value to NO_PREF, as demonstrated in the attached patch, seemingly improves performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
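For readers hitting this in practice: orthogonal to the fix in the attached patch, the scheduler's patience at each locality level is tunable. A sketch using Spark's real locality-wait configuration keys (the values shown are illustrative, not recommendations):

```
# How long the scheduler waits to launch a task at a better locality level
# before falling back to the next one. NO_PREF tasks mislabeled as
# PROCESS_LOCAL interact badly with these waits, which is the degradation
# described in this issue.
spark.locality.wait          3s
spark.locality.wait.process  3s
spark.locality.wait.node     3s
spark.locality.wait.rack     3s
```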
[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization
[ https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14258626#comment-14258626 ] Xuefu Zhang commented on SPARK-2387: cc: [~sandyr] I think the purpose of this enhancement is still valuable, even if the solution here isn't perfect: Tez does this, and it improves performance. A better proposal is greatly welcome. Remove the stage barrier for better resource utilization Key: SPARK-2387 URL: https://issues.apache.org/jira/browse/SPARK-2387 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Rui Li DAGScheduler divides a Spark job into multiple stages according to RDD dependencies. Whenever there's a shuffle dependency, DAGScheduler creates a shuffle map stage on the map side, and another stage depending on that stage. Currently, the downstream stage cannot start until all the stages it depends on have finished. This barrier between stages leads to idle slots while waiting for the last few upstream tasks to finish, thus wasting cluster resources. Therefore we propose to remove the barrier and pre-start the reduce stage once there are free slots. This can achieve better resource utilization and improve overall job performance, especially when there are lots of executors granted to the application. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
Xuefu Zhang created SPARK-4921: -- Summary: Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks Key: SPARK-4921 URL: https://issues.apache.org/jira/browse/SPARK-4921 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang During research for HIVE-9153, we found that TaskSetManager returns PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. Changing the return value to NO_PREF, as demonstrated in the attached patch, seemingly improves performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
[ https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14256015#comment-14256015 ] Xuefu Zhang commented on SPARK-4921: cc: [~lirui], [~sandyr] Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks - Key: SPARK-4921 URL: https://issues.apache.org/jira/browse/SPARK-4921 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang During research for HIVE-9153, we found that TaskSetManager returns PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. Changing the return value to NO_PREF, as demonstrated in the attached patch, seemingly improves performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks
[ https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated SPARK-4921: --- Attachment: NO_PREF.patch Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks - Key: SPARK-4921 URL: https://issues.apache.org/jira/browse/SPARK-4921 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang Attachments: NO_PREF.patch During research for HIVE-9153, we found that TaskSetManager returns PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. Changing the return value to NO_PREF, as demonstrated in the attached patch, seemingly improves performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4687) SparkContext#addFile doesn't keep file folder information
[ https://issues.apache.org/jira/browse/SPARK-4687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14230873#comment-14230873 ] Xuefu Zhang commented on SPARK-4687: [~jxiang], alternatively, would a new method, SparkContext.addFolder(), be a better fit for this use case? CC [~sandyr], [~rxin]. SparkContext#addFile doesn't keep file folder information - Key: SPARK-4687 URL: https://issues.apache.org/jira/browse/SPARK-4687 Project: Spark Issue Type: Bug Reporter: Jimmy Xiang Files added with SparkContext#addFile are loaded with Utils#fetchFile before a task starts. However, Utils#fetchFile puts all files directly under the Spark root on the worker node. We should have an option to keep the folder information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable
[ https://issues.apache.org/jira/browse/SPARK-4567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222929#comment-14222929 ] Xuefu Zhang commented on SPARK-4567: {quote} please don't set the FixVersion field. This field is only set when the patch is merged. {quote} Hi [~pwendell], thanks for pointing this out. I put that there by accident. Thanks. Make SparkJobInfo and SparkStageInfo serializable - Key: SPARK-4567 URL: https://issues.apache.org/jira/browse/SPARK-4567 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang SPARK-2321 introduced the two classes, which unfortunately were not made serializable. From the class definitions, there seems to be no obstacle to making them serializable for clients to use, as these classes are just wrappers around numbers and strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
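The change requested here is small because the classes are plain wrappers. The sketch below uses a hypothetical `JobInfo` class (mirroring the kind of fields SparkJobInfo holds — ids and a status string, not its actual definition) to show that implementing Serializable is all a Java-serialization round trip needs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class SerializableInfoDemo {
    // Hypothetical stand-in for SparkJobInfo: just numbers and a string.
    static class JobInfo implements Serializable {
        final int jobId;
        final int[] stageIds;
        final String status;
        JobInfo(int jobId, int[] stageIds, String status) {
            this.jobId = jobId; this.stageIds = stageIds; this.status = status;
        }
    }

    public static void main(String[] args) throws Exception {
        JobInfo in = new JobInfo(1, new int[]{2, 3}, "RUNNING");

        // Serialize to bytes and read it back — what a remote client would do.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(in);
        oos.flush();
        JobInfo out = (JobInfo) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(out.jobId == in.jobId
                && Arrays.equals(out.stageIds, in.stageIds)
                && out.status.equals(in.status));   // true
    }
}
```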
[jira] [Commented] (SPARK-4440) Enhance the job progress API to expose more information
[ https://issues.apache.org/jira/browse/SPARK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222549#comment-14222549 ] Xuefu Zhang commented on SPARK-4440: CC: [~sandyr], [~rxin] Enhance the job progress API to expose more information --- Key: SPARK-4440 URL: https://issues.apache.org/jira/browse/SPARK-4440 Project: Spark Issue Type: Improvement Reporter: Rui Li The progress API introduced in SPARK-2321 provides a new way for users to monitor job progress. However, the information exposed in the API is relatively limited. It'll be much more useful if we can enhance the API to expose more data. Improvements may include, but are not limited to: 1. Stage submission and completion time. 2. Task metrics. The requirement was initially identified for the Hive on Spark project (HIVE-7292); other applications should benefit as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable
Xuefu Zhang created SPARK-4567: -- Summary: Make SparkJobInfo and SparkStageInfo serializable Key: SPARK-4567 URL: https://issues.apache.org/jira/browse/SPARK-4567 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang Fix For: 1.2.0 SPARK-2321 introduced the two classes, which unfortunately were not made serializable. From the class definitions, there seems to be no obstacle to making them serializable for clients to use, as these classes are just wrappers around numbers and strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable
[ https://issues.apache.org/jira/browse/SPARK-4567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222591#comment-14222591 ] Xuefu Zhang commented on SPARK-4567: CC: [~sandyr], [~rxin] Make SparkJobInfo and SparkStageInfo serializable - Key: SPARK-4567 URL: https://issues.apache.org/jira/browse/SPARK-4567 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang Fix For: 1.2.0 SPARK-2321 introduced the two classes, which unfortunately were not made serializable. From the class definitions, there seems to be no obstacle to making them serializable for clients to use, as these classes are just wrappers around numbers and strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API
[ https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222593#comment-14222593 ] Xuefu Zhang commented on SPARK-2321: I have created SPARK-4567 to track the request of making SparkJobInfo and SparkStageInfo serializable. Design a proper progress reporting event listener API --- Key: SPARK-2321 URL: https://issues.apache.org/jira/browse/SPARK-2321 Project: Spark Issue Type: Improvement Components: Java API, Spark Core Affects Versions: 1.0.0 Reporter: Reynold Xin Assignee: Josh Rosen Priority: Critical Fix For: 1.2.0 This is a ticket to track progress on redesigning the SparkListener and JobProgressListener API. There are multiple problems with the current design, including: 0. I'm not sure if the API is usable in Java (there are at least some enums we used in Scala and a bunch of case classes that might complicate things). 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of attention to it yet. Something as important as progress reporting deserves a more stable API. 2. There is no easy way to connect jobs with stages. Similarly, there is no easy way to connect job groups with jobs / stages. 3. JobProgressListener itself has no encapsulation at all. States can be arbitrarily mutated by external programs. Variable names are sort of randomly decided and inconsistent. We should just revisit these and propose a new, concrete design. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
Xuefu Zhang created SPARK-4290: -- Summary: Provide an equivalent functionality of distributed cache as MR does Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201421#comment-14201421 ] Xuefu Zhang commented on SPARK-4290: CC: [~sandyr], [~rxin] Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201496#comment-14201496 ] Xuefu Zhang commented on SPARK-4290: Hi [~rxin], by out of the box, do you mean org.apache.hadoop.filecache.DistributedCache [1]? This is a MapReduce client class, which is used when you submit an MR job. It basically tells the MR framework that your job needs these files put in the distributed cache in order to run. Thus, the MR framework will copy these files to the local file system of the tasks. The tasks can access the local files via symlinks. I don't know how this can be used out of the box. First, a Hive on Spark user may not have the MR client library. Secondly, there isn't an MR framework to do the copying. Do you have an example of how I might achieve this? [1] http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/filecache/DistributedCache.html Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201561#comment-14201561 ] Xuefu Zhang commented on SPARK-4290: Hi [~rxin], from the documentation of the above Java class, I read {quote} Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves. {quote} Two things are suggested: 1. One copy per job: if multiple tasks of a job run on a node, there is still only one copy from HDFS to local disk. 2. Unarchiving the archives on the slaves. For your #2, it would be desirable to achieve one copy per job (not one copy per request). That is, subsequent requests for the file from other tasks of the same job would read directly from the local copy. Of course, this is just the surface; the actual implementation can be much more complicated. Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201621#comment-14201621 ] Xuefu Zhang commented on SPARK-4290: Yes, SparkContext#addFile() seems to be what we need. If the files can be broadcast to every executor more efficiently, that's even better than the distributed cache. In the meantime, we can set a large replication factor for the files to mitigate the problem. Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
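The once-per-job-per-node copy semantics discussed in this thread can be modeled in plain Java (a toy sketch, not Spark's or MapReduce's actual fetch code): a fetch copies from the "remote" source into the node-local work directory only when no local copy exists yet, so later tasks on the same node reuse it.

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalCacheDemo {
    static int copies = 0;   // counts remote-to-local copies actually performed

    // Fetch a file into the node-local work dir, copying at most once.
    static Path fetch(Path src, Path workDir) throws Exception {
        Path dst = workDir.resolve(src.getFileName());
        if (!Files.exists(dst)) {   // first task on this node pays the copy cost
            Files.copy(src, dst);
            copies++;
        }
        return dst;                 // later tasks hit the existing local copy
    }

    public static void main(String[] args) throws Exception {
        Path srcDir = Files.createTempDirectory("dfs");    // stands in for HDFS
        Path work   = Files.createTempDirectory("node");   // node-local work dir
        Path f = Files.write(srcDir.resolve("lookup.dat"), "x".getBytes());

        fetch(f, work);   // task 1 on this node
        fetch(f, work);   // task 2 on the same node reuses the copy
        System.out.println(copies);   // 1
    }
}
```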
[jira] [Comment Edited] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does
[ https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201621#comment-14201621 ] Xuefu Zhang edited comment on SPARK-4290 at 11/7/14 5:37 AM: - Yes, SparkContext#addFile() seems to be what we need. If the files can be broadcast to every executor more efficiently, that's even better than the distributed cache. In the meantime, we can set a large replication factor for the files to mitigate the problem. To clarify, [~sandyr], [~rxin], do files added via SparkContext.addFile() get automatically downloaded to executors, or does SparkFiles.get() have to be called in order to make that happen? was (Author: xuefuz): Yes, SparkContext#addFile() seems to be what we need. If the files can be broadcast to every executor more efficiently, that's even better than the distributed cache. In the meantime, we can set a large replication factor for the files to mitigate the problem. Provide an equivalent functionality of distributed cache as MR does --- Key: SPARK-4290 URL: https://issues.apache.org/jira/browse/SPARK-4290 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Xuefu Zhang MapReduce allows a client to specify files to be put in the distributed cache for a job, and the framework guarantees that the files will be available in the local file system of any node where a task of the job runs, before the task actually starts. While this might be achieved with Yarn via hacks, it's not available in other clusters. It would be nice to have equivalent functionality in Spark. It would also complement Spark's broadcast variables, which may not be suitable in certain scenarios. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3691) Provide a mini cluster for testing system built on Spark
Xuefu Zhang created SPARK-3691: -- Summary: Provide a mini cluster for testing system built on Spark Key: SPARK-3691 URL: https://issues.apache.org/jira/browse/SPARK-3691 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang Most Hadoop components, such as MR, DFS, Tez, and Yarn, provide a mini cluster that can be used to test external systems that rely on those frameworks, such as Pig and Hive. While Spark's local mode can be used for such testing and is friendly for debugging, it's too far from a real Spark cluster, and a lot of problems cannot be discovered. Thus, an equivalent of the Hadoop MR mini cluster in Spark would be very helpful in testing systems such as Hive/Pig on Spark. Spark's local-cluster was considered for this purpose, but it doesn't fit well because it requires a Spark installation on the box where the tests run. Also, local-cluster isn't exposed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
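For context, the local-cluster mode mentioned in the issue is driven by a special master URL that Spark's own test suites use to spin up real worker JVMs on a single machine. A sketch of that URL's shape (illustrative; as the issue notes, this mode requires a local Spark build and is not a supported public mode):

```
# local-cluster[numWorkers,coresPerWorker,memoryPerWorkerMB]
# e.g. 2 workers, 1 core and 1024 MB each:
spark.master  local-cluster[2,1,1024]
```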
[jira] [Commented] (SPARK-3691) Provide a mini cluster for testing system built on Spark
[ https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148071#comment-14148071 ] Xuefu Zhang commented on SPARK-3691: cc [~sandyr] Provide a mini cluster for testing system built on Spark Key: SPARK-3691 URL: https://issues.apache.org/jira/browse/SPARK-3691 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang Most Hadoop components such MR, DFS, Tez, and Yarn provide a mini cluster that can be used to test the external systems that rely on those frameworks, such as Pig and Hive. While Spark's local mode can be used to do such testing and is friendly for debugging, it's too far from a real Spark cluster and a lot of problems cannot be discovered. Thus, an equivalent of Hadoop MR mini cluster in Spark would be very helpful in testing system such as Hive/Pig on Spark. Spark's local-cluster is considered for this purpose but it doesn't fit well because it requires a Spark installation on the box where the tests run. Also, local-cluster isn't exposed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3691) Provide a mini cluster for testing system built on Spark
[ https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148071#comment-14148071 ] Xuefu Zhang edited comment on SPARK-3691 at 9/25/14 6:21 PM: - cc [~sandyr], [~rxin] was (Author: xuefuz): cc [~sandyr] Provide a mini cluster for testing system built on Spark Key: SPARK-3691 URL: https://issues.apache.org/jira/browse/SPARK-3691 Project: Spark Issue Type: Test Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang Most Hadoop components such MR, DFS, Tez, and Yarn provide a mini cluster that can be used to test the external systems that rely on those frameworks, such as Pig and Hive. While Spark's local mode can be used to do such testing and is friendly for debugging, it's too far from a real Spark cluster and a lot of problems cannot be discovered. Thus, an equivalent of Hadoop MR mini cluster in Spark would be very helpful in testing system such as Hive/Pig on Spark. Spark's local-cluster is considered for this purpose but it doesn't fit well because it requires a Spark installation on the box where the tests run. Also, local-cluster isn't exposed. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3693) Cached Hadoop RDD always return rows with the same value
Xuefu Zhang created SPARK-3693: -- Summary: Cached Hadoop RDD always return rows with the same value Key: SPARK-3693 URL: https://issues.apache.org/jira/browse/SPARK-3693 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage:
{code}
public final class Test {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ...
    JavaPairRDD<BytesWritable, BytesWritable> input = ctx.hadoopRDD(jobConf,
        CombineHiveInputClass.class, WritableComparable.class, Writable.class);
    input = input.cache();
    input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
      @Override
      public void call(Tuple2<BytesWritable, BytesWritable> row) throws Exception {
        if (row._1() != null) {
          System.out.println("Key: " + row._1());
        }
        if (row._2() != null) {
          System.out.println("Value: " + row._2());
        }
      }
    });
    ctx.stop();
  }
}
{code}
In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) stores references to the (key, value) pairs returned by HadoopRDD.getNext() (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references; each getNext() call merely updates the values inside those objects. When there are no more records, the (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same (key, value) object pair, all values become NULL. MemoryStore should probably store a copy of the (key, value) pair rather than keeping a reference to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
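The reference-reuse failure mode in this report can be reproduced without Spark at all. The sketch below (plain Java; class and method names are illustrative, not Spark's) caches rows from a loop that reuses a single mutable record object, the way HadoopRDD.getNext() reuses its Writable pair, and contrasts it with the defensive copy the report suggests:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AliasingDemo {
    static class Record { int value; }   // stands in for a mutable Writable

    // Simulates caching rows by reference from a reader that reuses one object.
    public static List<Integer> cacheByReference() {
        Record shared = new Record();
        List<Record> cached = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            shared.value = i;        // reader overwrites the same buffer each call
            cached.add(shared);      // stores the same object three times
        }
        List<Integer> seen = new ArrayList<>();
        for (Record r : cached) {
            seen.add(r.value);       // every entry reflects only the last write
        }
        return seen;
    }

    // Same loop, but copying the value out before caching it.
    public static List<Integer> cacheByCopy() {
        Record shared = new Record();
        List<Integer> cached = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            shared.value = i;
            cached.add(shared.value);    // defensive copy preserves each row
        }
        return cached;
    }

    public static void main(String[] args) {
        System.out.println(cacheByReference());  // [3, 3, 3]
        System.out.println(cacheByCopy());       // [1, 2, 3]
    }
}
```

The by-reference list shows exactly the symptom from the report: every cached row collapses to the reader's final state.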
[jira] [Updated] (SPARK-3693) Cached Hadoop RDD always return rows with the same value
[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated SPARK-3693: --- Description: While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage: {code} public final class Test { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(Test); JavaSparkContext ctx = new JavaSparkContext(sparkConf); ... JavaPairRDDBytesWritable, BytesWritable input = ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, WritableComparable.class, Writable.class); input = input.cache(); input.foreach(new VoidFunctionTuple2BytesWritable, BytesWritable() { @Override public void call(Tuple2BytesWritable, BytesWritable row) throws Exception { if (row._1() != null) { System.out.println(Key: + row._1()); } if (row._2() != null) { System.out.println(Value: + row._2()); } } }); ctx.stop(); } } {code} In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) is storing the references to (key, value) pairs returned by HadoopRDD.getNext() (See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references, except each getNext() call updates values inside these objects. When there are no more records (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same key, value object pairs, all values become NULL. Probably MemoryStore should instead store a copy of key, value pair rather than keeping a reference to it. 
was: While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage: {code} public final class Test { private static final Pattern SPACE = Pattern.compile( ); public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(Test); JavaSparkContext ctx = new JavaSparkContext(sparkConf); ... JavaPairRDDBytesWritable, BytesWritable input = ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, WritableComparable.class, Writable.class); input = input.cache(); input.foreach(new VoidFunctionTuple2BytesWritable, BytesWritable() { @Override public void call(Tuple2BytesWritable, BytesWritable row) throws Exception { if (row._1() != null) { System.out.println(Key: + row._1()); } if (row._2() != null) { System.out.println(Value: + row._2()); } } }); ctx.stop(); } } {code} In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) is storing the references to (key, value) pairs returned by HadoopRDD.getNext() (See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references, except each getNext() call updates values inside these objects. When there are no more records (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same key, value object pairs, all values become NULL. Probably MemoryStore should instead store a copy of key, value pair rather than keeping a reference to it. 
Cached Hadoop RDD always return rows with the same value Key: SPARK-3693 URL: https://issues.apache.org/jira/browse/SPARK-3693 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage: {code} public final class Test { public static void main(String[] args) throws Exception { SparkConf
[jira] [Commented] (SPARK-3693) Cached Hadoop RDD always return rows with the same value
[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148295#comment-14148295 ] Xuefu Zhang commented on SPARK-3693: cc [~rxin], [~sandyr] Cached Hadoop RDD always return rows with the same value Key: SPARK-3693 URL: https://issues.apache.org/jira/browse/SPARK-3693 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage: {code} public final class Test { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(Test); JavaSparkContext ctx = new JavaSparkContext(sparkConf); ... JavaPairRDDBytesWritable, BytesWritable input = ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, WritableComparable.class, Writable.class); input = input.cache(); input.foreach(new VoidFunctionTuple2BytesWritable, BytesWritable() { @Override public void call(Tuple2BytesWritable, BytesWritable row) throws Exception { if (row._1() != null) { System.out.println(Key: + row._1()); } if (row._2() != null) { System.out.println(Value: + row._2()); } } }); ctx.stop(); } } {code} In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) is storing the references to (key, value) pairs returned by HadoopRDD.getNext() (See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references, except each getNext() call updates values inside these objects. 
When there are no more records (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same key, value object pairs, all values become NULL. Probably MemoryStore should instead store a copy of key, value pair rather than keeping a reference to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3693) Cached Hadoop RDD always return rows with the same value
[ https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148318#comment-14148318 ] Xuefu Zhang commented on SPARK-3693: Thanks, guys. We are fine with the workaround. Cached Hadoop RDD always return rows with the same value Key: SPARK-3693 URL: https://issues.apache.org/jira/browse/SPARK-3693 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Xuefu Zhang While trying RDD caching, it's found that caching a Hadoop RDD causes data correctness issues. The following code snippet demonstrates the usage: {code} public final class Test { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(Test); JavaSparkContext ctx = new JavaSparkContext(sparkConf); ... JavaPairRDDBytesWritable, BytesWritable input = ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, WritableComparable.class, Writable.class); input = input.cache(); input.foreach(new VoidFunctionTuple2BytesWritable, BytesWritable() { @Override public void call(Tuple2BytesWritable, BytesWritable row) throws Exception { if (row._1() != null) { System.out.println(Key: + row._1()); } if (row._2() != null) { System.out.println(Value: + row._2()); } } }); ctx.stop(); } } {code} In this case, row._2() always gives the same value. If we disable caching by removing input.cache(), the program gives the expected rows. Further analysis shows that MemoryStore (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236) is storing the references to (key, value) pairs returned by HadoopRDD.getNext() (See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220), but this method always returns the same (key, value) object references, except each getNext() call updates values inside these objects. 
When there are no more records (key, value) objects are filled with empty strings (no values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same key, value object pairs, all values become NULL. Probably MemoryStore should instead store a copy of key, value pair rather than keeping a reference to it. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143231#comment-14143231 ] Xuefu Zhang commented on SPARK-3621: In my limited understanding, to broadcast a variable made of an RDD, you have to call RDD.collect() at the driver, which means the data will be transferred to the driver. While broadcasting the variable itself might be very efficient, I'd like to avoid shipping the data to the driver as well. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side joins, it would be beneficial to allow a client program to broadcast RDDs rather than just variables made of those RDDs. Broadcasting a variable made of RDDs requires all RDD data to be collected to the driver, and the variable to be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and jobs used the corresponding data directly (such as building hash maps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next stage, with no driver-side processing required. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143921#comment-14143921 ] Xuefu Zhang commented on SPARK-3622: They are related but not exactly the same. SPARK-2688 is about branching off an RDD tree with no custom transformation involved. This JIRA is about returning multiple RDDs from a single transformation (branching happening within a transformation). Provide a custom transformation that can output multiple RDDs --- Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return at most one RDD, even those which take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs, for instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
[ https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142466#comment-14142466 ] Xuefu Zhang commented on SPARK-3621: I understand an RDD is a concept existing only in the driver. However, accessing the data in a Spark job doesn't have to be in the form of an RDD. An iterator over the underlying data is sufficient, as long as the data has already been shipped to the node when the job starts to run. One way to identify the shipped RDD and the iterator afterwards could be a UUID. Hive on Spark isn't using Spark's transformations to do map-joins, or joins in general. Hive's own implementation is to build hash maps from the small tables when the join starts, and then do key lookups while streaming through the big table. For this, the small-table data (which can be the result RDD of another Spark job) needs to be shipped to all nodes that do the join. Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access --- Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0, 1.1.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side joins, it would be beneficial to allow a client program to broadcast RDDs rather than just variables made of those RDDs. Broadcasting a variable made of RDDs requires all RDD data to be collected to the driver, and the variable to be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and jobs used the corresponding data directly (such as building hash maps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next stage, with no driver-side processing required. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
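The hash-map join strategy described in the comment can be sketched in plain Java (no Spark dependency; the row layout and class name are illustrative, not Hive's actual code): build a hash map from the small table once, then stream the big table through it doing key lookups.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinSketch {
    // Each row is {key, payload}. Emits "key:bigPayload:smallPayload" per match.
    public static List<String> mapJoin(List<String[]> small, List<String[]> big) {
        Map<String, String> hash = new HashMap<>();
        for (String[] row : small) {
            hash.put(row[0], row[1]);      // small table is assumed to fit in memory
        }
        List<String> joined = new ArrayList<>();
        for (String[] row : big) {         // one streaming pass over the big table
            String match = hash.get(row[0]);
            if (match != null) {
                joined.add(row[0] + ":" + row[1] + ":" + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> small = new ArrayList<>();
        small.add(new String[]{"k1", "s1"});
        List<String[]> big = new ArrayList<>();
        big.add(new String[]{"k1", "b1"});
        big.add(new String[]{"k2", "b2"});   // no match in the small table
        System.out.println(mapJoin(small, big));   // [k1:b1:s1]
    }
}
```

The point of the JIRA is the step this sketch glosses over: getting `small` onto every join node without first collecting it at the driver.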
[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142881#comment-14142881 ] Xuefu Zhang commented on SPARK-3622: Thanks for your comments, [~pwendell]. I understand caching A would be helpful if I need to transform it to get B and C separately. My proposal is to get B and C with just one pass over A, so A doesn't even need to be cached. Here is an example of how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<name, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("A");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is automatically computed whenever A is computed, and vice versa. Since both are computed if either of them is computed, subsequent references to either one should not recompute any of them. The benefits: 1) no need to cache A; 2) only one pass over the input. I'm not sure if this is feasible in Spark, but Hive's map function is doing exactly this. Its operator tree can branch off anywhere, resulting in multiple output datasets from a single input dataset. Please let me know if there are more questions. Provide a custom transformation that can output multiple RDDs --- Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return at most one RDD, even those which take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs, for instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
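The single-pass, multi-output idea in the comment can be sketched in plain Java (no Spark; the output names "A"/"B" and the even/odd split rule are made up for illustration): one traversal of the input fills several named outputs, the way Hive's branching operator tree does inside one map function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiOutputSketch {
    // One pass over the input produces both named outputs; neither the input
    // nor either output needs to be recomputed or cached for the other.
    public static Map<String, List<Integer>> splitOnePass(List<Integer> input) {
        Map<String, List<Integer>> outputs = new HashMap<>();
        outputs.put("A", new ArrayList<>());
        outputs.put("B", new ArrayList<>());
        for (int x : input) {
            outputs.get(x % 2 == 0 ? "A" : "B").add(x);  // branch per element
        }
        return outputs;   // both outputs are materialized together
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> out = splitOnePass(Arrays.asList(1, 2, 3, 4));
        System.out.println(out.get("A"));   // [2, 4]
        System.out.println(out.get("B"));   // [1, 3]
    }
}
```

The hard part the JIRA asks about is exposing each of those outputs as a first-class RDD so downstream transformations (sortByKey, groupByKey) can consume them independently without recomputation.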
[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
[ https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142881#comment-14142881 ] Xuefu Zhang edited comment on SPARK-3622 at 9/22/14 4:39 AM: - Thanks for your comments, [~pwendell]. I understand caching A would be helpful if I need to transform it to get B and C separately. My proposal is to get B and C with just one pass over A, so A doesn't even need to be cached. Here is an example of how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<name, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("B");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is automatically computed whenever A is computed, and vice versa. Since both are computed if either of them is computed, subsequent references to either one should not recompute any of them. The benefits: 1) no need to cache A; 2) only one pass over the input. I'm not sure if this is feasible in Spark, but Hive's map function is doing exactly this. Its operator tree can branch off anywhere, resulting in multiple output datasets from a single input dataset. Please let me know if there are more questions.
was (Author: xuefuz): Thanks for your comments, [~pwendell]. I understand caching A would be helpful if I need to transform it to get B and C separately. My proposal is to get B and C with just one pass over A, so A doesn't even need to be cached. Here is an example of how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<name, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("A");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is automatically computed whenever A is computed, and vice versa. Since both are computed if either of them is computed, subsequent references to either one should not recompute any of them. The benefits: 1) no need to cache A; 2) only one pass over the input. I'm not sure if this is feasible in Spark, but Hive's map function is doing exactly this. Its operator tree can branch off anywhere, resulting in multiple output datasets from a single input dataset. Please let me know if there are more questions.
Provide a custom transformation that can output multiple RDDs --- Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return at most one RDD, even those which take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs, for instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access
Xuefu Zhang created SPARK-3621: -- Summary: Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access Key: SPARK-3621 URL: https://issues.apache.org/jira/browse/SPARK-3621 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0, 1.0.0 Reporter: Xuefu Zhang In some cases, such as Hive's way of doing map-side joins, it would be beneficial to allow a client program to broadcast RDDs rather than just variables made of those RDDs. Broadcasting a variable made of RDDs requires all RDD data to be collected to the driver, and the variable to be shipped to the cluster after being made. It would be more performant if the driver just broadcast the RDDs and jobs used the corresponding data directly (such as building hash maps at executors). Tez has a broadcast edge which can ship data from the previous stage to the next stage, with no driver-side processing required. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3622) Provide a custom transformation that can output multiple RDDs
Xuefu Zhang created SPARK-3622: -- Summary: Provide a custom transformation that can output multiple RDDs Key: SPARK-3622 URL: https://issues.apache.org/jira/browse/SPARK-3622 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.1.0 Reporter: Xuefu Zhang All existing transformations return at most one RDD, even those which take user-supplied functions, such as mapPartitions(). However, sometimes a user-provided function may need to output multiple RDDs, for instance, a filter function that divides the input RDD into several RDDs. While it's possible to get multiple RDDs by transforming the same RDD multiple times, it may be more efficient to do this concurrently in one shot, especially when the user's existing function already generates different data sets. This is the case in Hive on Spark, where Hive's map function and reduce function can output different data sets to be consumed by subsequent stages. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2895) Support mapPartitionsWithContext in Spark Java API
[ https://issues.apache.org/jira/browse/SPARK-2895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14118759#comment-14118759 ] Xuefu Zhang commented on SPARK-2895: Hi [~rxin], could you review [~chengxiang li]'s patch when you get a chance? This is needed for our hive-on-spark milestone #1. Thanks. Support mapPartitionsWithContext in Spark Java API -- Key: SPARK-2895 URL: https://issues.apache.org/jira/browse/SPARK-2895 Project: Spark Issue Type: New Feature Components: Java API Reporter: Chengxiang Li Assignee: Chengxiang Li Labels: hive This is a requirement from Hive on Spark, mapPartitionsWithContext only exists in Spark Scala API, we expect to access from Spark Java API. For HIVE-7627, HIVE-7843, Hive operators which are invoked in mapPartitions closure need to get taskId. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2741) Publish version of spark assembly which does not contain Hive
[ https://issues.apache.org/jira/browse/SPARK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14078813#comment-14078813 ] Xuefu Zhang commented on SPARK-2741: cc: [~rxin], [~sandyr] Publish version of spark assembly which does not contain Hive --- Key: SPARK-2741 URL: https://issues.apache.org/jira/browse/SPARK-2741 Project: Spark Issue Type: Task Reporter: Brock Noland The current spark assembly contains Hive. This conflicts with Hive + Spark, which is attempting to use its own version of Hive. We'll need to publish a version of the assembly which does not contain the Hive jars. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2741) Publish version of spark assembly which does not contain Hive
[ https://issues.apache.org/jira/browse/SPARK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14078835#comment-14078835 ] Xuefu Zhang commented on SPARK-2741: I did see a profile for Hive. However, it seems that the spark-assembly.jar coming with the Spark install was built with this profile enabled. The question is: where can a user of Hive on Spark get a spark-assembly without the Hive classes? Is it our vision that the user has to build the jar himself/herself? Publish version of spark assembly which does not contain Hive --- Key: SPARK-2741 URL: https://issues.apache.org/jira/browse/SPARK-2741 Project: Spark Issue Type: Task Reporter: Brock Noland The current spark assembly contains Hive. This conflicts with Hive + Spark, which is attempting to use its own version of Hive. We'll need to publish a version of the assembly which does not contain the Hive jars. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2688) Need a way to run multiple data pipeline concurrently
Xuefu Zhang created SPARK-2688: -- Summary: Need a way to run multiple data pipeline concurrently Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
         | -> rdd4
         | -> rdd5
         \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. As far as I know, it's possible in Tez. This is required for Hive to support multi-insert queries (HIVE-7292). -- This message was sent by Atlassian JIRA (v6.2#6252)
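The recomputation problem described in the issue can be illustrated without Spark. In this plain-Java sketch (all names are illustrative), several downstream consumers pull the same intermediate dataset through a memoized supplier, which plays roughly the role rdd2.cache() would: the intermediate is built once no matter how many branches consume it, whereas calling the raw builder per branch would rebuild it each time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class ReuseSketch {
    public static int buildCount = 0;

    // Stands in for the rdd1 -> rdd2 computation.
    public static List<Integer> buildRdd2() {
        buildCount++;
        return Arrays.asList(1, 2, 3);
    }

    // Cache the first result so later consumers reuse it instead of recomputing.
    public static Supplier<List<Integer>> memoize(Supplier<List<Integer>> s) {
        return new Supplier<List<Integer>>() {
            private List<Integer> cached;
            public List<Integer> get() {
                if (cached == null) cached = s.get();   // compute at most once
                return cached;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<List<Integer>> rdd2 = memoize(ReuseSketch::buildRdd2);
        rdd2.get();   // rdd3's pipeline pulls rdd2
        rdd2.get();   // rdd4's pipeline pulls rdd2 again
        rdd2.get();   // rdd5's pipeline pulls rdd2 again
        System.out.println(buildCount);   // built once
    }
}
```

In Spark terms this is what rdd2.cache() plus submitting the downstream jobs achieves; the JIRA asks for a way to trigger the whole graph in one shot so the reuse happens without the caller orchestrating it.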
[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently
[ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074368#comment-14074368 ] Xuefu Zhang commented on SPARK-2688: cc: [~rxin] [~sandyr] Need a way to run multiple data pipeline concurrently - Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          |  -> rdd4
          |  -> rdd5
           \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. As far as I know, it's possible in Tez. This is required for Hive to support multi-insert queries. HIVE-7292. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (SPARK-2688) Need a way to run multiple data pipeline concurrently
[ https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated SPARK-2688: --- Description: Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          |  -> rdd4
          |  -> rdd5
           \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. As far as I know, it's possible in Tez. This is required for Hive to support multi-insert queries. HIVE-7292. was: Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          |  -> rdd4
          |  -> rdd5
           \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. As far as I know, it's possible in Tez. This is required for Hive to support multi-insert queries. HIVE-7292. Need a way to run multiple data pipeline concurrently - Key: SPARK-2688 URL: https://issues.apache.org/jira/browse/SPARK-2688 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.1 Reporter: Xuefu Zhang Suppose we want to do the following data processing:
{code}
rdd1 -> rdd2 -> rdd3
          |  -> rdd4
          |  -> rdd5
           \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an intermediate rdd2. We use foreach(fn) with a dummy function to trigger the execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be recomputed. This is very inefficient. Ideally, we should be able to trigger the execution of the whole graph and reuse rdd2, but there doesn't seem to be a way of doing so. As far as I know, it's possible in Tez. This is required for Hive to support multi-insert queries. HIVE-7292. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14072778#comment-14072778 ] Xuefu Zhang commented on SPARK-2420: Is shading guava in the Spark build a reasonable compromise? We tested it, and it worked. All we need is this in Spark's assembly pom.xml:
{code}
<relocations>
  <relocation>
    <pattern>com.google.common</pattern>
    <shadedPattern>spark.com.google.common</shadedPattern>
  </relocation>
</relocations>
{code}
We'd really like to get this resolved and move on. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
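For context, a relocation like the one in the comment above would sit inside the maven-shade-plugin configuration of the assembly module. The fragment below is only a sketch of where it fits; the surrounding plugin layout is an assumption, not Spark's actual pom.xml:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Rewrite guava's package so Spark's bundled copy cannot clash
             with the guava 11 that Hive/Hadoop put on the classpath. -->
        <pattern>com.google.common</pattern>
        <shadedPattern>spark.com.google.common</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

During packaging, the shade plugin rewrites bytecode references from com.google.common to spark.com.google.common inside the assembly jar, so a downstream application's own guava version is left untouched.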
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066600#comment-14066600 ] Xuefu Zhang commented on SPARK-2420: {quote} 2. For jetty, it was a problem with the Hive on Spark POC, possibly because we shipped all libraries from the Hive process's classpath to the Spark cluster. We have a task (HIVE-7371) to identify a minimum set of jars to be shipped. With that, the story might change. We will confirm whether jetty is a problem once we have a better idea on HIVE-7371. {quote} Even better news: with the latest work in HIVE-7292, we found that servlet-api/jetty no longer seems to be a problem. Thus, the only remaining conflict is guava, for which HIVE-7387 has all the details. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14063998#comment-14063998 ] Xuefu Zhang commented on SPARK-2420: Thanks for your comments, [~srowen]. I mostly agree with your assessment. 1. Guava is indeed a problem, and the problem is confirmed. The solution also seems simple: if it can be set to 11 in Spark, it solves the problem we have. 2. For jetty, it was a problem with the Hive on Spark POC, possibly because we shipped all libraries from the Hive process's classpath to the Spark cluster. We have a task (HIVE-7371) to identify a minimum set of jars to be shipped. With that, the story might change. We will confirm whether jetty is a problem once we have a better idea on HIVE-7371. Also, we will check if the problem, if it exists, can be fixed on the Hive side first. If not, we'd like to get help from Spark and also provide a reason why. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14064168#comment-14064168 ] Xuefu Zhang commented on SPARK-2420: As to the guava conflict, HIVE-7387 has more details and analysis. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14063049#comment-14063049 ] Xuefu Zhang commented on SPARK-2420: [~rxin] As pointed out above, Hive and the Hadoop components it depends on are all on guava 11; unless Spark shades it or downgrades to a common version, I don't see an easy workaround. The same story applies to servlet-api/jetty. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts
[ https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14058757#comment-14058757 ] Xuefu Zhang commented on SPARK-2420: [~rxin] Is it possible to downgrade or shade guava to 11 in Spark? It doesn't seem feasible to upgrade guava across all Hadoop components. This problem currently blocks HIVE-7292, the Spark and Hive integration. Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang Attachments: spark_1.0.0.patch During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2420) Change Spark build to minimize library conflicts
Xuefu Zhang created SPARK-2420: -- Summary: Change Spark build to minimize library conflicts Key: SPARK-2420 URL: https://issues.apache.org/jira/browse/SPARK-2420 Project: Spark Issue Type: Wish Components: Build Affects Versions: 1.0.0 Reporter: Xuefu Zhang During the prototyping of HIVE-7292, many library conflicts showed up because the Spark build contains versions of libraries that are vastly different from the current major Hadoop version. It would be nice if we could choose versions in line with Hadoop's or shade them in the assembly. Here is the wish list: 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1. 2. Shade Spark's jetty and servlet dependencies in the assembly. 3. The guava version difference. Spark is using a higher version. I'm not sure what's the best solution for this. The list may grow as HIVE-7292 proceeds. For information only, the attached is a patch that we applied on Spark in order to make Spark work with Hive. It gives an idea of the scope of the changes. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2421) Spark should treat writable as serializable for keys
[ https://issues.apache.org/jira/browse/SPARK-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056677#comment-14056677 ] Xuefu Zhang commented on SPARK-2421: CC [~rxin], [~hshreedharan] Spark should treat writable as serializable for keys Key: SPARK-2421 URL: https://issues.apache.org/jira/browse/SPARK-2421 Project: Spark Issue Type: Improvement Components: Input/Output, Java API Affects Versions: 1.0.0 Reporter: Xuefu Zhang It seems that Spark requires that keys be serializable (the class implements the Serializable interface). In the Hadoop world, the Writable interface is used for the same purpose. A lot of existing classes, while writable, are not considered by Spark as serializable. It would be nice if Spark could treat Writable as serializable and automatically serialize and deserialize these classes using the Writable interface. This was identified in HIVE-7279, but its benefits would be global. -- This message was sent by Atlassian JIRA (v6.2#6252)
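As a sketch of how such support could look, the wrapper below lets a Writable-style object pass through Java serialization by delegating to write()/readFields(). MiniWritable here is a hypothetical stand-in for org.apache.hadoop.io.Writable (it has the same two methods) so the example is self-contained without Hadoop on the classpath; Spark's actual handling, if added, could well differ:

```java
import java.io.*;

// Stand-in for org.apache.hadoop.io.Writable (hypothetical, for a
// self-contained example): custom binary read/write, no Serializable.
interface MiniWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// An example key type that is Writable but NOT Serializable.
class IntKey implements MiniWritable {
    int value;
    IntKey() {}                       // no-arg constructor for readFields()
    IntKey(int value) { this.value = value; }
    public void write(DataOutput out) throws IOException { out.writeInt(value); }
    public void readFields(DataInput in) throws IOException { value = in.readInt(); }
}

// The wrapper: Java serialization delegates to the Writable protocol.
public class WritableWrapper<T extends MiniWritable> implements Serializable {
    transient T writable;             // not Serializable, so kept transient
    private final Class<T> cls;       // needed to re-instantiate on read

    public WritableWrapper(T writable, Class<T> cls) {
        this.writable = writable;
        this.cls = cls;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        writable.write(out);          // the Writable writes its own bytes
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        try {
            writable = cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IOException("cannot instantiate " + cls, e);
        }
        writable.readFields(in);      // rebuild from the Writable bytes
    }

    // Round-trip helper: Java-serialize the wrapper, then deserialize it.
    public static int roundTrip(int v) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(new WritableWrapper<>(new IntKey(v), IntKey.class));
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            WritableWrapper<IntKey> back = (WritableWrapper<IntKey>) ois.readObject();
            return back.writable.value;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(42));
    }
}
```

The key design point is that the wrapper, not the wrapped class, implements Serializable, so existing Writable key types need no changes.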