[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2018-07-29 Thread Carson Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561326#comment-16561326
 ] 

Carson Wang commented on SPARK-9850:


We have a new proposal and implementation for Spark SQL adaptive execution, 
discussed in SPARK-23128. Optimizing the join strategy at run time and handling 
skewed joins are also supported. The full code is available at 
[https://github.com/Intel-bigdata/spark-adaptive].
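For readers who want to try the fork, its adaptive features are switched on through SQL configuration properties. A minimal sketch, assuming the property names used by the Intel-bigdata fork (these are fork-specific and may differ from whatever eventually lands upstream):

```properties
# Enable adaptive query execution
spark.sql.adaptive.enabled=true
# Optimize join strategies at run time (assumed fork-specific flag)
spark.sql.adaptive.join.enabled=true
# Handle skewed joins at run time (assumed fork-specific flag)
spark.sql.adaptive.skewedJoin.enabled=true
```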
 

> Adaptive execution in Spark
> ---
>
> Key: SPARK-9850
> URL: https://issues.apache.org/jira/browse/SPARK-9850
> Project: Spark
>  Issue Type: Epic
>  Components: Spark Core, SQL
>Reporter: Matei Zaharia
>Assignee: Yin Huai
>Priority: Major
> Attachments: AdaptiveExecutionInSpark.pdf
>
>
> Query planning is one of the main factors in high performance, but the 
> current Spark engine requires the execution DAG for a job to be set in 
> advance. Even with cost-based optimization, it is hard to know the behavior 
> of data and user-defined functions well enough to always get great execution 
> plans. This JIRA proposes to add adaptive query execution, so that the engine 
> can change the plan for each query as it sees what data earlier stages 
> produced.
> We propose adding this to Spark SQL / DataFrames first, using a new API in 
> the Spark engine that lets libraries run DAGs adaptively. In future JIRAs, 
> the functionality could be extended to other libraries or the RDD API, but 
> that is more difficult than adding it in SQL.
> I've attached a design doc by Yin Huai and myself explaining how it would 
> work in more detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2018-07-17 Thread Michail Giannakopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546840#comment-16546840
 ] 

Michail Giannakopoulos commented on SPARK-9850:
---

Hello [~yhuai]! Is anyone currently working on this Epic? In other words, is 
this work in progress, or has it been determined that it should be stalled?
I am asking because I recently logged an issue related to adaptive execution 
(SPARK-24826). It would be nice to know whether this is being actively worked 
on, since adaptive execution greatly reduces the number of partitions during 
shuffles when executing SQL queries (one of the main bottlenecks for Spark). 
Thanks a lot!







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2016-11-15 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667665#comment-15667665
 ] 

Imran Rashid commented on SPARK-9850:
-

[~assaf.mendelson] Reducers already have to wait for the last mapper to finish. 
 Spark has always behaved this way.  (I think you might find discussions 
referring to this as the "stage barrier".)  I don't see that changing anytime 
soon -- while it's not ideal, doing away with it would add a lot of complexity.







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2016-11-15 Thread Assaf Mendelson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666806#comment-15666806
 ] 

Assaf Mendelson commented on SPARK-9850:


I like the overall idea.
What I am trying to figure out is the part where the map portion is performed 
first and only then the reduce portion.
If DAGScheduler.submitMapStage() waits for all the map processing to finish and 
only then starts the reducers, this can really slow things down, as the reduce 
stage will begin only when the last map task finishes.

Wouldn't it be better to start the reducers once the first few mappers have 
finished (or at least when there are idle executors)? Assuming the first few 
mappers are representative of the entire map output, this shouldn't affect the 
statistics estimate too much.
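The "representative sample" assumption above can be illustrated with a small simulation (plain Python with hypothetical numbers, not Spark code): extrapolate the total map output size from only the first few completed map tasks.

```python
# Estimate total map output size from the first k completed map tasks,
# assuming those tasks are representative of the whole stage.
def estimate_total_output(completed_sizes_mb, total_num_mappers):
    avg = sum(completed_sizes_mb) / len(completed_sizes_mb)
    return avg * total_num_mappers

# 4 of 100 mappers have finished, each producing roughly 50 MB of output.
estimate = estimate_total_output([48, 52, 50, 50], total_num_mappers=100)
# The estimate (5000 MB here) could drive planning decisions before the
# stage barrier, at the risk of being wrong when the sample is not
# representative (e.g. skewed input splits).
```
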








[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2016-04-12 Thread Justin Uang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237056#comment-15237056
 ] 

Justin Uang commented on SPARK-9850:


I like this idea a lot. One thing we encounter in our use cases is that we end 
up accidentally joining on a field that is 50% nulls, or on a string that 
represents null, like "N/A". It then becomes quite cumbersome to constantly 
need a Spark expert to dig in and find out why there is one task that will 
never finish. Would it be possible to add a threshold such that if a join key 
ever gets too big, the job just fails with an error message?
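A sketch of the kind of guard being suggested, in plain Python over key counts rather than actual Spark internals (the function name and threshold are hypothetical):

```python
from collections import Counter

def check_join_key_skew(keys, max_rows_per_key):
    """Fail fast if any join key (including null-like sentinels) is so
    frequent that it would produce a never-finishing straggler task."""
    key, n = Counter(keys).most_common(1)[0]
    if n > max_rows_per_key:
        raise RuntimeError(
            f"Join key {key!r} appears {n} times, exceeding the limit of "
            f"{max_rows_per_key}; this join would produce a straggler task.")

# Half the rows carry the sentinel "N/A" -- exactly the accidental-skew case.
keys = ["N/A"] * 500 + [f"id-{i}" for i in range(500)]
check_join_key_skew(keys, max_rows_per_key=1000)   # passes
# check_join_key_skew(keys, max_rows_per_key=100)  # would raise
```
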







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2016-01-13 Thread Maciej Bryński (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096985#comment-15096985
 ] 

Maciej Bryński commented on SPARK-9850:
---

[~matei]
Hi,
I'm not sure if my issue is related to this JIRA.

In 1.6.0, when using a SQL limit, Spark does the following:
- executes the limit on every partition
- then takes the result
Is it possible to stop scanning partitions once enough rows have been collected 
for the limit?
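The suggested behaviour can be sketched in plain Python (a simulation, not Spark code): scan partitions one at a time and stop as soon as the limit is satisfied, instead of applying the limit to every partition up front.

```python
def limited_scan(partitions, limit):
    """Collect up to `limit` rows, stopping as soon as enough are seen."""
    rows, scanned = [], 0
    for part in partitions:
        scanned += 1
        rows.extend(part[:limit - len(rows)])
        if len(rows) >= limit:
            break              # remaining partitions are never touched
    return rows, scanned

parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
rows, scanned = limited_scan(parts, limit=5)
# rows == [1, 2, 3, 4, 5]; only 2 of the 3 partitions were scanned
```
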







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2015-09-24 Thread Matei Zaharia (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14907518#comment-14907518
 ] 

Matei Zaharia commented on SPARK-9850:
--

Hey Imran, this could make sense, but note that the problem will only happen if 
you have 2000 map *output* partitions, which would've been 2000 reduce tasks 
normally. Otherwise, you can have as many map *tasks* as needed with fewer 
partitions. In most jobs, I'd expect data to get significantly smaller after 
the maps, so we'd catch that. In particular, for choosing between broadcast and 
shuffle joins this should be fine. We can do something different if we suspect 
that there is going to be tons of map output *and* we think there's nontrivial 
planning to be done once we see it.







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2015-09-18 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14876347#comment-14876347
 ] 

Imran Rashid commented on SPARK-9850:
-

Just to continue brainstorming on what to do with large data -- I realize that 
my earlier suggestion about sending uncompressed (or at least not 
{{HighlyCompressed}}) map statuses back to the driver may not work, since part 
of the point is to avoid OOM on the driver, not just to reduce communication 
from the driver back to the executors.

But here are two other ideas:
1. Create a variant of {{HighlyCompressedMapStatus}} which stores exact sizes 
for all blocks that are more than some factor above the median, let's say 5x. 
This would let you deal with really extreme skew without increasing the size 
too much.
2. Since you only need the summed size of the map output per reduce partition, 
you could first perform a tree-reduce of those sizes on the executors before 
sending them back to the driver.  This avoids trying to guess some arbitrary 
cutoff factor, but is also way more complicated.
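Idea 1 can be sketched as follows (plain Python, not the actual MapStatus classes; the 5x factor is the arbitrary cutoff mentioned above): summarize typical blocks by their average size, but keep exact sizes for outliers so skew survives compression.

```python
import statistics

def compress_map_status(block_sizes, factor=5):
    """Keep exact sizes only for blocks more than `factor` x the median;
    summarize everything else by its average size."""
    median = statistics.median(block_sizes)
    outliers = {i: s for i, s in enumerate(block_sizes) if s > factor * median}
    typical = [s for i, s in enumerate(block_sizes) if i not in outliers]
    return sum(typical) / len(typical), outliers

# 999 ordinary ~1 MB blocks plus one heavily skewed 100 MB block.
sizes = [1] * 999 + [100]
avg, outliers = compress_map_status(sizes)
# avg == 1.0 and outliers == {999: 100}: the skewed block stays visible
# to the planner while the status stays compact.
```
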







[jira] [Commented] (SPARK-9850) Adaptive execution in Spark

2015-08-24 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709624#comment-14709624
 ] 

Imran Rashid commented on SPARK-9850:
-

I know the 1000 partitions used in the design doc is just an example, but I 
just wanted to point out that the number probably needs to be much larger.  
With a 2GB limit per partition, that is already only 2 TB max.  My rule of 
thumb has been to keep partitions around 100 MB, which is roughly in line with 
the 64 MB you mention in the doc, and which brings you down to 100 GB.  And 
given that you want to deal with skewed data etc., you probably actually want 
to leave quite a bit of headroom, which limits you to relatively small 
datasets.

The key point is that once you go over 2000 partitions, you are in 
{{HighlyCompressedMapStatus}} territory, which will be relatively useless for 
this.  Perhaps each shuffle map task could always send an uncompressed map 
status back to the driver?  Or maybe you could use 
{{HighlyCompressedMapStatus}} only on the shuffle-read side?  I'm not sure 
about the performance implications, just throwing out an idea.
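The concern can be made concrete with a toy calculation (plain Python; the real {{HighlyCompressedMapStatus}} also tracks empty blocks, so this is a simplification): a status that only keeps the average block size reports the same size for a heavily skewed block as for every other block, so an adaptive planner reading it cannot see the skew.

```python
def avg_only_status(block_sizes):
    """Toy stand-in for a highly compressed map status: every block is
    reported at the average size, so all skew information is lost."""
    avg = sum(block_sizes) / len(block_sizes)
    return [avg] * len(block_sizes)

exact = [1] * 1999 + [2001]      # one reduce partition is ~2000x larger
reported = avg_only_status(exact)
# Every block, including the skewed one, is reported as 2.0, even though
# the true sizes range from 1 to 2001.
```
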



