[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.

2018-02-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374841#comment-16374841
 ] 

Xuefu Zhang commented on SPARK-5377:


[~shay_elbaz] I think the issue was closed purely because no one was working on 
it, based on my private communication with [~sowen]. However, it can surely be 
reopened if someone would like to work on it.

> Dynamically add jar into Spark Driver's classpath.
> --
>
> Key: SPARK-5377
> URL: https://issues.apache.org/jira/browse/SPARK-5377
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Chengxiang Li
>Priority: Major
>
> Spark supports dynamically adding a jar to the executor classpath through 
> SparkContext::addJar(), but it does not support dynamically adding a jar to 
> the driver classpath. In most cases (if not all), a user dynamically adds a 
> jar with SparkContext::addJar() because classes from that jar will be 
> referenced in an upcoming Spark job, which means the classes also need to be 
> loadable on the Spark driver side, e.g. during serialization. I think it 
> makes sense to add an API for adding a jar to the driver classpath, or simply 
> to make SparkContext::addJar() do this as well. HIVE-9410 is a real case from 
> Hive on Spark.
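For illustration, here is a minimal driver-side sketch of the kind of behavior being requested. This is not Spark's API; the helper below is hypothetical and simply wraps the driver's context classloader in a URLClassLoader that also sees the new jar, one possible workaround until an official API exists.
{code}
import java.net.{URL, URLClassLoader}

// Hypothetical helper (not part of Spark): make a jar visible to code running
// on the driver by installing a classloader that includes it.
object DriverClasspathUtil {
  def addJarToDriver(jarPath: String): Unit = {
    val jarUrl: URL = new java.io.File(jarPath).toURI.toURL
    val parent = Thread.currentThread().getContextClassLoader
    // The new loader delegates to the existing one and adds the jar's classes.
    Thread.currentThread().setContextClassLoader(new URLClassLoader(Array(jarUrl), parent))
  }
}

// Assumed usage: install the jar on the driver before any of its classes are
// referenced (e.g. before closures using them are serialized), and still call
// sc.addJar(...) so that executors can fetch the same jar.
{code}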






[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-02-09 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358759#comment-16358759
 ] 

Xuefu Zhang commented on SPARK-22683:
-

On a side note, besides the configuration name, which is subject to change, I 
think (as mentioned previously) that the value doesn't have to be an integer, 
to allow finer control.

> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> 
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>Priority: Major
>  Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, 
> I've noticed almost a doubling (+114% exactly) of resource consumption of 
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context: 
> - resource usage stands for vcore-hours allocation for the whole job, as seen 
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way 
> to define experiments (via UI / DSL) that automatically get translated to 
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a 
> lot between jobs, and as such finding an efficient number of executors for 
> each job is difficult to get right, which is the reason I took the path of 
> dynamic allocation.  
> - Some of the tests have been scheduled on an idle queue, some on a full 
> queue.
> - experiments have been conducted with spark.executor.cores = 5 and 10; only 
> results for 5 cores have been reported because efficiency was overall better 
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs 
> (about 600 jobs) ranging from tens to thousands of splits in the data 
> partitioning and between 400 and 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s;
>  
> Definition: 
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough 
> containers to have one taskSlot per task, which minimizes latency, but wastes 
> resources when tasks are small relative to the executor allocation and idling 
> overhead. 
> The results using the proposal (described below) over the job sample (600 
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
> resource usage, for a 37% (against 43%) reduction in wall clock time for 
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6 
> tasks per core, with a 30% resource usage reduction, for a similar wall clock 
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few 
> points mentioned in the comments) ?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for 
> each job (tens to thousands of splits can occur), and would essentially 
> remove the benefit of using dynamic allocation.
> - use dynamicAllocation.backlogTimeout: 
> - setting this parameter right to avoid creating unused executors is very 
> dependent on wall clock time. One basically needs to solve the exponential 
> ramp up for the target time. So this is not an option for my use case where I 
> don't want per-job tuning. 
> - I've still done a series of experiments, details in the comments. 
> Result is that after manual tuning, the best I could get was a similar 
> resource consumption at the expense of 20% more wall clock time, or a similar 
> wall clock time at the expense of 60% more resource consumption than what I 
> got using my proposal @ 6 tasks per slot (this value being optimized over a 
> much larger range of jobs as already stated)
> - as mentioned in another comment, tampering with the exponential ramp up 
> might yield task imbalance, and old executors could become contention points 
> for other executors trying to remotely access blocks on those old executors 
> (not witnessed in the jobs I'm talking about, but we did see this behavior in 
> other jobs)
> Proposal: 
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to 
> specify how many tasks a single taskSlot should ideally execute to mitigate 
> the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881
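The arithmetic behind the proposal can be sketched as follows (an illustration only; the function and parameter names are made up here, not actual Spark configuration keys or internals):
{code}
// With tasksPerSlot = 1 this reduces to the current behaviour:
// one task slot allocated per pending task.
def targetExecutors(pendingTasks: Int,
                    executorCores: Int,   // spark.executor.cores
                    taskCpus: Int,        // spark.task.cpus
                    tasksPerSlot: Int): Int = {
  val slotsPerExecutor = executorCores / taskCpus
  math.ceil(pendingTasks.toDouble / (slotsPerExecutor * tasksPerSlot)).toInt
}

// Example with the settings above (5 cores per executor, 1 cpu per task):
// 600 pending tasks and 6 tasks per slot -> 20 executors instead of 120.
{code}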




[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2018-02-07 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355634#comment-16355634
 ] 

Xuefu Zhang commented on SPARK-22683:
-

+1 on the idea of including this. Also, +1 on renaming the config. Thanks.

> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> 
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>Priority: Major
>  Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, 
> I've noticed almost a doubling (+114% exactly) of resource consumption of 
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context: 
> - resource usage stands for vcore-hours allocation for the whole job, as seen 
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way 
> to define experiments (via UI / DSL) that automatically get translated to 
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a 
> lot between jobs, and as such finding an efficient number of executors for 
> each job is difficult to get right, which is the reason I took the path of 
> dynamic allocation.  
> - Some of the tests have been scheduled on an idle queue, some on a full 
> queue.
> - experiments have been conducted with spark.executor.cores = 5 and 10; only 
> results for 5 cores have been reported because efficiency was overall better 
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs 
> (about 600 jobs) ranging from tens to thousands of splits in the data 
> partitioning and between 400 and 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s;
>  
> Definition: 
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough 
> containers to have one taskSlot per task, which minimizes latency, but wastes 
> resources when tasks are small relative to the executor allocation and idling 
> overhead. 
> The results using the proposal (described below) over the job sample (600 
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
> resource usage, for a 37% (against 43%) reduction in wall clock time for 
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6 
> tasks per core, with a 30% resource usage reduction, for a similar wall clock 
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few 
> points mentioned in the comments) ?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for 
> each job (tens to thousands of splits can occur), and would essentially 
> remove the benefit of using dynamic allocation.
> - use dynamicAllocation.backlogTimeout: 
> - setting this parameter right to avoid creating unused executors is very 
> dependent on wall clock time. One basically needs to solve the exponential 
> ramp up for the target time. So this is not an option for my use case where I 
> don't want per-job tuning. 
> - I've still done a series of experiments, details in the comments. 
> Result is that after manual tuning, the best I could get was a similar 
> resource consumption at the expense of 20% more wall clock time, or a similar 
> wall clock time at the expense of 60% more resource consumption than what I 
> got using my proposal @ 6 tasks per slot (this value being optimized over a 
> much larger range of jobs as already stated)
> - as mentioned in another comment, tampering with the exponential ramp up 
> might yield task imbalance, and old executors could become contention points 
> for other executors trying to remotely access blocks on those old executors 
> (not witnessed in the jobs I'm talking about, but we did see this behavior in 
> other jobs)
> Proposal: 
> Simply add a tasksPerExecutorSlot parameter, which makes it possible to 
> specify how many tasks a single taskSlot should ideally execute to mitigate 
> the overhead of executor allocation.
> PR: https://github.com/apache/spark/pull/19881






[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300926#comment-16300926
 ] 

Xuefu Zhang edited comment on SPARK-22765 at 12/22/17 4:43 AM:
---

Did some benchmarking with a set of 20 queries on upfront allocation against 
exponential ramp-up. No clear trend is seen: upfront allocation offers better 
efficiency for some of the queries, similar efficiency for others, and worse 
for the rest. These variations might just be noise, which is abundant in our 
production cluster. Thus, I tend to agree that upfront allocation offers 
limited benefit for efficiency, if any. (On the other hand, it seems to benefit 
performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily 
pick a core that's available in an executor that's running other tasks. I 
speculate that efficiency improves if busy executors are favored for a new task 
so that other idle executors can idle out. (To be tested out.)

Making idleTime=0 valid is a good thing to have. I created a separate ticket 
(SPARK-22870) for that.



was (Author: xuefuz):
Did some benchmarking with a set of 20 queries on upfront allocation against 
exponential ramp-up. No clear trend is seen: upfront allocation offers better 
efficiency for some of the queries, similar efficiency for others, and worse 
for the rest. These variations might just be noise, which is abundant in our 
production cluster. Thus, I tend to agree that upfront allocation offers 
limited benefit for efficiency, if any. (On the other hand, it seems to benefit 
performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily 
pick a core that's available in an executor that's running other tasks. I 
speculate that efficiency improves if busy executors are favored for a new task 
so that other idle executors can idle out. (To be tested out.)

Making idleTime=0 valid is a good thing to have. I will create a separate 
ticket for that.


> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Created] (SPARK-22870) Dynamic allocation should allow 0 idle time

2017-12-21 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-22870:
---

 Summary: Dynamic allocation should allow 0 idle time
 Key: SPARK-22870
 URL: https://issues.apache.org/jira/browse/SPARK-22870
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.6.0
Reporter: Xuefu Zhang
Priority: Minor


As discussed in SPARK-22765, with SPARK-21656, an executor will not idle out 
when there are pending tasks to run. When there is no task to run, an executor 
will die out after {{spark.dynamicAllocation.executorIdleTimeout}}, which is 
currently required to be greater than zero. However, for efficiency, a user 
should be able to specify that an executor can die out immediately w/o being 
required to be idle for at least 1s.

This is to make {{0}} a valid value for 
{{spark.dynamicAllocation.executorIdleTimeout}}, and special handling of such a 
case might be needed.
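For illustration, a configuration sketch of what this would look like once {{0}} is accepted (an assumption, not current behavior; today the value must remain greater than zero, so {{1s}} is the smallest setting that works):
{code}
import org.apache.spark.SparkConf

// Sketch: dynamic allocation with an immediate idle-out once 0 becomes valid.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")              // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.executorIdleTimeout", "0s")  // proposed; currently must be > 0
{code}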






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300926#comment-16300926
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Did some benchmarking with a set of 20 queries on upfront allocation against 
exponential ramp-up. No clear trend is seen: upfront allocation offers better 
efficiency for some of the queries, similar efficiency for others, and worse 
for the rest. These variations might just be noise, which is abundant in our 
production cluster. Thus, I tend to agree that upfront allocation offers 
limited benefit for efficiency, if any. (On the other hand, it seems to benefit 
performance somewhat.)

I also noticed that when the scheduler schedules a task, it doesn't necessarily 
pick a core that's available in an executor that's running other tasks. I 
speculate that efficiency improves if busy executors are favored for a new task 
so that other idle executors can idle out. (To be tested out.)

Making idleTime=0 valid is a good thing to have. I will create a separate 
ticket for that.


> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-20 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16298634#comment-16298634
 ] 

Xuefu Zhang commented on SPARK-22765:
-

bq. at least based on this one experiment up front allocation didn't help.

I'm not sure how the conclusion is drawn here, but with #5, which differs from 
#3 only by the additional upfront allocation, we see a resource usage decrease 
from 2X to 1.4X. This doesn't seem to support that claim. I will do more 
testing on this to make my observation more evident.

For idle=0, I think it's a valid case and should be helpful as well.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297914#comment-16297914
 ] 

Xuefu Zhang edited comment on SPARK-22765 at 12/20/17 5:45 AM:
---

Alright, I tested upfront allocation and its combinations with other 
improvement ideas and here is what I found:

1. the query: just one query, fairly complicated, and represented by a main 
Spark job like this:
{code}
Status: Running (Hive on Spark job[4])
--------------------------------------------------------------------------------
  STAGES         ATTEMPT  STATUS      TOTAL  COMPLETED  RUNNING  PENDING  FAILED
--------------------------------------------------------------------------------
Stage-10 ...           0  FINISHED      339        339        0        0       0
Stage-11 ...           0  FINISHED      201        201        0        0       0
Stage-12 ...           0  FINISHED      191        191        0        0       0
Stage-13 ...           0  FINISHED      178        178        0        0       0
Stage-14 ...           0  FINISHED      115        115        0        0       0
Stage-15 ...           0  FINISHED      105        105        0        0       0
Stage-16 ...           0  FINISHED      592        592        0        0       0
Stage-17 ...           0  FINISHED      191        191        0        0       0
Stage-4                0  FINISHED      178        178        0        0       0
Stage-5                0  FINISHED      115        115        0        0       0
Stage-6                0  FINISHED      105        105        0        0       0
Stage-7                0  FINISHED      339        339        0        0       0
Stage-8                0  FINISHED      201        201        0        0       0
Stage-9                0  FINISHED      191        191        0        0       0
--------------------------------------------------------------------------------
{code}
2. Without any improvement, default 60s idleTime, Spark uses more than 3X 
resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X 
resources.
4. Same as #3, but with idleTime=1s, Spark uses 1.4X resources.
5. Same as #3, but with additional upfront allocation, Spark also uses 1.4X 
resources.
6. Same as #5, but with the additional improvement in SPARK-22683 (factor = 2), 
Spark uses 1.2X resources.
7. Same as #6 but with factor=3, Spark uses 0.8X resources.

While this is just for one query, far from being conclusive, we can really see 
how much those considerations might impact efficiency. I'm sure the mileage 
varies, but this at least shows there is a lot of room for Spark to improve 
resource utilization efficiency w.r.t. scheduling.


was (Author: xuefuz):
Alright, I tested upfront allocation and its combinations with other 
improvement ideas and here is what I found:

1. the query: just one query, fairly complicated, represented by one of the 
main Spark jobs:
{code}
Status: Running (Hive on Spark job[4])
--------------------------------------------------------------------------------
  STAGES         ATTEMPT  STATUS      TOTAL  COMPLETED  RUNNING  PENDING  FAILED
--------------------------------------------------------------------------------
Stage-10 ...           0  FINISHED      339        339        0        0       0
Stage-11 ...           0  FINISHED      201        201        0        0       0
Stage-12 ...           0  FINISHED      191        191        0        0       0
Stage-13 ...           0  FINISHED      178        178        0        0       0
Stage-14 ...           0  FINISHED      115        115        0        0       0
Stage-15 ...           0  FINISHED      105        105        0        0       0
Stage-16 ...           0  FINISHED      592        592        0        0       0
Stage-17 ...           0  FINISHED      191        191        0        0       0
Stage-4                0  FINISHED      178        178        0        0       0
Stage-5                0  FINISHED      115        115        0        0       0
Stage-6                0  FINISHED      105        105        0        0       0
Stage-7                0  FINISHED      339        339        0        0       0
Stage-8                0  FINISHED      201        201        0        0       0
Stage-9                0  FINISHED      191        191        0        0       0
--------------------------------------------------------------------------------
{code}
2. Without any improvement, default 60s idleTime, Spark uses more than 3X 
resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X 
resources.
4. Same as #3, but with 

[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297914#comment-16297914
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Alright, I tested upfront allocation and its combinations with other 
improvement ideas and here is what I found:

1. the query: just one query, fairly complicated, represented by one of the 
main Spark jobs:
{code}
Status: Running (Hive on Spark job[4])
--------------------------------------------------------------------------------
  STAGES         ATTEMPT  STATUS      TOTAL  COMPLETED  RUNNING  PENDING  FAILED
--------------------------------------------------------------------------------
Stage-10 ...           0  FINISHED      339        339        0        0       0
Stage-11 ...           0  FINISHED      201        201        0        0       0
Stage-12 ...           0  FINISHED      191        191        0        0       0
Stage-13 ...           0  FINISHED      178        178        0        0       0
Stage-14 ...           0  FINISHED      115        115        0        0       0
Stage-15 ...           0  FINISHED      105        105        0        0       0
Stage-16 ...           0  FINISHED      592        592        0        0       0
Stage-17 ...           0  FINISHED      191        191        0        0       0
Stage-4                0  FINISHED      178        178        0        0       0
Stage-5                0  FINISHED      115        115        0        0       0
Stage-6                0  FINISHED      105        105        0        0       0
Stage-7                0  FINISHED      339        339        0        0       0
Stage-8                0  FINISHED      201        201        0        0       0
Stage-9                0  FINISHED      191        191        0        0       0
--------------------------------------------------------------------------------
{code}
2. Without any improvement, default 60s idleTime, Spark uses more than 3X 
resources compared to MR.
3. With idleTime=5s and the improvement in SPARK-21656, Spark uses about 2X 
resources.
4. Same as #3, but with idleTime=1s, Spark uses 1.4X resources.
5. Same as #3, but with additional upfront allocation, Spark also uses 1.4X 
resources.
6. Same as #4, but with the additional improvement in SPARK-22683 (factor = 2), 
Spark uses 1.2X resources.
7. Same as #6 but with factor=3, Spark uses 0.8X resources.

While this is just for one query, far from being conclusive, we can really see 
how much those considerations might impact efficiency. I'm sure the mileage 
varies, but this at least shows there is a lot of room for Spark to improve 
resource utilization efficiency w.r.t. scheduling.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297645#comment-16297645
 ] 

Xuefu Zhang commented on SPARK-22765:
-

I haven't had a chance to try upfront allocation yet. I tried one query (runs 
for a couple of minutes) with a 1s idle time. The resource usage is further cut 
down by as much as half, very close to that of MR in this case.

I think we should allow a 0s idle time, if only for completeness.

I will try upfront allocation and update.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297529#comment-16297529
 ] 

Xuefu Zhang commented on SPARK-22765:
-

{quote}
SPARK-21656 and the dynamic allocation should handle that, the target number in 
the dynamic allocation manager is supposed to be based on all running stages 
for pending and running tasks. Are you saying that is not true?
{quote}
I verified and it does seem that SPARK-21656 covers concurrent stages. My job 
ran too fast, so I had to limit maxExecutors to a smaller number to observe 
more closely. It's no issue, after all. This is great!

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297346#comment-16297346
 ] 

Xuefu Zhang commented on SPARK-22765:
-

I'm not 100% positive, but that seems to be what I saw. I will verify further.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297316#comment-16297316
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Actually, I meant across parallel stages (those connected by a union 
transformation), not serial stages, which I care much less about. The following 
is what I meant (in a Hive on Spark context):
{code}
Status: Running (Hive on Spark job[4])
--------------------------------------------------------------------------------
  STAGES         ATTEMPT  STATUS      TOTAL  COMPLETED  RUNNING  PENDING  FAILED
--------------------------------------------------------------------------------
Stage-10 ..            0  RUNNING       340        126      214        0       0
Stage-11               0  PENDING       201          0        0      201       0
Stage-12               0  PENDING       191          0        0      191       0
Stage-13 .             0  RUNNING       178         33      145        0       0
Stage-14               0  PENDING       115          0        0      115       0
Stage-15               0  PENDING       105          0        0      105       0
Stage-16               0  PENDING       592          0        0      592       0
Stage-17               0  PENDING       191          0        0      191       0
Stage-4  ...           0  RUNNING       178        157       21        0       4
Stage-5                0  PENDING       115          0        0      115       0
Stage-6                0  PENDING       105          0        0      105       0
Stage-7  .             0  RUNNING       340        232      108        0       0
Stage-8                0  PENDING       201          0        0      201       0
Stage-9                0  PENDING       191          0        0      191       0
{code}

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-19 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297276#comment-16297276
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Hi [~CodingCat], yes, we adapted both #1 and #2, but we still needed 60s as a 
minimum w/o SPARK-21656.

Hi @Tomas Graves, idle=1 seems to work, but idle=0 seems illegal:
{code}
org.apache.spark.SparkException: spark.dynamicAllocation.executorIdleTimeout 
must be > 0!
{code}

Now that I think about it, idle=0 should be permitted, because a user might not 
want any idle executors at all.

{quote}
with SPARK-21656 executors shouldn't timeout unless you are between stages
{quote}
This seems to function correctly. However, I think it needs to be extended 
across concurrent stages. Based on what I'm seeing, executors are not reused 
across such stages; an executor that worked for a completed stage should be 
reused if there are other running stages that have pending tasks.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-18 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295814#comment-16295814
 ] 

Xuefu Zhang commented on SPARK-22765:
-

As an update, I managed to backport SPARK-21656, among a few others, into our 
codebase and measured the efficiency improvement with a smaller idle time (from 
60s down to 5s). Our tests show that the efficiency gain is significant (a 
consistent 2X) for small jobs, especially those with many stages. For large, 
long-running jobs, the gain is less significant (about 10% better than with 
60s).

I'd like to point out that even with a 2X improvement in efficiency, Spark is 
still behind for small jobs. With a 60s idle time, MR uses only about 35% of 
the resources used by Spark. With 5s, MR uses about 70% of that used by Spark. 
I suspect that the additional overhead comes from: 1. exponential ramp-up 
allocation; 2. bigger containers (I have 4 cores per container).

The desired allocation scheme optimized for efficiency now seems clearer to me:

1. Upfront allocation instead of exponential ramp-up
2. Zero idle time (reuse containers if there are pending tasks, or kill them 
right away if there are none)
3. Optimizations for smaller containers (like 1 core per container).

In combination with the executor conservation factor from SPARK-22683, the new 
scheme, which diverges widely from dynamic allocation, should offer better 
resource efficiency.
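For reference, a rough approximation of points 1-3 using only settings that already exist today (a sketch with example values, not the proposed new scheduling scheme itself):
{code}
import org.apache.spark.SparkConf

// Approximate "upfront" allocation by starting at the maximum, keep executors
// only while they have work, and use small (1-core) containers.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "0")
  .set("spark.dynamicAllocation.initialExecutors", "200")    // example value, sized to the job
  .set("spark.dynamicAllocation.maxExecutors", "200")
  .set("spark.dynamicAllocation.executorIdleTimeout", "1s")  // 0s is not yet allowed (see SPARK-22870)
  .set("spark.executor.cores", "1")
  .set("spark.task.cpus", "1")
{code}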

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-18 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295770#comment-16295770
 ] 

Xuefu Zhang commented on SPARK-22683:
-

I did some tests with the proposed change here and saw general efficiency 
improvement with increasing tasksPerSlot. A number between 2 and 4 usually 
gives the best trade off between performance and efficiency because the 
efficiency gain is big while the performance impact is small. As the number 
gets larger, the efficiency return diminishes while latency degradation becomes 
unreasonable.

The resource-saving effect here works best when the tasks are small. In that 
case, tasks finish quickly, so each task doesn't need to run on a different 
core. Because the tasks are small, the latency impact is also limited. On the 
other hand, if the tasks are long-running, keeping them waiting will increase 
latency while the resource saving is limited.

Based on what I observed, this knob, whatever it is called, is practically 
useful for efficiency-conscious users. It doesn't seem to hurt efficiency in 
any way. For users that care about latency or don't care about cost, the 
default value will do. Thus, I favor the inclusion of such a knob.

Nevertheless, I do agree that this knob is a little bit confusing as it is, 
because the concept of an execution slot doesn't exist in Spark. I think it 
might be better to define it as a factor relating the number of executors to be 
requested to the number of executors required to run all waiting tasks. So, a 
name like {{spark.executor.conservation.factor}} might do. And it doesn't have 
to be an integer.
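To make the suggested semantics concrete, here is a small sketch of the arithmetic (names are illustrative; no such configuration exists in Spark today):
{code}
// Request only 1/factor of the executors needed to give every waiting task its
// own slot. factor = 1.0 reproduces the current behaviour, and the value does
// not need to be an integer.
def conservativeTarget(waitingTasks: Int,
                       executorCores: Int,  // spark.executor.cores
                       taskCpus: Int,       // spark.task.cpus
                       factor: Double): Int = {
  val slotsPerExecutor = executorCores / taskCpus
  val fullTarget = math.ceil(waitingTasks.toDouble / slotsPerExecutor)
  math.max(1, math.ceil(fullTarget / factor).toInt)
}

// Example: 600 waiting tasks, 5-core executors, 1 cpu per task, factor = 2.5
// -> 48 executors requested instead of 120.
{code}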

With this included, I think the next thing to look at is the exponential 
ramp-up in scheduling. I'd think up-front full allocation, in combination with 
the conservation factor here, would have a big impact on resource efficiency.

> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> 
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>  Labels: pull-request-available
>
> While migrating a series of jobs from MR to Spark using dynamicAllocation, 
> I've noticed almost a doubling (+114% exactly) of resource consumption of 
> Spark w.r.t MR, for a wall clock time gain of 43%
> About the context: 
> - resource usage stands for vcore-hours allocation for the whole job, as seen 
> by YARN
> - I'm talking about a series of jobs because we provide our users with a way 
> to define experiments (via UI / DSL) that automatically get translated to 
> Spark / MR jobs and submitted on the cluster
> - we submit around 500 such jobs each day
> - these jobs are usually one shot, and the amount of processing can vary a 
> lot between jobs, and as such finding an efficient number of executors for 
> each job is difficult to get right, which is the reason I took the path of 
> dynamic allocation.  
> - Some of the tests have been scheduled on an idle queue, some on a full 
> queue.
> - experiments have been conducted with spark.executor.cores = 5 and 10; only 
> results for 5 cores have been reported because efficiency was overall better 
> than with 10 cores
> - the figures I give are averaged over a representative sample of those jobs 
> (about 600 jobs) ranging from tens to thousands of splits in the data 
> partitioning and between 400 and 9000 seconds of wall clock time.
> - executor idle timeout is set to 30s;
>  
> Definition: 
> - let's say an executor has spark.executor.cores / spark.task.cpus taskSlots, 
> which represent the max number of tasks an executor will process in parallel.
> - the current behaviour of the dynamic allocation is to allocate enough 
> containers to have one taskSlot per task, which minimizes latency, but wastes 
> resources when tasks are small relative to the executor allocation and idling 
> overhead. 
> The results using the proposal (described below) over the job sample (600 
> jobs):
> - by using 2 tasks per taskSlot, we get a 5% (against -114%) reduction in 
> resource usage, for a 37% (against 43%) reduction in wall clock time for 
> Spark w.r.t MR
> - by trying to minimize the average resource consumption, I ended up with 6 
> tasks per core, with a 30% resource usage reduction, for a similar wall clock 
> time w.r.t. MR
> What did I try to solve the issue with existing parameters (summing up a few 
> points mentioned in the comments) ?
> - change dynamicAllocation.maxExecutors: this would need to be adapted for 
> each job (tens to thousands of splits can occur), and would essentially 
> remove the benefit of using dynamic allocation.
> - use dynamicAllocation.backlogTimeout: 

[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-14 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292048#comment-16292048
 ] 

Xuefu Zhang commented on SPARK-22765:
-

[~tgraves], I think it would help if SPARK-21656 could make a close-to-zero 
idle time work. This is one source of inefficiency. Our version is too old to 
backport the fix, but we will try this out when we upgrade.

The second source of inefficiency comes from the fact that Spark favors bigger 
containers. A 4-core container might be running one task while wasting the 
other cores/memory. The executor cannot die as long as there is one task 
running. One might argue that a user could configure 1-core containers under 
dynamic allocation, but this is probably not optimal in other respects.

The third reason one might favor MR-style scheduling is its simplicity and 
efficiency. We frequently found that, for heavy workloads, the scheduler cannot 
really keep up with the task ups and downs, especially when tasks finish fast. 

For cost-conscious users, cluster-level resource efficiency is probably what's 
looked at. My suspicion is that an enhanced MR-style scheduling scheme, simple 
and performant, will improve resource efficiency significantly over a typical 
use of dynamic allocation, without sacrificing much performance.

As a starting point, we will first benchmark with SPARK-21656 when possible.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-13 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289726#comment-16289726
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Yes, we are using Hive on Spark. Our Spark version is 1.6.1, which is old. 
Obviously it doesn't have the fix in SPARK-21656.

As commented in SPARK-22683, our comparison was made between all MR jobs vs. 
all Spark jobs (usually just 1) for individual queries.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workloads from MR to Spark find a significant 
> resource consumption hike (e.g., SPARK-22683). While this might not be a 
> concern for users that are more performance-centric, for others conscious 
> about cost, such a hike creates a migration obstacle. This situation can get 
> worse as more users move to the cloud.
> Dynamic allocation makes it possible for Spark to be deployed in a 
> multi-tenant environment. With its performance-centric design, its 
> inefficiency has also unfortunately shown up, especially when compared with 
> MR. Thus, it's believed that an MR-style scheduler still has its merit. Based 
> on our research, the inefficiency associated with dynamic allocation comes 
> from many aspects, such as executors idling out, bigger executors, and many 
> stages in a Spark job (rather than only 2 stages in MR).
> Rather than fine-tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric scheduling scheme based on that of MR. 
> Such an MR-based scheme can be further enhanced and better adapted to the 
> Spark execution model. This alternative is expected to offer a good 
> performance improvement (compared to MR) with similar or even better 
> efficiency than MR.
> Inputs are greatly welcome!






[jira] [Commented] (SPARK-22683) DynamicAllocation wastes resources by allocating containers that will barely be used

2017-12-13 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289689#comment-16289689
 ] 

Xuefu Zhang commented on SPARK-22683:
-

[~tgraves], I can speak to our use case, where the same queries run on MR vs. 
Spark via Hive. Because Spark gets rid of MR's intermediate HDFS reads/writes, 
we expected better efficiency in addition to performance gains. While our 
expectation is met for some of our queries, usually long-running ones with many 
stages, the resource usage is much worse for other queries, especially the 
short-running ones. 

I believe that efficiency can be substantially enhanced in both cases.

> DynamicAllocation wastes resources by allocating containers that will barely 
> be used
> 
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>  Labels: pull-request-available
>
> let's say an executor has spark.executor.cores / spark.task.cpus taskSlots
> The current dynamic allocation policy allocates enough executors
> to have each taskSlot execute a single task, which minimizes latency, 
> but wastes resources when tasks are small regarding executor allocation
> and idling overhead. 
> By adding the tasksPerExecutorSlot, it is made possible to specify how many 
> tasks
> a single slot should ideally execute to mitigate the overhead of executor
> allocation.
> PR: https://github.com/apache/spark/pull/19881



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-13 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289676#comment-16289676
 ] 

Xuefu Zhang edited comment on SPARK-22765 at 12/13/17 6:30 PM:
---

Hi [~tgraves], Thanks for your input.

In our busy, heavily loaded cluster environment, we have found that any idle 
timeout below 60s is a problem. 30s works for small jobs, but starts causing 
problems for bigger jobs. The symptom is that newly allocated executors are 
idled out before completing a single task! I suspect this is caused by a busy 
scheduler. As a result, we have to keep 60s as a minimum.

Having said that, I'm not against container reuse. Also, I used the word 
"enhanced" to qualify MR scheduling. Reuse is good, but in my opinion the 
speculative aspect of dynamic allocation works against efficiency. That is, 
you set an idle timeout just in case a new task arrives within that period of 
time. When that doesn't happen, you waste the executor for a full minute. 
(This is good for performance.) Please note that this happens a lot at the end 
of each stage, because no tasks from the next stage will be scheduled until 
the current stage finishes.

If we can remove that speculative aspect of the scheduling, efficiency should 
improve significantly, with some compromise on performance. That would be a 
good starting point, which is the main purpose of my proposal for an enhanced 
MR-style scheduling scheme, one that is open to many other possible improvements.
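As an illustrative aside (not part of the original discussion): a minimal 
sketch of the standard dynamic allocation settings being tuned here, with the 
60s idle timeout we ended up keeping. Values are examples, not recommendations.

{code}
import org.apache.spark.SparkConf

// Dynamic allocation with the 60s executor idle timeout discussed above.
// An external shuffle service is required for dynamic allocation on YARN.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
{code}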



was (Author: xuefuz):
Hi [~tgraves], Thanks for your input.

In our busy, heavily loaded cluster environment, we have found that any idle 
time less than 60s is a problem. 30s works for small jobs, but starts having 
problem for bigger jobs. The symptom is that newly allocated executors are 
idled out before completing a single tasks! I suspected that this is caused by 
a busy scheduler. As a result, we have to keep 60s as a minimum.

Having said that, however, I'm not against container reuse. Also, I used the 
word "enhanced" to improve on MR scheduling. Reusing is good, but in my opinion 
the speculation factor in dynamic allocation goes against efficiency. That is, 
you set an idle time just in case a new task comes within that period of time. 
When that doesn't happen, you waste your executor for 1 minute. (This is good 
for performance.) Please note that this happens a lot at the end of each stage 
because no tasks from the next stage will be scheduled until the current stage 
finishes.

If we can remove the speculation aspect of the scheduling, the efficiency 
should improve significantly with some compromise on performance. This would be 
a good start point, which is the main purpose of my proposal of an enhanced 
MR-style scheduling, which is open to many other possible improvements.


> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant 
> resource consumption hike (i.e, SPARK-22683). While this might not be a 
> concern for users that are more performance centric, for others conscious 
> about cost, such hike creates a migration obstacle. This situation can get 
> worse as more users are moving to cloud.
> Dynamic allocation make it possible for Spark to be deployed in multi-tenant 
> environment. With its performance-centric design, its inefficiency has also 
> unfortunately shown up, especially when compared with MR. Thus, it's believed 
> that MR-styled scheduler still has its merit. Based on our research, the 
> inefficiency associated with dynamic allocation comes in many aspects such as 
> executor idling out, bigger executors, many stages (rather than 2 stages only 
> in MR) in a spark job, etc.
> Rather than fine tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric  scheduling scheme based on that of MR. 
> Such a MR-based scheme can be further enhanced and be more adapted to Spark 
> execution model. This alternative is expected to offer good performance 
> improvement (compared to MR) still with similar to or even better efficiency 
> than MR.
> Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-13 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289676#comment-16289676
 ] 

Xuefu Zhang commented on SPARK-22765:
-

Hi [~tgraves], Thanks for your input.

In our busy, heavily loaded cluster environment, we have found that any idle 
timeout below 60s is a problem. 30s works for small jobs, but starts causing 
problems for bigger jobs. The symptom is that newly allocated executors are 
idled out before completing a single task! I suspect this is caused by a busy 
scheduler. As a result, we have to keep 60s as a minimum.

Having said that, I'm not against container reuse. Also, I used the word 
"enhanced" to mean improving on MR scheduling. Reuse is good, but in my opinion 
the speculative aspect of dynamic allocation works against efficiency. That is, 
you set an idle timeout just in case a new task arrives within that period of 
time. When that doesn't happen, you waste the executor for a full minute. 
(This is good for performance.) Please note that this happens a lot at the end 
of each stage, because no tasks from the next stage will be scheduled until 
the current stage finishes.

If we can remove that speculative aspect of the scheduling, efficiency should 
improve significantly, with some compromise on performance. That would be a 
good starting point, which is the main purpose of my proposal for an enhanced 
MR-style scheduling scheme, one that is open to many other possible improvements.


> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant 
> resource consumption hike (i.e, SPARK-22683). While this might not be a 
> concern for users that are more performance centric, for others conscious 
> about cost, such hike creates a migration obstacle. This situation can get 
> worse as more users are moving to cloud.
> Dynamic allocation make it possible for Spark to be deployed in multi-tenant 
> environment. With its performance-centric design, its inefficiency has also 
> unfortunately shown up, especially when compared with MR. Thus, it's believed 
> that MR-styled scheduler still has its merit. Based on our research, the 
> inefficiency associated with dynamic allocation comes in many aspects such as 
> executor idling out, bigger executors, many stages (rather than 2 stages only 
> in MR) in a spark job, etc.
> Rather than fine tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric  scheduling scheme based on that of MR. 
> Such a MR-based scheme can be further enhanced and be more adapted to Spark 
> execution model. This alternative is expected to offer good performance 
> improvement (compared to MR) still with similar to or even better efficiency 
> than MR.
> Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-12 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288414#comment-16288414
 ] 

Xuefu Zhang commented on SPARK-22765:
-

I wouldn't say that MR is static, at least not static in Spark's sense. MR 
allocates an executor for each map or reduce task, and the executor exits when 
the task completes. This avoids the inefficiency in dynamic allocation where 
executors have to slowly idle out (a 0 idle timeout doesn't really work). 
Secondly, this proposal doesn't really intend to go back to the MR paradigm. 
Instead, I propose a scheduling scheme similar to MR's but enhanced to fit 
Spark's DAG execution model.

To be clear, the proposal here is not to replace dynamic allocation. Rather, it 
provides an alternative that's more efficiency-centric than dynamic allocation. 
I understand there are a lot of details lacking, but I'd like to start a 
discussion now and hopefully something concrete will come out soon.

> Create a new executor allocation scheme based on that of MR
> ---
>
> Key: SPARK-22765
> URL: https://issues.apache.org/jira/browse/SPARK-22765
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>
> Many users migrating their workload from MR to Spark find a significant 
> resource consumption hike (i.e, SPARK-22683). While this might not be a 
> concern for users that are more performance centric, for others conscious 
> about cost, such hike creates a migration obstacle. This situation can get 
> worse as more users are moving to cloud.
> Dynamic allocation make it possible for Spark to be deployed in multi-tenant 
> environment. With its performance-centric design, its inefficiency has also 
> unfortunately shown up, especially when compared with MR. Thus, it's believed 
> that MR-styled scheduler still has its merit. Based on our research, the 
> inefficiency associated with dynamic allocation comes in many aspects such as 
> executor idling out, bigger executors, many stages (rather than 2 stages only 
> in MR) in a spark job, etc.
> Rather than fine tuning dynamic allocation for efficiency, the proposal here 
> is to add a new, efficiency-centric  scheduling scheme based on that of MR. 
> Such a MR-based scheme can be further enhanced and be more adapted to Spark 
> execution model. This alternative is expected to offer good performance 
> improvement (compared to MR) still with similar to or even better efficiency 
> than MR.
> Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22765) Create a new executor allocation scheme based on that of MR

2017-12-12 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-22765:
---

 Summary: Create a new executor allocation scheme based on that of 
MR
 Key: SPARK-22765
 URL: https://issues.apache.org/jira/browse/SPARK-22765
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.6.0
Reporter: Xuefu Zhang


Many users migrating their workload from MR to Spark find a significant 
resource consumption hike (e.g., SPARK-22683). While this might not be a 
concern for users that are more performance-centric, for others conscious 
about cost, such a hike creates a migration obstacle. This situation can get 
worse as more users move to the cloud.

Dynamic allocation makes it possible for Spark to be deployed in a multi-tenant 
environment. With its performance-centric design, its inefficiency has also 
unfortunately shown up, especially when compared with MR. Thus, an MR-style 
scheduler still has its merits. Based on our research, the inefficiency 
associated with dynamic allocation comes from many aspects, such as executors 
idling out, bigger executors, and many stages (rather than only 2 stages in MR) 
in a Spark job.

Rather than fine-tuning dynamic allocation for efficiency, the proposal here is 
to add a new, efficiency-centric scheduling scheme based on that of MR. Such an 
MR-based scheme can be further enhanced and better adapted to Spark's execution 
model. This alternative is expected to still offer a good performance 
improvement over MR, with similar or even better efficiency.

Inputs are greatly welcome!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22683) Allow tuning the number of dynamically allocated executors wrt task number

2017-12-12 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288262#comment-16288262
 ] 

Xuefu Zhang commented on SPARK-22683:
-

Hi [~jcuquemelle], thanks for working on this and bringing up the efficiency 
problem associated with dynamic allocation. Our company has also experienced a 
significant increase in resource consumption when workload is migrated from MR 
to Spark (via Hive). Thus, I believe there is a strong need to improve Spark's 
efficiency in addition to its performance.

While your proposal has its merit, I largely concur with Sean that it might not 
be universally applicable: it addresses a particular workload rather than the 
whole class of problems. Take MR as an example: it also allocates as many 
mappers/reducers as there are map or reduce tasks, yet offers higher efficiency 
than Spark in many cases. The inefficiency associated with dynamic allocation 
comes from many aspects, such as executors idling out, bigger executors, and 
many stages (rather than only 2 stages in MR) in a Spark job. As there is a 
class of users conscious about resource consumption, especially with many 
moving their workload to the cloud, a more generic solution for such users is 
needed.

I have been thinking about a proposal that introduces an MR-based resource 
allocation scheme in parallel with dynamic allocation. Such an allocation 
mechanism is based on the MR style, but can be further enhanced to beat MR and 
be better adapted to Spark's execution model. This would be a great alternative 
to dynamic allocation.

While dynamic allocation is certainly performance-centric, the new allocation 
scheme can still offer a good performance improvement (compared to MR) while 
being efficiency-centric.

As a starting point, I'm going to create a JIRA and move the discussion along 
this proposal over there. You're welcome to share your thoughts and/or 
contribute.

Thanks.

> Allow tuning the number of dynamically allocated executors wrt task number
> --
>
> Key: SPARK-22683
> URL: https://issues.apache.org/jira/browse/SPARK-22683
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Julien Cuquemelle
>  Labels: pull-request-available
>
> let's say an executor has spark.executor.cores / spark.task.cpus taskSlots
> The current dynamic allocation policy allocates enough executors
> to have each taskSlot execute a single task, which minimizes latency, 
> but wastes resources when tasks are small regarding executor allocation
> overhead. 
> By adding the tasksPerExecutorSlot, it is made possible to specify how many 
> tasks
> a single slot should ideally execute to mitigate the overhead of executor
> allocation.
> PR: https://github.com/apache/spark/pull/19881



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20640) Make rpc timeout and retry for shuffle registration configurable

2017-12-11 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286807#comment-16286807
 ] 

Xuefu Zhang commented on SPARK-20640:
-

[~lyc], thanks for fixing this. I'm wondering if you have any recommendations 
for the values of the new configurations in a heavy-workload environment. Thanks.

> Make rpc timeout and retry for shuffle registration configurable
> 
>
> Key: SPARK-20640
> URL: https://issues.apache.org/jira/browse/SPARK-20640
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.0.2
>Reporter: Sital Kedia
>Assignee: Li Yichao
> Fix For: 2.3.0
>
>
> Currently the shuffle service registration timeout and retry has been 
> hardcoded (see 
> https://github.com/sitalkedia/spark/blob/master/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java#L144
>  and 
> https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L197).
>  This works well for small workloads but under heavy workload when the 
> shuffle service is busy transferring large amount of data we see significant 
> delay in responding to the registration request, as a result we often see the 
> executors fail to register with the shuffle service, eventually failing the 
> job. We need to make these two parameters configurable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20662) Block jobs that have greater than a configured number of tasks

2017-06-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035519#comment-16035519
 ] 

Xuefu Zhang commented on SPARK-20662:
-

I can understand the counterargument here if Spark is targeted at single-user 
cases. For multiple users in an enterprise deployment, it's good to provide 
admin knobs. In this case, an admin just wants to block bad jobs, and I don't 
think the RM meets that goal.

This is actually implemented in Hive on Spark. However, I thought it is generic 
and may be desirable for others as well. In addition, blocking a job at 
submission is better than killing it after it has started to run.

If Spark doesn't think this is useful, then very well.

> Block jobs that have greater than a configured number of tasks
> --
>
> Key: SPARK-20662
> URL: https://issues.apache.org/jira/browse/SPARK-20662
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.0, 2.0.0
>Reporter: Xuefu Zhang
>
> In a shared cluster, it's desirable for an admin to block large Spark jobs. 
> While there might not be a single metrics defining the size of a job, the 
> number of tasks is usually a good indicator. Thus, it would be useful for 
> Spark scheduler to block a job whose number of tasks reaches a configured 
> limit. By default, the limit could be just infinite, to retain the existing 
> behavior.
> MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce to be 
> configured, which blocks a MR job at job submission time.
> The proposed configuration is spark.job.max.tasks with a default value -1 
> (infinite).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20662) Block jobs that have greater than a configured number of tasks

2017-06-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035462#comment-16035462
 ] 

Xuefu Zhang commented on SPARK-20662:
-

[~lyc] I'm talking about mapreduce.job.max.map, which is the maximum number of 
map tasks that an MR job may have. If a submitted MR job contains more map 
tasks than that, it will be rejected. mapreduce.job.max.reduce works similarly.

[~sowen], [~vanzin], I don't think blocking a (perhaps ridiculously) large job 
is equivalent to letting it run slowly forever. The use case I have is: while a 
YARN queue can be used to limit how many resources can be used, a queue can be 
shared by a team or multiple applications, and it's probably not a good idea to 
let one job take all resources while starving the others. Secondly, many of the 
users who submit ridiculously large jobs have no idea what they are doing and 
don't even realize that their jobs are huge. Lastly, and more importantly, our 
application environment has a global timeout, beyond which a job will be 
killed. If a large job gets killed this way, significant resources are wasted. 
Thus, blocking such a job at submission time helps preserve resources.

BTW, if these scenarios don't apply to a user, there is nothing to worry about, 
because the default should keep them happy.

In addition to spark.job.max.tasks, I'd also propose spark.stage.max.tasks, 
which limits the number of tasks any stage of a job may contain. The rationale 
is that spark.job.max.tasks alone tends to favor jobs with a small number of 
stages. With both, we can not only cover MR's mapreduce.job.max.map and 
mapreduce.job.max.reduce, but also control the overall size of a job.


> Block jobs that have greater than a configured number of tasks
> --
>
> Key: SPARK-20662
> URL: https://issues.apache.org/jira/browse/SPARK-20662
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.6.0, 2.0.0
>Reporter: Xuefu Zhang
>
> In a shared cluster, it's desirable for an admin to block large Spark jobs. 
> While there might not be a single metrics defining the size of a job, the 
> number of tasks is usually a good indicator. Thus, it would be useful for 
> Spark scheduler to block a job whose number of tasks reaches a configured 
> limit. By default, the limit could be just infinite, to retain the existing 
> behavior.
> MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce to be 
> configured, which blocks a MR job at job submission time.
> The proposed configuration is spark.job.max.tasks with a default value -1 
> (infinite).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20662) Block jobs that have greater than a configured number of tasks

2017-05-08 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-20662:
---

 Summary: Block jobs that have greater than a configured number of 
tasks
 Key: SPARK-20662
 URL: https://issues.apache.org/jira/browse/SPARK-20662
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.0.0, 1.6.0
Reporter: Xuefu Zhang


In a shared cluster, it's desirable for an admin to be able to block large 
Spark jobs. While there might not be a single metric defining the size of a 
job, the number of tasks is usually a good indicator. Thus, it would be useful 
for the Spark scheduler to block a job whose number of tasks reaches a 
configured limit. By default, the limit could simply be infinite, to retain the 
existing behavior.

MapReduce has mapreduce.job.max.map and mapreduce.job.max.reduce, which block 
an MR job at submission time.

The proposed configuration is spark.job.max.tasks, with a default value of -1 
(infinite).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18769) Spark to be smarter about what the upper bound is and to restrict number of executor when dynamic allocation is enabled

2017-03-01 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891610#comment-15891610
 ] 

Xuefu Zhang commented on SPARK-18769:
-

Just as an FYI, the problem is real: allocation attempts are made as long as 
there are pending tasks (which is in line with dynamic allocation). However, 
this is pointless when all containers are already taken and further attempts 
are very likely in vain; it creates pressure on event processing in the Spark 
driver and may also have an impact on the YARN RM.

I don't know what the best solution for this is. Maybe Spark can just try to 
allocate all it needs upfront and update (tune down) the allocation request as 
the job progresses, when necessary.

As a workaround, we have to set an artificial upper limit (something like 
2500), which helps a lot.
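For reference, a minimal sketch of that workaround using the standard dynamic 
allocation properties; the 2500 figure is the artificial limit cited in this 
comment, not a general recommendation:

{code}
import org.apache.spark.SparkConf

// Cap dynamic allocation instead of leaving the upper bound infinite.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "2500")
{code}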

>  Spark to be smarter about what the upper bound is and to restrict number of 
> executor when dynamic allocation is enabled
> 
>
> Key: SPARK-18769
> URL: https://issues.apache.org/jira/browse/SPARK-18769
> Project: Spark
>  Issue Type: New Feature
>Reporter: Neerja Khattar
>
> Currently when dynamic allocation is enabled max.executor is infinite and 
> spark creates so many executor and even exceed the yarn nodemanager memory 
> limit and vcores.
> It should have a check to not exceed more that yarn resource limit.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2421) Spark should treat writable as serializable for keys

2016-02-16 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148684#comment-15148684
 ] 

Xuefu Zhang commented on SPARK-2421:


[~sowen], I saw you had closed this without giving any explanation. Do you mind 
sharing?

> Spark should treat writable as serializable for keys
> 
>
> Key: SPARK-2421
> URL: https://issues.apache.org/jira/browse/SPARK-2421
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output, Java API
>Affects Versions: 1.0.0
>Reporter: Xuefu Zhang
>
> It seems that Spark requires the key be serializable (class implement 
> Serializable interface). In Hadoop world, Writable interface is used for the 
> same purpose. A lot of existing classes, while writable, are not considered 
> by Spark as Serializable. It would be nice if Spark can treate Writable as 
> serializable and automatically serialize and de-serialize these classes using 
> writable interface.
> This is identified in HIVE-7279, but its benefits are seen global.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.

2016-02-16 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148675#comment-15148675
 ] 

Xuefu Zhang commented on SPARK-5377:


[~sowen], I saw you had closed this without giving any explanation. Do you mind 
sharing?

> Dynamically add jar into Spark Driver's classpath.
> --
>
> Key: SPARK-5377
> URL: https://issues.apache.org/jira/browse/SPARK-5377
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 1.2.0
>Reporter: Chengxiang Li
>
> Spark support dynamically add jar to executor classpath through 
> SparkContext::addJar(), while it does not support dynamically add jar into 
> driver classpath. In most case(if not all the case), user dynamically add jar 
> with SparkContext::addJar()  because some classes from the jar would be 
> referred in upcoming Spark job, which means the classes need to be loaded in 
> Spark driver side either,e.g during serialization. I think it make sense to 
> add an API to add jar into driver classpath, or just make it available in 
> SparkContext::addJar(). HIVE-9410 is a real case from Hive on Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2015-03-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343977#comment-14343977
 ] 

Xuefu Zhang commented on SPARK-3621:


{quote}
 you can go a step further if you wanted to and just read the data directly 
from HDFS into a (singleton) cache on the executor.
{quote}
Yeah. Sharing a cache within an executor is something we wanted to do, but it 
seems a little complicated due to concurrency and the lack of job boundaries at 
the executor.

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 benefcial to allow client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data be collected to the driver and that the variable be shipped to 
 the cluster after being made. It would be more performing if driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from previous stage to the next 
 stage, which doesn't require driver side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2015-03-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343582#comment-14343582
 ] 

Xuefu Zhang commented on SPARK-3621:


For Hive's map join, we create a hash table out of a small table (a HadoopRDD 
in Spark's terms) after some transformations. We want to broadcast the hash 
table (which is written to HDFS) so that each executor can access it to do the 
join. We thought of Spark's broadcast variables for this purpose. However, a 
Spark broadcast variable ships the data to the driver and then broadcasts it to 
every executor. We wanted to avoid this extra trip since the hash table is 
already in HDFS. Thus, we wanted a mechanism to broadcast the dataset and make 
it available (even better if in memory) at each executor, without shipping the 
dataset back to the driver. Referring to this dataset as an RDD might have 
caused the confusion in the first place.

Currently, we work around the problem by calling SparkContext.addFile() at the 
driver and accessing the file via SparkFiles.get() at the executors, as 
sketched below.
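A minimal sketch of that workaround, assuming placeholder HDFS paths and file 
names (the real Hive code differs):

{code}
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("map-join-sketch"))

// Driver side: register the HDFS file that holds the small-table hash table.
sc.addFile("hdfs:///tmp/hive/smalltable.hashtable")

// Executor side: each task resolves the locally downloaded copy by file name.
val bigTable = sc.textFile("hdfs:///tmp/hive/bigtable")
val joined = bigTable.mapPartitions { rows =>
  val localPath = SparkFiles.get("smalltable.hashtable")
  // Load the hash table from localPath and probe it for each row here;
  // returning the rows unchanged keeps this sketch self-contained.
  rows
}
{code}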

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 benefcial to allow client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data be collected to the driver and that the variable be shipped to 
 the cluster after being made. It would be more performing if driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from previous stage to the next 
 stage, which doesn't require driver side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2015-03-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343655#comment-14343655
 ] 

Xuefu Zhang commented on SPARK-3621:


addFile() can take an HDFS file, in which case no file is shipped from the 
driver. And SparkFiles.get() guarantees that the file is copied to the local 
node only once per executor.

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 benefcial to allow client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data be collected to the driver and that the variable be shipped to 
 the cluster after being made. It would be more performing if driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from previous stage to the next 
 stage, which doesn't require driver side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3691) Provide a mini cluster for testing system built on Spark

2015-02-27 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14340950#comment-14340950
 ] 

Xuefu Zhang commented on SPARK-3691:


Hive is using spark.master=local-cluster for unit tests. It also built a mini 
YARN cluster to test Hive on Spark. Thus, we don't need this anymore.
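For readers unfamiliar with it, a small sketch of the local-cluster master 
string (it still requires a Spark build on the test machine, which is the 
limitation noted in this issue):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// local-cluster[numExecutors, coresPerExecutor, memoryPerExecutorMB]
val conf = new SparkConf()
  .setMaster("local-cluster[2,1,1024]")
  .setAppName("mini-cluster-style-test")
val sc = new SparkContext(conf)
{code}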

 Provide a mini cluster for testing system built on Spark
 

 Key: SPARK-3691
 URL: https://issues.apache.org/jira/browse/SPARK-3691
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 Most Hadoop components such MR, DFS, Tez, and Yarn provide a mini cluster 
 that can be used to test the external systems that rely on those frameworks, 
 such as Pig and Hive. While Spark's local mode can be used to do such testing 
 and is friendly for debugging, it's too far from a real Spark cluster and a 
 lot of problems cannot be discovered. Thus, an equivalent of Hadoop MR mini 
 cluster in Spark would be very helpful in testing system such as Hive/Pig on 
 Spark.
 Spark's local-cluster is considered for this purpose but it doesn't fit well 
 because it requires a Spark installation on the box where the tests run. 
 Also, local-cluster isn't exposed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-3691) Provide a mini cluster for testing system built on Spark

2015-02-27 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang closed SPARK-3691.
--
Resolution: Won't Fix

 Provide a mini cluster for testing system built on Spark
 

 Key: SPARK-3691
 URL: https://issues.apache.org/jira/browse/SPARK-3691
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 Most Hadoop components such MR, DFS, Tez, and Yarn provide a mini cluster 
 that can be used to test the external systems that rely on those frameworks, 
 such as Pig and Hive. While Spark's local mode can be used to do such testing 
 and is friendly for debugging, it's too far from a real Spark cluster and a 
 lot of problems cannot be discovered. Thus, an equivalent of Hadoop MR mini 
 cluster in Spark would be very helpful in testing system such as Hive/Pig on 
 Spark.
 Spark's local-cluster is considered for this purpose but it doesn't fit well 
 because it requires a Spark installation on the box where the tests run. 
 Also, local-cluster isn't exposed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5439) Expose yarn app id for yarn mode

2015-01-27 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14294406#comment-14294406
 ] 

Xuefu Zhang commented on SPARK-5439:


Yeah. This is certainly something desirable in Hive on Spark. As far as I know, 
Hive isn't reporting the application ID to the user at present.
cc: [~chengxiang li]

 Expose yarn app id for yarn mode
 

 Key: SPARK-5439
 URL: https://issues.apache.org/jira/browse/SPARK-5439
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.0.0
Reporter: bc Wong

 When submitting Spark apps on YARN, the caller should be able to get back the 
 YARN app ID programmatically. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2015-01-26 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292448#comment-14292448
 ] 

Xuefu Zhang commented on SPARK-2688:


Yeah. We don't need syntactic sugar, but a transformation that does just one 
pass over the input RDD. This has performance implications for Hive's 
multi-insert use cases; see the sketch below.
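For context, a rough sketch (with placeholder paths) of the multi-insert 
pattern being discussed: today each output is a separate action over the shared 
intermediate RDD, and without cache() the parent would be recomputed for every 
one of them, whereas the ask here is a single pass that feeds all outputs.

{code}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("multi-insert-sketch").setMaster("local[*]"))

val rdd1 = sc.textFile("hdfs:///tmp/input")        // placeholder input
val rdd2 = rdd1.map(_.toUpperCase).cache()         // shared intermediate result

// Three derived outputs; each saveAsTextFile is its own job today.
rdd2.filter(_.startsWith("A")).saveAsTextFile("hdfs:///tmp/out-a")
rdd2.filter(_.startsWith("B")).saveAsTextFile("hdfs:///tmp/out-b")
rdd2.filter(_.startsWith("C")).saveAsTextFile("hdfs:///tmp/out-c")
{code}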

 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 - rdd2 - rdd3
| - rdd4
| - rdd5
\ - rdd6
 {code}
 where - represents a transformation. rdd3 to rrdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only trigger pipeline rdd1 - rdd2 - 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution the whole graph and reuse rdd2, but there doesn't seem to be a 
 way doing so. Tez already realized the importance of this (TEZ-391), so I 
 think Spark should provide this too.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2015-01-26 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14292415#comment-14292415
 ] 

Xuefu Zhang commented on SPARK-2688:


#1 above is exactly what Hive needs badly.

 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 - rdd2 - rdd3
| - rdd4
| - rdd5
\ - rdd6
 {code}
 where - represents a transformation. rdd3 to rrdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only trigger pipeline rdd1 - rdd2 - 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution the whole graph and reuse rdd2, but there doesn't seem to be a 
 way doing so. Tez already realized the importance of this (TEZ-391), so I 
 think Spark should provide this too.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2015-01-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14291145#comment-14291145
 ] 

Xuefu Zhang commented on SPARK-3621:


I'm not sure I agree that this is not a problem. To broadcast is to make a 
certain dataset available to all nodes in the cluster. The existing broadcast 
functionality is limited to broadcasting data held in the driver, while this 
improvement requests that datasets which already exist in the cluster be 
broadcast to all nodes without shipping them from the cluster to the driver and 
then back out to all nodes again.

Considering an improvement is never a problem as long as we are open to it. If 
for some reason this cannot be done, we need to understand the reason.

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 benefcial to allow client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data be collected to the driver and that the variable be shipped to 
 the cluster after being made. It would be more performing if driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from previous stage to the next 
 stage, which doesn't require driver side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2015-01-25 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang reopened SPARK-3621:


 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 benefcial to allow client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data be collected to the driver and that the variable be shipped to 
 the cluster after being made. It would be more performing if driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from previous stage to the next 
 stage, which doesn't require driver side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2015-01-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14291134#comment-14291134
 ] 

Xuefu Zhang commented on SPARK-2688:


I think SPARK-3622 is related to this JIRA but not exactly the same. This JIRA 
essentially asks for the capability to execute an arbitrary DAG built of RDDs, 
while SPARK-3622 requests a custom transformation that can take one input RDD 
and generate multiple output RDDs. The Hive on Spark project needs this because 
Hive's map-side or reduce-side processing (which is translated to Spark's map 
functions) generates multiple outputs in general. On this ground, SPARK-3622 is 
more important than SPARK-2688.

It's worth mentioning that such a custom transformation could be used as a 
building block for an arbitrary DAG.


 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 - rdd2 - rdd3
| - rdd4
| - rdd5
\ - rdd6
 {code}
 where - represents a transformation. rdd3 to rrdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only trigger pipeline rdd1 - rdd2 - 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution the whole graph and reuse rdd2, but there doesn't seem to be a 
 way doing so. Tez already realized the importance of this (TEZ-391), so I 
 think Spark should provide this too.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2015-01-22 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14287901#comment-14287901
 ] 

Xuefu Zhang commented on SPARK-3622:


Unfortunately no. In Hive the problem is more general than just writing to 
different files. What Hive needs is a transformation that can take multiple map 
functions and output multiple RDDs from one input RDD. These resulting RDDs may 
be further processed rather than simply being written to files.
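Purely to illustrate the requested shape of the API (no such transformation 
exists in Spark; the name and signature below are invented for this sketch), 
something along these lines is what is being asked for:

{code}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical API shape: one input RDD, several user-supplied map functions,
// several output RDDs. A real implementation would compute all outputs in a
// single pass; this naive stand-in still scans the input twice and is shown
// only to convey the signature.
def mapToMultipleRDDs[T, U1: ClassTag, U2: ClassTag](
    input: RDD[T],
    f1: T => U1,
    f2: T => U2): (RDD[U1], RDD[U2]) = {
  (input.map(f1), input.map(f2))
}
{code}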

 Provide a custom transformation that can output multiple RDDs
 -

 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 All existing transformations return just one RDD at most, even for those 
 which takes user-supplied functions such as mapPartitions() . However, 
 sometimes a user provided function may need to output multiple RDDs. For 
 instance, a filter function that divides the input RDD into serveral RDDs. 
 While it's possible to get multiple RDDs by transforming the same RDD 
 multiple times, it may be more efficient to do this concurrently in one shot. 
 Especially user's existing function is already generating different data sets.
 This the case in Hive on Spark, where Hive's map function and reduce function 
 can output different data sets to be consumed by subsequent stages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5377) Dynamically add jar into Spark Driver's classpath.

2015-01-22 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14288866#comment-14288866
 ] 

Xuefu Zhang commented on SPARK-5377:


cc [~sandyr].

 Dynamically add jar into Spark Driver's classpath.
 --

 Key: SPARK-5377
 URL: https://issues.apache.org/jira/browse/SPARK-5377
 Project: Spark
  Issue Type: Improvement
Affects Versions: 1.2.0
Reporter: Chengxiang Li

 Spark support dynamically add jar to executor classpath through 
 SparkContext::addJar(), while it does not support dynamically add jar into 
 driver classpath. In most case(if not all the case), user dynamically add jar 
 with SparkContext::addJar()  because some classes from the jar would be 
 referred in upcoming Spark job, which means the classes need to be loaded in 
 Spark driver side either,e.g during serialization. I think it make sense to 
 add an API to add jar into driver classpath, or just make it available in 
 SparkContext::addJar(). HIVE-9410 is a real case from Hive on Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't

2015-01-16 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14281168#comment-14281168
 ] 

Xuefu Zhang commented on SPARK-1021:


This problem also occurred in Hive on Spark (HIVE-9370). Could we take this 
forward?

 sortByKey() launches a cluster job when it shouldn't
 

 Key: SPARK-1021
 URL: https://issues.apache.org/jira/browse/SPARK-1021
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0
Reporter: Andrew Ash
Assignee: Erik Erlandson
  Labels: starter

 The sortByKey() method is listed as a transformation, not an action, in the 
 documentation.  But it launches a cluster job regardless.
 http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
 Some discussion on the mailing list suggested that this is a problem with the 
 rdd.count() call inside Partitioner.scala's rangeBounds method.
 https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102
 Josh Rosen suggests that rangeBounds should be made into a lazy variable:
 {quote}
 I wonder whether making RangePartitoner .rangeBounds into a lazy val would 
 fix this 
 (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
   We'd need to make sure that rangeBounds() is never called before an action 
 is performed.  This could be tricky because it's called in the 
 RangePartitioner.equals() method.  Maybe it's sufficient to just compare the 
 number of partitions, the ids of the RDDs used to create the 
 RangePartitioner, and the sort ordering.  This still supports the case where 
 I range-partition one RDD and pass the same partitioner to a different RDD.  
 It breaks support for the case where two range partitioners created on 
 different RDDs happened to have the same rangeBounds(), but it seems unlikely 
 that this would really harm performance since it's probably unlikely that the 
 range partitioners are equal by chance.
 {quote}
 Can we please make this happen?  I'll send a PR on GitHub to start the 
 discussion and testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-5080) Expose more cluster resource information to user

2015-01-07 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14268786#comment-14268786
 ] 

Xuefu Zhang commented on SPARK-5080:


cc: [~sandyr]

 Expose more cluster resource information to user
 

 Key: SPARK-5080
 URL: https://issues.apache.org/jira/browse/SPARK-5080
 Project: Spark
  Issue Type: Improvement
Reporter: Rui Li

 It'll be useful if user can get detailed cluster resource info, e.g. 
 granted/allocated executors, memory and CPU.
 Such information is available via WebUI but seems SparkContext doesn't have 
 these APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks

2014-12-30 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14261436#comment-14261436
 ] 

Xuefu Zhang commented on SPARK-4921:


Some will be NODE_LOCAL, but others will be NO_PREF. Returning PROCESS_LOCAL 
seems at least confusing. As to the performance implication, maybe [~lirui] can 
further confirm.



 Performance issue caused by TaskSetManager returning  PROCESS_LOCAL for 
 NO_PREF tasks
 -

 Key: SPARK-4921
 URL: https://issues.apache.org/jira/browse/SPARK-4921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang
 Attachments: NO_PREF.patch


 During research for HIVE-9153, we found that TaskSetManager returns 
 PROCESS_LOCAL for NO_PREF tasks, which may caused performance degradation. 
 Changing the return value to NO_PREF, as demonstrated in the attached patch, 
 seemingly improves the performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2387) Remove the stage barrier for better resource utilization

2014-12-24 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14258626#comment-14258626
 ] 

Xuefu Zhang commented on SPARK-2387:


cc: [~sandyr]

I think the purpose of this enhancement is still valuable, even if the solution 
here isn't perfect: Tez does this, and it improves performance. A better 
proposal is greatly welcome.

 Remove the stage barrier for better resource utilization
 

 Key: SPARK-2387
 URL: https://issues.apache.org/jira/browse/SPARK-2387
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Rui Li

 DAGScheduler divides a Spark job into multiple stages according to RDD 
 dependencies. Whenever there’s a shuffle dependency, DAGScheduler creates a 
 shuffle map stage on the map side, and another stage depending on that stage.
 Currently, the downstream stage cannot start until all its depended stages 
 have finished. This barrier between stages leads to idle slots when waiting 
 for the last few upstream tasks to finish and thus wasting cluster resources.
 Therefore we propose to remove the barrier and pre-start the reduce stage 
 once there're free slots. This can achieve better resource utilization and 
 improve the overall job performance, especially when there're lots of 
 executors granted to the application.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks

2014-12-22 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-4921:
--

 Summary: Performance issue caused by TaskSetManager returning  
PROCESS_LOCAL for NO_PREF tasks
 Key: SPARK-4921
 URL: https://issues.apache.org/jira/browse/SPARK-4921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang


During research for HIVE-9153, we found that TaskSetManager returns 
PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. 
Changing the return value to NO_PREF, as demonstrated in the attached patch, 
seemingly improves the performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks

2014-12-22 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14256015#comment-14256015
 ] 

Xuefu Zhang commented on SPARK-4921:


cc: [~lirui], [~sandyr]

 Performance issue caused by TaskSetManager returning  PROCESS_LOCAL for 
 NO_PREF tasks
 -

 Key: SPARK-4921
 URL: https://issues.apache.org/jira/browse/SPARK-4921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang

 During research for HIVE-9153, we found that TaskSetManager returns 
 PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. 
 Changing the return value to NO_PREF, as demonstrated in the attached patch, 
 seemingly improves the performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-4921) Performance issue caused by TaskSetManager returning PROCESS_LOCAL for NO_PREF tasks

2014-12-22 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated SPARK-4921:
---
Attachment: NO_PREF.patch

 Performance issue caused by TaskSetManager returning  PROCESS_LOCAL for 
 NO_PREF tasks
 -

 Key: SPARK-4921
 URL: https://issues.apache.org/jira/browse/SPARK-4921
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang
 Attachments: NO_PREF.patch


 During research for HIVE-9153, we found that TaskSetManager returns 
 PROCESS_LOCAL for NO_PREF tasks, which may cause performance degradation. 
 Changing the return value to NO_PREF, as demonstrated in the attached patch, 
 seemingly improves the performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4687) SparkContext#addFile doesn't keep file folder information

2014-12-01 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14230873#comment-14230873
 ] 

Xuefu Zhang commented on SPARK-4687:


[~jxiang], alternatively, would a new method, SparkContext.addFolder(), better 
fit this use case?

CC [~sandyr], [~rxin].

 SparkContext#addFile doesn't keep file folder information
 -

 Key: SPARK-4687
 URL: https://issues.apache.org/jira/browse/SPARK-4687
 Project: Spark
  Issue Type: Bug
Reporter: Jimmy Xiang

 Files added with SparkContext#addFile are loaded with Utils#fetchFile before 
 a task starts. However, Utils#fetchFile puts all files under the Spark root 
 on the worker node. We should have an option to keep the folder information. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable

2014-11-24 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222929#comment-14222929
 ] 

Xuefu Zhang commented on SPARK-4567:


{quote}
please don't set the FixVersion field. This field is only set when the patch 
is merged.
{quote}

Hi [~pwendell], Thanks for pointing this out. I put that by accident. Thanks.

 Make SparkJobInfo and SparkStageInfo serializable
 -

 Key: SPARK-4567
 URL: https://issues.apache.org/jira/browse/SPARK-4567
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 SPARK-2321 introduced the two classes, which unfortunately were not made 
 serializable. From the class definition, there seems to be no obstacle to 
 making them serializable for clients to use, as these classes are just 
 wrappers of numbers and strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4440) Enhance the job progress API to expose more information

2014-11-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222549#comment-14222549
 ] 

Xuefu Zhang commented on SPARK-4440:


CC: [~sandyr], [~rxin]

 Enhance the job progress API to expose more information
 ---

 Key: SPARK-4440
 URL: https://issues.apache.org/jira/browse/SPARK-4440
 Project: Spark
  Issue Type: Improvement
Reporter: Rui Li

 The progress API introduced in SPARK-2321 provides a new way for users to 
 monitor job progress. However, the information exposed in the API is 
 relatively limited. It'll be much more useful if we can enhance the API to 
 expose more data.
 Some improvements, for example, may include but are not limited to:
 1. Stage submission and completion time.
 2. Task metrics.
 The requirement was initially identified for the Hive on Spark 
 project (HIVE-7292); other applications should benefit as well.
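
For reference, the sketch below shows roughly what the SPARK-2321 API already 
exposes from the Java side, and hence the gap described above. It assumes the 
JavaSparkContext.statusTracker() entry point from that work; the polling helper 
and its name are illustrative only, not part of any proposal here.
{code}
// Hedged sketch: poll the existing status tracker for coarse progress only.
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaSparkContext;

public final class ProgressPoller {
  public static void printProgress(JavaSparkContext ctx) {
    for (int jobId : ctx.statusTracker().getActiveJobIds()) {
      SparkJobInfo job = ctx.statusTracker().getJobInfo(jobId);
      if (job == null) continue;
      for (int stageId : job.stageIds()) {
        SparkStageInfo stage = ctx.statusTracker().getStageInfo(stageId);
        if (stage == null) continue;
        // Only task counts are available; no submission/completion time and
        // no task metrics, which is what this issue asks for.
        System.out.println(stage.name() + ": " + stage.numCompletedTasks()
            + "/" + stage.numTasks() + " tasks completed");
      }
    }
  }
}
{code}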



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable

2014-11-23 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-4567:
--

 Summary: Make SparkJobInfo and SparkStageInfo serializable
 Key: SPARK-4567
 URL: https://issues.apache.org/jira/browse/SPARK-4567
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang
 Fix For: 1.2.0


SPARK-2321 introduced the two classes, which unfortunately were not made 
serializable. From the class definition, there seems to be no obstacle to 
making them serializable for clients to use, as these classes are just wrappers 
of numbers and strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4567) Make SparkJobInfo and SparkStageInfo serializable

2014-11-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222591#comment-14222591
 ] 

Xuefu Zhang commented on SPARK-4567:


CC: [~sandyr], [~rxin]

 Make SparkJobInfo and SparkStageInfo serializable
 -

 Key: SPARK-4567
 URL: https://issues.apache.org/jira/browse/SPARK-4567
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang
 Fix For: 1.2.0


 SPARK-2321 introduced the two classes, which unfortunately were not made 
 serializable. From the class definition, there seems to be no obstacle to 
 making them serializable for clients to use, as these classes are just 
 wrappers of numbers and strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2321) Design a proper progress reporting event listener API

2014-11-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222593#comment-14222593
 ] 

Xuefu Zhang commented on SPARK-2321:


I have created SPARK-4567 to track the request of making SparkJobInfo and 
SparkStageInfo serializable.

 Design a proper progress reporting  event listener API
 ---

 Key: SPARK-2321
 URL: https://issues.apache.org/jira/browse/SPARK-2321
 Project: Spark
  Issue Type: Improvement
  Components: Java API, Spark Core
Affects Versions: 1.0.0
Reporter: Reynold Xin
Assignee: Josh Rosen
Priority: Critical
 Fix For: 1.2.0


 This is a ticket to track progress on redesigning the SparkListener and 
 JobProgressListener API.
 There are multiple problems with the current design, including:
 0. I'm not sure if the API is usable in Java (there are at least some enums 
 we used in Scala and a bunch of case classes that might complicate things).
 1. The whole API is marked as DeveloperApi, because we haven't paid a lot of 
 attention to it yet. Something as important as progress reporting deserves a 
 more stable API.
 2. There is no easy way to connect jobs with stages. Similarly, there is no 
 easy way to connect job groups with jobs / stages.
 3. JobProgressListener itself has no encapsulation at all. States can be 
 arbitrarily mutated by external programs. Variable names are sort of randomly 
 decided and inconsistent. 
 We should just revisit these and propose a new, concrete design. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-4290:
--

 Summary: Provide an equivalent functionality of distributed cache 
as MR does
 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang


MapReduce allows a client to specify files to be put in the distributed cache 
for a job, and the framework guarantees that the files will be available in the 
local file system of a node where a task of the job runs, before the task 
actually starts. While this might be achieved with Yarn via hacks, it's not 
available in other clusters. It would be nice to have an equivalent 
functionality in Spark.

It would also complement Spark's broadcast variable, which may not be suitable 
in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201421#comment-14201421
 ] 

Xuefu Zhang commented on SPARK-4290:


CC: [~sandyr], [~rxin]


 Provide an equivalent functionality of distributed cache as MR does
 ---

 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 MapReduce allows a client to specify files to be put in the distributed cache 
 for a job, and the framework guarantees that the files will be available in 
 the local file system of a node where a task of the job runs, before the task 
 actually starts. While this might be achieved with Yarn via hacks, it's not 
 available in other clusters. It would be nice to have an equivalent 
 functionality in Spark.
 It would also complement Spark's broadcast variable, which may not be 
 suitable in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201496#comment-14201496
 ] 

Xuefu Zhang commented on SPARK-4290:


Hi [~rxin], by out of the box, do you mean 
org.apache.hadoop.filecache.DistributedCache [1]? This is a MapReduce client 
class, which is used when you submit an MR job. It basically tells the MR 
framework that your job needs these files put in the distributed cache in order 
to run. Thus, the MR framework will copy these files to the local file system 
of the tasks. The tasks can access the local files via symlinks.
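
In code, the classic MR-side usage being described is roughly the following, a 
hedged sketch assuming the old org.apache.hadoop.filecache.DistributedCache 
API; the HDFS path and symlink name are made up for illustration.
{code}
// Hedged sketch of classic MR distributed-cache usage; paths are illustrative.
import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public final class DistCacheSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Ask the MR framework to ship this HDFS file to every task's local disk;
    // the name after '#' becomes a symlink in the task's working directory.
    DistributedCache.addCacheFile(new URI("hdfs:///tmp/lookup.dat#lookup.dat"), jobConf);
    DistributedCache.createSymlink(jobConf);
    // ... configure mapper/reducer and submit the job; inside a task the file
    // is then readable as new java.io.File("lookup.dat").
  }
}
{code}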

I don't know how this can be used out of the box. First, Hive on Spark users 
may not have the MR client library. Secondly, there isn't an MR framework 
around to do the copying.

Do you have an example on how I might achieve this?

[1] 
http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/filecache/DistributedCache.html

 Provide an equivalent functionality of distributed cache as MR does
 ---

 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 MapReduce allows a client to specify files to be put in the distributed cache 
 for a job, and the framework guarantees that the files will be available in 
 the local file system of a node where a task of the job runs, before the task 
 actually starts. While this might be achieved with Yarn via hacks, it's not 
 available in other clusters. It would be nice to have an equivalent 
 functionality in Spark.
 It would also complement Spark's broadcast variable, which may not be 
 suitable in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201561#comment-14201561
 ] 

Xuefu Zhang commented on SPARK-4290:


Hi [~rxin], from the documentation of the above Java class, I read
{quote}
Its efficiency stems from the fact that the files are only copied once per job 
and the ability to cache archives which are un-archived on the slaves.
{quote}

Two things are suggested:
1. One copy per job. If multiple tasks of a job are running on a node, there is 
still only one copy from HDFS to local. 
2. Unarchiving the archives on the slaves.

In your #2, it would be desirable if it can achieve one copy per job (not one 
copy per request). That is, subsequent requests for the file from other tasks 
of the same job will directly read from local.

Of course, this is something on the surface. The actual implementation can be 
much more complicated.

 Provide an equivalent functionality of distributed cache as MR does
 ---

 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 MapReduce allows a client to specify files to be put in the distributed cache 
 for a job, and the framework guarantees that the files will be available in 
 the local file system of a node where a task of the job runs, before the task 
 actually starts. While this might be achieved with Yarn via hacks, it's not 
 available in other clusters. It would be nice to have an equivalent 
 functionality in Spark.
 It would also complement Spark's broadcast variable, which may not be 
 suitable in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201621#comment-14201621
 ] 

Xuefu Zhang commented on SPARK-4290:


Yes, SparkContext#addFile() seems to be what we need. If the files can be more 
efficiently broadcast to every executor, that's even better than distributed 
cache. In the meantime, we can set a large replication factor for the files to 
mitigate the problem.
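
A minimal sketch of that addFile/SparkFiles pattern, assuming only the public 
JavaSparkContext.addFile() and SparkFiles.get() APIs; the path, RDD, and class 
name are illustrative.
{code}
// Hedged sketch of the addFile/SparkFiles pattern; path and RDD are illustrative.
import java.io.File;
import java.util.Arrays;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public final class AddFileSketch {
  public static void main(String[] args) {
    JavaSparkContext ctx = new JavaSparkContext("local", "AddFileSketch");
    ctx.addFile("hdfs:///tmp/lookup.dat");   // driver side: register the file
    JavaRDD<Integer> data = ctx.parallelize(Arrays.asList(1, 2, 3));
    JavaRDD<String> resolved = data.map(new Function<Integer, String>() {
      @Override
      public String call(Integer x) {
        // executor side: resolve the locally fetched copy of the file
        File local = new File(SparkFiles.get("lookup.dat"));
        return x + " -> " + local.getAbsolutePath();
      }
    });
    System.out.println(resolved.collect());
    ctx.stop();
  }
}
{code}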

 Provide an equivalent functionality of distributed cache as MR does
 ---

 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 MapReduce allows a client to specify files to be put in the distributed cache 
 for a job, and the framework guarantees that the files will be available in 
 the local file system of a node where a task of the job runs, before the task 
 actually starts. While this might be achieved with Yarn via hacks, it's not 
 available in other clusters. It would be nice to have an equivalent 
 functionality in Spark.
 It would also complement Spark's broadcast variable, which may not be 
 suitable in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-4290) Provide an equivalent functionality of distributed cache as MR does

2014-11-06 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201621#comment-14201621
 ] 

Xuefu Zhang edited comment on SPARK-4290 at 11/7/14 5:37 AM:
-

Yes, SparkContext#addFile() seems to be what we need. If the files can be more 
efficiently broadcast to every executor, that's even better than distributed 
cache. In the meantime, we can set a large replication factor for the files to 
mitigate the problem.

To clarify, [~sandyr], [~rxin], do files added via SparkContext.addFile() get 
automatically downloaded to the executors, or does SparkFiles.get() have to be 
called in order to make that happen?


was (Author: xuefuz):
Yes, SparkContext#addFile() seems to be what we need. If the files can be more 
efficiently broadcast to every executor, that's even better than distributed 
cache. In the meantime, we can set a large replication factor for the files to 
mitigate the problem.

 Provide an equivalent functionality of distributed cache as MR does
 ---

 Key: SPARK-4290
 URL: https://issues.apache.org/jira/browse/SPARK-4290
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Xuefu Zhang

 MapReduce allows a client to specify files to be put in the distributed cache 
 for a job, and the framework guarantees that the files will be available in 
 the local file system of a node where a task of the job runs, before the task 
 actually starts. While this might be achieved with Yarn via hacks, it's not 
 available in other clusters. It would be nice to have an equivalent 
 functionality in Spark.
 It would also complement Spark's broadcast variable, which may not be 
 suitable in certain scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3691) Provide a mini cluster for testing system built on Spark

2014-09-25 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-3691:
--

 Summary: Provide a mini cluster for testing system built on Spark
 Key: SPARK-3691
 URL: https://issues.apache.org/jira/browse/SPARK-3691
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang


Most Hadoop components such as MR, DFS, Tez, and Yarn provide a mini cluster 
that can be used to test the external systems that rely on those frameworks, 
such as Pig and Hive. While Spark's local mode can be used to do such testing 
and is friendly for debugging, it's too far from a real Spark cluster and a lot 
of problems cannot be discovered. Thus, an equivalent of the Hadoop MR mini 
cluster in Spark would be very helpful in testing systems such as Hive/Pig on 
Spark.

Spark's local-cluster was considered for this purpose, but it doesn't fit well 
because it requires a Spark installation on the box where the tests run. Also, 
local-cluster isn't exposed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3691) Provide a mini cluster for testing system built on Spark

2014-09-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148071#comment-14148071
 ] 

Xuefu Zhang commented on SPARK-3691:


cc [~sandyr]

 Provide a mini cluster for testing system built on Spark
 

 Key: SPARK-3691
 URL: https://issues.apache.org/jira/browse/SPARK-3691
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 Most Hadoop components such as MR, DFS, Tez, and Yarn provide a mini cluster 
 that can be used to test the external systems that rely on those frameworks, 
 such as Pig and Hive. While Spark's local mode can be used to do such testing 
 and is friendly for debugging, it's too far from a real Spark cluster and a 
 lot of problems cannot be discovered. Thus, an equivalent of the Hadoop MR 
 mini cluster in Spark would be very helpful in testing systems such as 
 Hive/Pig on Spark.
 Spark's local-cluster was considered for this purpose, but it doesn't fit well 
 because it requires a Spark installation on the box where the tests run. 
 Also, local-cluster isn't exposed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-3691) Provide a mini cluster for testing system built on Spark

2014-09-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148071#comment-14148071
 ] 

Xuefu Zhang edited comment on SPARK-3691 at 9/25/14 6:21 PM:
-

cc [~sandyr], [~rxin]


was (Author: xuefuz):
cc [~sandyr]

 Provide a mini cluster for testing system built on Spark
 

 Key: SPARK-3691
 URL: https://issues.apache.org/jira/browse/SPARK-3691
 Project: Spark
  Issue Type: Test
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 Most Hadoop components such as MR, DFS, Tez, and Yarn provide a mini cluster 
 that can be used to test the external systems that rely on those frameworks, 
 such as Pig and Hive. While Spark's local mode can be used to do such testing 
 and is friendly for debugging, it's too far from a real Spark cluster and a 
 lot of problems cannot be discovered. Thus, an equivalent of the Hadoop MR 
 mini cluster in Spark would be very helpful in testing systems such as 
 Hive/Pig on Spark.
 Spark's local-cluster was considered for this purpose, but it doesn't fit well 
 because it requires a Spark installation on the box where the tests run. 
 Also, local-cluster isn't exposed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3693) Cached Hadoop RDD always return rows with the same value

2014-09-25 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-3693:
--

 Summary: Cached Hadoop RDD always return rows with the same value
 Key: SPARK-3693
 URL: https://issues.apache.org/jira/browse/SPARK-3693
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang


While trying RDD caching, it's found that caching a Hadoop RDD causes data 
correctness issues. The following code snippet demonstrates the usage:

{code}
public final class Test {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ... 
    JavaPairRDD<BytesWritable, BytesWritable> input = 
        ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, 
            WritableComparable.class, Writable.class);
    input = input.cache();
    input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
      @Override
      public void call(Tuple2<BytesWritable, BytesWritable> row) throws 
          Exception {
        if (row._1() != null) {
          System.out.println("Key: " + row._1());
        }
        if (row._2() != null) {
          System.out.println("Value: " + row._2());
        }
      }
    });
    ctx.stop();
  }
}
{code}
In this case, row._2() always gives the same value. If we disable caching by 
removing input.cache(), the program gives the expected rows.

Further analysis shows that MemoryStore (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
 is storing the references to (key, value) pairs returned by 
HadoopRDD.getNext() (See 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
 but this method always returns the same (key, value) object references, except 
each getNext() call updates values inside these objects. When there are no more 
records (key, value) objects are filled with empty strings (no values) in 
CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same 
key, value object pairs, all values become NULL.

Probably MemoryStore should instead store a copy of key, value pair rather 
than keeping a reference to it.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-3693) Cached Hadoop RDD always return rows with the same value

2014-09-25 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated SPARK-3693:
---
Description: 
While trying RDD caching, it's found that caching a Hadoop RDD causes data 
correctness issues. The following code snippet demonstrates the usage:

{code}
public final class Test {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ... 
    JavaPairRDD<BytesWritable, BytesWritable> input = 
        ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, 
            WritableComparable.class, Writable.class);
    input = input.cache();
    input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
      @Override
      public void call(Tuple2<BytesWritable, BytesWritable> row) throws 
          Exception {
        if (row._1() != null) {
          System.out.println("Key: " + row._1());
        }
        if (row._2() != null) {
          System.out.println("Value: " + row._2());
        }
      }
    });
    ctx.stop();
  }
}
{code}
In this case, row._2() always gives the same value. If we disable caching by 
removing input.cache(), the program gives the expected rows.

Further analysis shows that MemoryStore (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
 is storing the references to (key, value) pairs returned by 
HadoopRDD.getNext() (See 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
 but this method always returns the same (key, value) object references, except 
each getNext() call updates values inside these objects. When there are no more 
records (key, value) objects are filled with empty strings (no values) in 
CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same 
key, value object pairs, all values become NULL.

Probably MemoryStore should instead store a copy of key, value pair rather 
than keeping a reference to it.


  was:
While trying RDD caching, it's found that caching a Hadoop RDD causes data 
correctness issues. The following code snippet demonstrates the usage:

{code}
public final class Test {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ... 
    JavaPairRDD<BytesWritable, BytesWritable> input = 
        ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, 
            WritableComparable.class, Writable.class);
    input = input.cache();
    input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
      @Override
      public void call(Tuple2<BytesWritable, BytesWritable> row) throws 
          Exception {
        if (row._1() != null) {
          System.out.println("Key: " + row._1());
        }
        if (row._2() != null) {
          System.out.println("Value: " + row._2());
        }
      }
    });
    ctx.stop();
  }
}
{code}
In this case, row._2() always gives the same value. If we disable caching by 
removing input.cache(), the program gives the expected rows.

Further analysis shows that MemoryStore (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
 is storing the references to (key, value) pairs returned by 
HadoopRDD.getNext() (See 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
 but this method always returns the same (key, value) object references, except 
each getNext() call updates values inside these objects. When there are no more 
records (key, value) objects are filled with empty strings (no values) in 
CombineFileRecordReader. As all pairs in MemoryStore.vector refer to the same 
key, value object pairs, all values become NULL.

Probably MemoryStore should instead store a copy of key, value pair rather 
than keeping a reference to it.



 Cached Hadoop RDD always return rows with the same value
 

 Key: SPARK-3693
 URL: https://issues.apache.org/jira/browse/SPARK-3693
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang

 While trying RDD caching, it's found that caching a Hadoop RDD causes data 
 correctness issues. The following code snippet demonstrates the usage:
 {code}
 public final class Test {
 public static void main(String[] args) throws Exception {
 SparkConf 

[jira] [Commented] (SPARK-3693) Cached Hadoop RDD always return rows with the same value

2014-09-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148295#comment-14148295
 ] 

Xuefu Zhang commented on SPARK-3693:


cc [~rxin], [~sandyr]

 Cached Hadoop RDD always return rows with the same value
 

 Key: SPARK-3693
 URL: https://issues.apache.org/jira/browse/SPARK-3693
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang

 While trying RDD caching, it's found that caching a Hadoop RDD causes data 
 correctness issues. The following code snippet demonstrates the usage:
 {code}
 public final class Test {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("Test");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
     ... 
     JavaPairRDD<BytesWritable, BytesWritable> input = 
         ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, 
             WritableComparable.class, Writable.class);
     input = input.cache();
     input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
       @Override
       public void call(Tuple2<BytesWritable, BytesWritable> row) throws 
           Exception {
         if (row._1() != null) {
           System.out.println("Key: " + row._1());
         }
         if (row._2() != null) {
           System.out.println("Value: " + row._2());
         }
       }
     });
     ctx.stop();
   }
 }
 {code}
 In this case, row._2() always gives the same value. If we disable caching by 
 removing input.cache(), the program gives the expected rows.
 Further analysis shows that MemoryStore (see 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
  is storing the references to (key, value) pairs returned by 
 HadoopRDD.getNext() (See 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
  but this method always returns the same (key, value) object references, 
 except each getNext() call updates values inside these objects. When there 
 are no more records (key, value) objects are filled with empty strings (no 
 values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer 
 to the same key, value object pairs, all values become NULL.
 Probably MemoryStore should instead store a copy of key, value pair rather 
 than keeping a reference to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3693) Cached Hadoop RDD always return rows with the same value

2014-09-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148318#comment-14148318
 ] 

Xuefu Zhang commented on SPARK-3693:


Thanks, guys. We are fine with the workaround.
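
The workaround is not spelled out in the thread; a typical approach, assumed 
here rather than confirmed, is to copy the reused Writable objects into fresh 
ones before caching, roughly as follows.
{code}
// Assumed workaround: materialize copies of the reused Writables before cache().
import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public final class CopyBeforeCache {
  public static JavaPairRDD<BytesWritable, BytesWritable> copyAndCache(
      JavaPairRDD<BytesWritable, BytesWritable> input) {
    return input.mapToPair(
        new PairFunction<Tuple2<BytesWritable, BytesWritable>, BytesWritable, BytesWritable>() {
          @Override
          public Tuple2<BytesWritable, BytesWritable> call(
              Tuple2<BytesWritable, BytesWritable> row) {
            // Copy the bytes so the cached entries no longer alias the single
            // (key, value) pair that HadoopRDD keeps reusing.
            BytesWritable key = new BytesWritable(
                Arrays.copyOf(row._1().getBytes(), row._1().getLength()));
            BytesWritable value = new BytesWritable(
                Arrays.copyOf(row._2().getBytes(), row._2().getLength()));
            return new Tuple2<BytesWritable, BytesWritable>(key, value);
          }
        }).cache();
  }
}
{code}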

 Cached Hadoop RDD always return rows with the same value
 

 Key: SPARK-3693
 URL: https://issues.apache.org/jira/browse/SPARK-3693
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.2.0
Reporter: Xuefu Zhang

 While trying RDD caching, it's found that caching a Hadoop RDD causes data 
 correctness issues. The following code snippet demonstrates the usage:
 {code}
 public final class Test {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("Test");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
     ... 
     JavaPairRDD<BytesWritable, BytesWritable> input = 
         ctx.hadoopRDD(jobConf, CombineHiveInputClass.class, 
             WritableComparable.class, Writable.class);
     input = input.cache();
     input.foreach(new VoidFunction<Tuple2<BytesWritable, BytesWritable>>() {
       @Override
       public void call(Tuple2<BytesWritable, BytesWritable> row) throws 
           Exception {
         if (row._1() != null) {
           System.out.println("Key: " + row._1());
         }
         if (row._2() != null) {
           System.out.println("Value: " + row._2());
         }
       }
     });
     ctx.stop();
   }
 }
 {code}
 In this case, row._2() always gives the same value. If we disable caching by 
 removing input.cache(), the program gives the expected rows.
 Further analysis shows that MemoryStore (see 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L236)
  is storing the references to (key, value) pairs returned by 
 HadoopRDD.getNext() (See 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L220),
  but this method always returns the same (key, value) object references, 
 except each getNext() call updates values inside these objects. When there 
 are no more records (key, value) objects are filled with empty strings (no 
 values) in CombineFileRecordReader. As all pairs in MemoryStore.vector refer 
 to the same key, value object pairs, all values become NULL.
 Probably MemoryStore should instead store a copy of key, value pair rather 
 than keeping a reference to it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2014-09-22 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143231#comment-14143231
 ] 

Xuefu Zhang commented on SPARK-3621:


In my limited understanding, to broadcast a variable made of an RDD, you have 
to call RDD.collect() at the driver, which means data will be transferred to 
the driver. While broadcasting the variable might be very efficient, I'd also 
like to avoid shipping data to the driver.

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 beneficial to allow the client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data to be collected to the driver and the variable to be shipped to 
 the cluster after being made. It would perform better if the driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such as building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from the previous stage to the 
 next stage, which doesn't require driver-side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-22 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143921#comment-14143921
 ] 

Xuefu Zhang commented on SPARK-3622:


They are related but not exactly the same. SPARK-2688 is about branching off 
the RDD tree with no custom transformation involved. This JIRA is about 
returning multiple RDDs from a single transformation (branching happening 
within a transformation).

 Provide a custom transformation that can output multiple RDDs
 -

 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 All existing transformations return just one RDD at most, even those 
 which take user-supplied functions such as mapPartitions(). However, 
 sometimes a user-provided function may need to output multiple RDDs. For 
 instance, a filter function that divides the input RDD into several RDDs. 
 While it's possible to get multiple RDDs by transforming the same RDD 
 multiple times, it may be more efficient to do this concurrently in one shot, 
 especially when the user's existing function is already generating different 
 data sets.
 This is the case in Hive on Spark, where Hive's map function and reduce 
 function can output different data sets to be consumed by subsequent stages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2014-09-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142466#comment-14142466
 ] 

Xuefu Zhang commented on SPARK-3621:


I understand RDD is a concept existing only in the driver. However, accessing 
the data in a Spark job doesn't have to be in the form of an RDD. An iterator 
over the underlying data is sufficient, as long as the data is already shipped 
to the node when the job starts to run. One way to identify the shipped RDD and 
the iterator afterwards could be a UUID.

Hive on Spark isn't using Spark's transformations to do map-join, or join in 
general. Hive's own implementation is to build hash maps for the small tables 
when the join starts, and then do key lookups while streaming through the big 
table. For this, the small table data (which can be a result RDD of another 
Spark job) needs to be shipped to all nodes that do the join.
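
A rough sketch of the collect-then-broadcast pattern described above, i.e. the 
driver-side hop this issue would like to avoid; all types, names, and the join 
logic are illustrative assumptions.
{code}
// Hedged sketch: today's approach collects the small table at the driver first.
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

public final class MapJoinSketch {
  public static JavaPairRDD<String, String> mapJoin(
      JavaSparkContext ctx,
      JavaPairRDD<String, String> smallTable,
      JavaPairRDD<String, String> bigTable) {
    // 1. Ship the small table to the driver and build the hash map there.
    final Map<String, String> lookup = new HashMap<String, String>();
    for (Tuple2<String, String> kv : smallTable.collect()) {
      lookup.put(kv._1(), kv._2());
    }
    // 2. Broadcast the hash map and stream through the big table on executors.
    final Broadcast<Map<String, String>> small = ctx.broadcast(lookup);
    return bigTable.mapToPair(
        new PairFunction<Tuple2<String, String>, String, String>() {
          @Override
          public Tuple2<String, String> call(Tuple2<String, String> row) {
            String matched = small.value().get(row._1());
            return new Tuple2<String, String>(row._1(), row._2() + "," + matched);
          }
        });
  }
}
{code}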

 Provide a way to broadcast an RDD (instead of just a variable made of the 
 RDD) so that a job can access
 ---

 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0, 1.1.0
Reporter: Xuefu Zhang

 In some cases, such as Hive's way of doing map-side join, it would be 
 beneficial to allow the client program to broadcast RDDs rather than just 
 variables made of these RDDs. Broadcasting a variable made of RDDs requires 
 all RDD data to be collected to the driver and the variable to be shipped to 
 the cluster after being made. It would perform better if the driver just 
 broadcasts the RDDs and uses the corresponding data in jobs (such as building 
 hashmaps at executors).
 Tez has a broadcast edge which can ship data from the previous stage to the 
 next stage, which doesn't require driver-side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142881#comment-14142881
 ] 

Xuefu Zhang commented on SPARK-3622:


Thanks for your comments, [~pwendell]. I understand caching A would be helpful 
if I need to transform it to get B and C separately. My proposal is to get B 
and C in just one pass of A, so A doesn't even need to be cached.

Here is an example how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<String, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("A");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is 
automatically computed whenever A is computed, and vice versa. Since both are 
computed if either of them is computed, subsequent references to either one 
should not recompute them.

The benefits of it: 1) no need to cache A; 2) only one pass of the input.

I'm not sure if this is feasible in Spark, but Hive's map function does exactly 
this. Its operator tree can branch off anywhere, resulting in multiple output 
datasets from a single input dataset.

Please let me know if there are more questions.


 Provide a custom transformation that can output multiple RDDs
 -

 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 All existing transformations return just one RDD at most, even those 
 which take user-supplied functions such as mapPartitions(). However, 
 sometimes a user-provided function may need to output multiple RDDs. For 
 instance, a filter function that divides the input RDD into several RDDs. 
 While it's possible to get multiple RDDs by transforming the same RDD 
 multiple times, it may be more efficient to do this concurrently in one shot, 
 especially when the user's existing function is already generating different 
 data sets.
 This is the case in Hive on Spark, where Hive's map function and reduce 
 function can output different data sets to be consumed by subsequent stages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142881#comment-14142881
 ] 

Xuefu Zhang edited comment on SPARK-3622 at 9/22/14 4:39 AM:
-

Thanks for your comments, [~pwendell]. I understand caching A would be helpful 
if I need to transform it to get B and C separately. My proposal is to get B 
and C in just one pass of A, so A doesn't even need to be cached.

Here is an example how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<String, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("B");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is 
automatically computed whenever A is computed, and vice versa. Since both are 
computed if either of them is computed, subsequent references to either one 
should not recompute them.

The benefits of it: 1) no need to cache A; 2) only one pass of the input.

I'm not sure if this is feasible in Spark, but Hive's map function does exactly 
this. Its operator tree can branch off anywhere, resulting in multiple output 
datasets from a single input dataset.

Please let me know if there are more questions.



was (Author: xuefuz):
Thanks for your comments, [~pwendell]. I understand caching A would be helpful 
if I need to transform it to get B and C separately. My proposal is to get B 
and C in just one pass of A, so A doesn't even need to be cached.

Here is an example how it may be used in Hive.
{code}
JavaPairRDD table = sparkContext.hadoopRDD(..);
Map<String, JavaPairRDD> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD rddA = mappedRDDs.get("A");
JavaPairRDD rddB = mappedRDDs.get("A");
JavaPairRDD sortedRddA = rddA.sortByKey();
JavaPairRDD groupedRddB = rddB.groupByKey();
// further processing sortedRddA and groupedRddB.
...
{code}
In this case, mapFunction can return named iterators for A and B. B is 
automatically computed whenever A is computed, and vice versa. Since both are 
computed if either of them is computed, subsequent references to either one 
should not recompute them.

The benefits of it: 1) no need to cache A; 2) only one pass of the input.

I'm not sure if this is feasible in Spark, but Hive's map function does exactly 
this. Its operator tree can branch off anywhere, resulting in multiple output 
datasets from a single input dataset.

Please let me know if there are more questions.


 Provide a custom transformation that can output multiple RDDs
 -

 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 All existing transformations return just one RDD at most, even those 
 which take user-supplied functions such as mapPartitions(). However, 
 sometimes a user-provided function may need to output multiple RDDs. For 
 instance, a filter function that divides the input RDD into several RDDs. 
 While it's possible to get multiple RDDs by transforming the same RDD 
 multiple times, it may be more efficient to do this concurrently in one shot, 
 especially when the user's existing function is already generating different 
 data sets.
 This is the case in Hive on Spark, where Hive's map function and reduce 
 function can output different data sets to be consumed by subsequent stages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3621) Provide a way to broadcast an RDD (instead of just a variable made of the RDD) so that a job can access

2014-09-20 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-3621:
--

 Summary: Provide a way to broadcast an RDD (instead of just a 
variable made of the RDD) so that a job can access
 Key: SPARK-3621
 URL: https://issues.apache.org/jira/browse/SPARK-3621
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0, 1.0.0
Reporter: Xuefu Zhang


In some cases, such as Hive's way of doing map-side join, it would be 
beneficial to allow the client program to broadcast RDDs rather than just 
variables made of these RDDs. Broadcasting a variable made of RDDs requires all 
RDD data to be collected to the driver and the variable to be shipped to the 
cluster after being made. It would perform better if the driver just broadcasts 
the RDDs and uses the corresponding data in jobs (such as building hashmaps at 
executors).

Tez has a broadcast edge which can ship data from the previous stage to the 
next stage, which doesn't require driver-side processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-20 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-3622:
--

 Summary: Provide a custom transformation that can output multiple 
RDDs
 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang


All existing transformations return just one RDD at most, even those which take 
user-supplied functions such as mapPartitions(). However, sometimes a 
user-provided function may need to output multiple RDDs. For instance, a filter 
function that divides the input RDD into several RDDs. While it's possible to 
get multiple RDDs by transforming the same RDD multiple times, it may be more 
efficient to do this concurrently in one shot, especially when the user's 
existing function is already generating different data sets.

This is the case in Hive on Spark, where Hive's map function and reduce 
function can output different data sets to be consumed by subsequent stages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2895) Support mapPartitionsWithContext in Spark Java API

2014-09-02 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14118759#comment-14118759
 ] 

Xuefu Zhang commented on SPARK-2895:


Hi [~rxin], could you review [~chengxiang li]'s patch when you get a chance? 
This is needed for our hive-on-spark milestone #1. Thanks.

 Support mapPartitionsWithContext in Spark Java API
 --

 Key: SPARK-2895
 URL: https://issues.apache.org/jira/browse/SPARK-2895
 Project: Spark
  Issue Type: New Feature
  Components: Java API
Reporter: Chengxiang Li
Assignee: Chengxiang Li
  Labels: hive

 This is a requirement from Hive on Spark: mapPartitionsWithContext only 
 exists in the Spark Scala API, and we expect to access it from the Spark Java 
 API. 
 For HIVE-7627 and HIVE-7843, Hive operators which are invoked in the 
 mapPartitions closure need to get the taskId.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2741) Publish version of spark assembly which does not contain Hive

2014-07-29 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14078813#comment-14078813
 ] 

Xuefu Zhang commented on SPARK-2741:


cc: [~rxin], [~sandyr]

 Publish version of spark assembly which does not contain Hive
 -

 Key: SPARK-2741
 URL: https://issues.apache.org/jira/browse/SPARK-2741
 Project: Spark
  Issue Type: Task
Reporter: Brock Noland

 The current Spark assembly contains Hive. This conflicts with Hive + Spark, 
 which is attempting to use its own version of Hive.
 We'll need to publish a version of the assembly which does not contain the 
 Hive jars.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2741) Publish version of spark assembly which does not contain Hive

2014-07-29 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14078835#comment-14078835
 ] 

Xuefu Zhang commented on SPARK-2741:


I did see a profile about Hive. However, it seems that the spark-assembly.jar 
coming from the Spark install was built with this profile enabled. The question 
is, for a user of Hive on Spark, where can the user get a spark-assembly 
without Hive classes? Is it our vision that the user has to build the jar 
himself/herself?

 Publish version of spark assembly which does not contain Hive
 -

 Key: SPARK-2741
 URL: https://issues.apache.org/jira/browse/SPARK-2741
 Project: Spark
  Issue Type: Task
Reporter: Brock Noland

 The current Spark assembly contains Hive. This conflicts with Hive + Spark, 
 which is attempting to use its own version of Hive.
 We'll need to publish a version of the assembly which does not contain the 
 Hive jars.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2014-07-25 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-2688:
--

 Summary: Need a way to run multiple data pipeline concurrently
 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang


Suppose we want to do the following data processing: 
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an 
intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
recomputed. This is very inefficient. Ideally, we should be able to trigger the 
execution of the whole graph and reuse rdd2, but there doesn't seem to be a way 
of doing so. As far as I know, it's possible in Tez.

This is required for Hive to support multi-insert queries. HIVE-7292.
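
The workaround available today, sketched below as an assumption rather than a 
recommendation, is to cache rdd2 and trigger each branch with its own action; 
this avoids recomputation but still runs the branches one after another.
{code}
// Hedged sketch of the current workaround: cache the shared RDD and fire one
// action per branch; branches still execute sequentially, not concurrently.
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public final class MultiPipelineSketch {
  public static void main(String[] args) {
    JavaSparkContext ctx = new JavaSparkContext("local", "MultiPipelineSketch");
    JavaRDD<Integer> rdd1 = ctx.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaRDD<Integer> rdd2 = rdd1.map(new Function<Integer, Integer>() {
      public Integer call(Integer x) { return x * 2; }
    }).cache();                      // keep rdd2 around for all branches
    VoidFunction<Integer> noop = new VoidFunction<Integer>() {
      public void call(Integer x) { }
    };
    rdd2.map(new Function<Integer, Integer>() {       // rdd3
      public Integer call(Integer x) { return x + 1; }
    }).foreach(noop);
    rdd2.map(new Function<Integer, Integer>() {       // rdd4
      public Integer call(Integer x) { return x - 1; }
    }).foreach(noop);                // rdd2 is reused from the cache here
    ctx.stop();
  }
}
{code}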



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2014-07-25 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074368#comment-14074368
 ] 

Xuefu Zhang commented on SPARK-2688:


cc: [~rxin] [~sandyr]

 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 -> rdd2 -> rdd3
           | -> rdd4
           | -> rdd5
           \ -> rdd6
 {code}
 where -> represents a transformation. rdd3 to rdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution of the whole graph and reuse rdd2, but there doesn't seem to be 
 a way of doing so. As far as I know, it's possible in Tez.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2014-07-25 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated SPARK-2688:
---

Description: 
Suppose we want to do the following data processing: 
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an 
intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
recomputed. This is very inefficient. Ideally, we should be able to trigger the 
execution of the whole graph and reuse rdd2, but there doesn't seem to be a way 
of doing so. As far as I know, it's possible in Tez.

This is required for Hive to support multi-insert queries. HIVE-7292.

  was:
Suppose we want to do the following data processing: 
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an 
intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
recomputed. This is very inefficient. Ideally, we should be able to trigger the 
execution of the whole graph and reuse rdd2, but there doesn't seem to be a way 
of doing so. As far as I know, it's possible in Tez.

This is required for Hive to support multi-insert queries. HIVE-7292.


 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 -> rdd2 -> rdd3
           | -> rdd4
           | -> rdd5
           \ -> rdd6
 {code}
 where -> represents a transformation. rdd3 to rdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution of the whole graph and reuse rdd2, but there doesn't seem to be 
 a way of doing so. As far as I know, it's possible in Tez.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (SPARK-2688) Need a way to run multiple data pipeline concurrently

2014-07-25 Thread Xuefu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang updated SPARK-2688:
---

Description: 
Suppose we want to do the following data processing: 
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an 
intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
recomputed. This is very inefficient. Ideally, we should be able to trigger the 
execution of the whole graph and reuse rdd2, but there doesn't seem to be a way 
of doing so. As far as I know, it's possible in Tez.

This is required for Hive to support multi-insert queries. HIVE-7292.

  was:
Suppose we want to do the following data processing: 
{code}
rdd1 -> rdd2 -> rdd3
          | -> rdd4
          | -> rdd5
          \ -> rdd6
{code}
where -> represents a transformation. rdd3 to rdd6 are all derived from an 
intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
recomputed. This is very inefficient. Ideally, we should be able to trigger the 
execution of the whole graph and reuse rdd2, but there doesn't seem to be a way 
of doing so. As far as I know, it's possible in Tez.

This is required for Hive to support multi-insert queries. HIVE-7292.


 Need a way to run multiple data pipeline concurrently
 -

 Key: SPARK-2688
 URL: https://issues.apache.org/jira/browse/SPARK-2688
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Xuefu Zhang

 Suppose we want to do the following data processing: 
 {code}
 rdd1 -> rdd2 -> rdd3
           | -> rdd4
           | -> rdd5
           \ -> rdd6
 {code}
 where -> represents a transformation. rdd3 to rdd6 are all derived from an 
 intermediate rdd2. We use foreach(fn) with a dummy function to trigger the 
 execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 -> 
 rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be 
 recomputed. This is very inefficient. Ideally, we should be able to trigger 
 the execution of the whole graph and reuse rdd2, but there doesn't seem to be 
 a way of doing so. As far as I know, it's possible in Tez.
 This is required for Hive to support multi-insert queries. HIVE-7292.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-23 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14072778#comment-14072778
 ] 

Xuefu Zhang commented on SPARK-2420:


Is shading guava in the Spark build a reasonable compromise? We tested it, and it 
worked. All we need is this in Spark's assembly pom.xml:

{code}
<relocations>
  <relocation>
    <pattern>com.google.common</pattern>
    <shadedPattern>spark.com.google.common</shadedPattern>
  </relocation>
</relocations>
{code}

We'd really like to get this resolved and move on.

 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-18 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14066600#comment-14066600
 ] 

Xuefu Zhang commented on SPARK-2420:


{quote}
2. For jetty, it was a problem with the Hive on Spark POC, possibly because we 
shipped all libraries from the Hive process's classpath to the Spark cluster. We 
have a task (HIVE-7371) to identify a minimum set of jars to be shipped. With 
that, the story might change. We will confirm whether Jetty is a problem once we 
have a better idea on HIVE-7371. 
{quote}

Even better news: with the latest work on HIVE-7292, we found that 
servlet-api/jetty no longer seems to be a problem. Thus, the only remaining 
conflict is guava, for which HIVE-7387 has all the details.

 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-16 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14063998#comment-14063998
 ] 

Xuefu Zhang commented on SPARK-2420:


Thanks for your comments, [~srowen]. I mostly agree with your assessment.

1. Guava is indeed a problem, and the problem is confirmed. The solution also 
seems simple: if it can be set to 11 in Spark, it solves the problem we have.

2. For jetty, it was a problem with the Hive on Spark POC, possibly because we 
shipped all libraries from the Hive process's classpath to the Spark cluster. We 
have a task (HIVE-7371) to identify a minimum set of jars to be shipped. With 
that, the story might change. We will confirm whether Jetty is a problem once we 
have a better idea on HIVE-7371. Also, we will check whether the problem, if it 
exists, can be fixed on the Hive side first. If not, we'd like to get help from 
Spark and also provide a reason why.



 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-16 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14064168#comment-14064168
 ] 

Xuefu Zhang commented on SPARK-2420:


As to guava conflict, HIVE-7387 has more details and analysis.

 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-15 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14063049#comment-14063049
 ] 

Xuefu Zhang commented on SPARK-2420:


[~rxin] As pointed out above, Hive and the Hadoop components it depends on are 
all on Guava 11; unless Spark shades it or downgrades to a common version, I 
don't see an easy workaround. The same goes for servlet-api/jetty.

 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-11 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14058757#comment-14058757
 ] 

Xuefu Zhang commented on SPARK-2420:


[~rxin] Is it possible to downgrade Guava to 11 in Spark, or to shade it? It 
doesn't seem feasible to upgrade Guava across all Hadoop components. This 
problem currently blocks HIVE-7292, the Spark and Hive integration.

 Change Spark build to minimize library conflicts
 

 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang
 Attachments: spark_1.0.0.patch


 During the prototyping of HIVE-7292, many library conflicts showed up because 
 the Spark build contains library versions that differ greatly from those in 
 the current major Hadoop version. It would be nice if we could choose versions 
 in line with Hadoop's, or shade them in the assembly. Here is the wish list:
 1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
 2. Shade Spark's jetty and servlet dependencies in the assembly.
 3. Guava version difference: Spark uses a higher version. I'm not sure what 
 the best solution is for this.
 The list may grow as HIVE-7292 proceeds.
 For information only, attached is a patch that we applied to Spark in order to 
 make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2420) Change Spark build to minimize library conflicts

2014-07-09 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created SPARK-2420:
--

 Summary: Change Spark build to minimize library conflicts
 Key: SPARK-2420
 URL: https://issues.apache.org/jira/browse/SPARK-2420
 Project: Spark
  Issue Type: Wish
  Components: Build
Affects Versions: 1.0.0
Reporter: Xuefu Zhang


During the prototyping of HIVE-7292, many library conflicts showed up because 
the Spark build contains library versions that differ greatly from those in the 
current major Hadoop version. It would be nice if we could choose versions in 
line with Hadoop's, or shade them in the assembly. Here is the wish list:

1. Upgrade the protobuf version to 2.5.0 from the current 2.4.1.
2. Shade Spark's jetty and servlet dependencies in the assembly.
3. Guava version difference: Spark uses a higher version. I'm not sure what the 
best solution is for this.

The list may grow as HIVE-7292 proceeds.

For information only, attached is a patch that we applied to Spark in order to 
make it work with Hive. It gives an idea of the scope of changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2421) Spark should treat writable as serializable for keys

2014-07-09 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14056677#comment-14056677
 ] 

Xuefu Zhang commented on SPARK-2421:


CC [~rxin], [~hshreedharan]

 Spark should treat writable as serializable for keys
 

 Key: SPARK-2421
 URL: https://issues.apache.org/jira/browse/SPARK-2421
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output, Java API
Affects Versions: 1.0.0
Reporter: Xuefu Zhang

 It seems that Spark requires the key to be serializable (the class must 
 implement the Serializable interface). In the Hadoop world, the Writable 
 interface is used for the same purpose. A lot of existing classes, while 
 Writable, are not considered Serializable by Spark. It would be nice if Spark 
 could treat Writable as serializable and automatically serialize and 
 deserialize these classes using the Writable interface.
 This is identified in HIVE-7279, but its benefits are global.
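
For illustration, one way such support could look (a rough sketch, not an 
existing Spark API; the class and its name are hypothetical) is a Serializable 
wrapper that delegates to the Writable's own write()/readFields() methods:

{code}
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.io.Writable

// Rough sketch: wraps any Writable so that Java serialization can carry it.
// Assumes the wrapped class has a no-arg constructor, as Writables normally do.
class WritableWrapper[T <: Writable](@transient var value: T) extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(value.getClass)   // remember the concrete Writable class
    value.write(out)                  // ObjectOutputStream is a DataOutput, so the Writable writes itself
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val clazz = in.readObject().asInstanceOf[Class[T]]
    value = clazz.getDeclaredConstructor().newInstance()   // recreate an empty instance
    value.readFields(in)              // ObjectInputStream is a DataInput, so the Writable reads itself back
  }
}
{code}

A pair RDD keyed by, say, org.apache.hadoop.io.Text could then be mapped to such 
wrapper keys before a shuffle. Spark already ships a similar SerializableWritable 
class for internal use, so the work in this issue would mainly be applying a 
wrapper like this (or an equivalent serializer) automatically.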



--
This message was sent by Atlassian JIRA
(v6.2#6252)