Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Abdeali Kothari
Thanks a lot for the reply Albert.

After looking at it and reading about it further, I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan? I couldn't find that in
any of the resources I was referring to.
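
Based on Albert's suggestion below, I will try triggering the query first and
then calling explain() again on the same DataFrame - something like this
sketch (if I understand correctly, the header should then read
"AdaptiveSparkPlan isFinalPlan=true" and the reused scan should show up as a
ReusedExchange):

In [5]: df4.collect()   # actually execute the query so AQE can finalize the plan
   ...: df4.explain()   # re-explain the same DataFrame: should now print the final plan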

On Fri, 7 Jan 2022, 07:25 Albert,  wrote:

> I happen to encounter something similar.
>
> It's probably because you are just calling `explain()` on it. When you
> actually run it, you will get the final Spark plan, in which case the
> exchange will be reused.
> Right, this is different compared with 3.1, probably because of the
> upgraded AQE.
>
> Not sure whether this is expected, though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list, in case
>> there is some insight there.
>> Feels like this should be categorized as a bug for Spark 3.2.0.
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
>> wrote:
>>
>>> Hi,
>>> I am using pyspark for some projects. And one of the things we are doing
>>> is trying to find the tables/columns being used by Spark using the
>>> execution plan.
>>>
>>> When we upgraded to Spark 3.2, the Spark plan seems to be different from
>>> previous versions, mainly when we are doing joins.
>>> Below is a reproducible example (you could run the same in versions 2.3
>>> to 3.1 to see the difference)
>>>
>>> My original data frames have the columns: id#0 and id#4
>>> But after doing the joins we are seeing new columns id#34 and id#19
>>> which are not created from the original dataframes I was working with.
>>> In previous versions of spark, this used to use a ReusedExchange step
>>> (shown below)
>>>
>>> I was trying to understand if this is expected in spark 3.2 where the
>>> execution plan seems to be creating a new data source which does not
>>> originate from df1 and df2 which I provided.
>>> NOTE: The same happens even if I read from parquet files
>>>
>>> In spark 3.2:
>>> In [1]: import pyspark
>>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>>> 'col2'])
>>>...: df1.explain()
>>>...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#53]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#20L]
>>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#45]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>>>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>>        +- Filter isnotnull(id#19L)
>>>           +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>>...: df2.createOrReplaceTempView('df2')
>>>...: df3 = spark.sql("""
>>>...: SELECT df1.id, df1.col1, df2.col2
>>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>>...: """)
>>>...: df3.createOrReplaceTempView('df3')
>>>...: df4 = spark.sql("""
>>>...: SELECT df2.*, df3.*
>>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>>...: """)
>>>...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>>> [id=#110]
>>>: +- Filter isnotnull(id#4L)
>>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>>+- Project [id#0L, col1#1L, col2#35L]
>>>   +- SortMergeJoin [id#0L], [id#34L], Inner
>>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>  :  +- Exchange hashpartitioning(id#0L, 200),
>>> ENSURE_REQUIREMENTS, [id=#102]
>>>  : +- Filter isnotnull(id#0L)
>>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>>>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>>        +- Filter isnotnull(id#34L)
>>>           +- Scan ExistingRDD[id#34L,col2#35L]
>>>
>>>
>>> Doing this in spark 3.1.1 - the plan is:
>>>
>>> *(8) SortMergeJoin [id#4L], 

Re: Spark 3.2 - ReusedExchange not present in join execution plan

2022-01-06 Thread Albert
I happen to encounter something similar.

It's probably because you are just calling `explain()` on it. When you
actually run it, you will get the final Spark plan, in which case the
exchange will be reused.
Right, this is different compared with 3.1, probably because of the upgraded
AQE.

Not sure whether this is expected, though.
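
btw, if the goal is just to parse the plan text, one workaround that might
work (just a sketch, not sure it is the recommended approach) is to turn AQE
off for that session - it is enabled by default since 3.2.0 - and the
3.1-style plan with the ReusedExchange node should come back:

In [5]: spark.conf.set("spark.sql.adaptive.enabled", "false")
   ...: # rebuild the joined DataFrames after changing the conf, since an
   ...: # already-explained DataFrame may keep its cached plan
   ...: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
   ...: df4 = df2.join(df3, df1['id'] == df2['id'])
   ...: df4.explain()  # should now show ReusedExchange instead of a second Exchange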

On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari 
wrote:

> Just thought I'd do a quick bump and add the dev mailing list, in case
> there is some insight there.
> Feels like this should be categorized as a bug for Spark 3.2.0.
>
> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari 
> wrote:
>
>> Hi,
>> I am using pyspark for some projects. And one of the things we are doing
>> is trying to find the tables/columns being used by Spark using the
>> execution plan.
>>
>> When we upgraded to Spark 3.2, the Spark plan seems to be different from
>> previous versions, mainly when we are doing joins.
>> Below is a reproducible example (you could run the same in versions 2.3
>> to 3.1 to see the difference)
>>
>> My original data frames have the columns: id#0 and id#4
>> But after doing the joins we are seeing new columns id#34 and id#19 which
>> are not created from the original dataframes I was working with.
>> In previous versions of spark, this used to use a ReusedExchange step
>> (shown below)
>>
>> I was trying to understand if this is expected in spark 3.2 where the
>> execution plan seems to be creating a new data source which does not
>> originate from df1 and df2 which I provided.
>> NOTE: The same happens even if I read from parquet files
>>
>> In spark 3.2:
>> In [1]: import pyspark
>>...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>
>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id',
>> 'col2'])
>>...: df1.explain()
>>...: df2.explain()
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>
>> == Physical Plan ==
>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>
>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>> [id=#53]
>>: +- Filter isnotnull(id#4L)
>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>+- Project [id#0L, col1#1L, col2#20L]
>>   +- SortMergeJoin [id#0L], [id#19L], Inner
>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>  :  +- Exchange hashpartitioning(id#0L, 200),
>> ENSURE_REQUIREMENTS, [id=#45]
>>  : +- Filter isnotnull(id#0L)
>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>  +- Sort [id#19L ASC NULLS FIRST], false, 0
>>     +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>        +- Filter isnotnull(id#19L)
>>           +- Scan ExistingRDD[id#19L,col2#20L]
>>
>> In [4]: df1.createOrReplaceTempView('df1')
>>...: df2.createOrReplaceTempView('df2')
>>...: df3 = spark.sql("""
>>...: SELECT df1.id, df1.col1, df2.col2
>>...: FROM df1 JOIN df2 ON df1.id = df2.id
>>...: """)
>>...: df3.createOrReplaceTempView('df3')
>>...: df4 = spark.sql("""
>>...: SELECT df2.*, df3.*
>>...: FROM df2 JOIN df3 ON df2.id = df3.id
>>...: """)
>>...: df4.explain()
>> == Physical Plan ==
>> AdaptiveSparkPlan isFinalPlan=false
>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>:- Sort [id#4L ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS,
>> [id=#110]
>>: +- Filter isnotnull(id#4L)
>>:+- Scan ExistingRDD[id#4L,col2#5L]
>>+- Project [id#0L, col1#1L, col2#35L]
>>   +- SortMergeJoin [id#0L], [id#34L], Inner
>>  :- Sort [id#0L ASC NULLS FIRST], false, 0
>>  :  +- Exchange hashpartitioning(id#0L, 200),
>> ENSURE_REQUIREMENTS, [id=#102]
>>  : +- Filter isnotnull(id#0L)
>>  :+- Scan ExistingRDD[id#0L,col1#1L]
>>  +- Sort [id#34L ASC NULLS FIRST], false, 0
>>     +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>        +- Filter isnotnull(id#34L)
>>           +- Scan ExistingRDD[id#34L,col2#35L]
>>
>>
>> Doing this in spark 3.1.1 - the plan is:
>>
>> *(8) SortMergeJoin [id#4L], [id#0L], Inner
>> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
>> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>> : +- *(1) Filter isnotnull(id#4L)
>> :+- *(1) Scan ExistingRDD[id#4L,col2#5L]
>> +- *(7) Project [id#0L, col1#1L, col2#20L]
>>+- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>>   :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>>   :  +- Exchange hashpartitioning(id#0L, 

Re: [VOTE][SPIP] Support Customized Kubernetes Schedulers Proposal

2022-01-06 Thread John Zhuge
+1 (non-binding)

On Thu, Jan 6, 2022 at 8:39 AM Chenya Zhang 
wrote:

> +1 (non-binding)
>
> On Thu, Jan 6, 2022 at 1:30 AM Mich Talebzadeh 
> wrote:
>
>> +1 (non-binding)
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 Jan 2022 at 07:03, bo yang  wrote:
>>
>>> +1 (non-binding)
>>>
>>> On Wed, Jan 5, 2022 at 11:01 PM Holden Karau 
>>> wrote:
>>>
 +1 (binding)

 On Wed, Jan 5, 2022 at 5:31 PM William Wang 
 wrote:

> +1 (non-binding)
>
> On Thu, Jan 6, 2022 at 09:07, Yikun Jiang wrote:
>
>> Hi all,
>>
>> I’d like to start a vote for SPIP: "Support Customized Kubernetes
>> Schedulers Proposal"
>>
>> The SPIP is to support customized Kubernetes schedulers in Spark on
>> Kubernetes.
>>
>> Please also refer to:
>>
>> - Previous discussion in dev mailing list: [DISCUSSION] SPIP:
>> Support Volcano/Alternative Schedulers Proposal
>> 
>> - Design doc: [SPIP] Spark-36057 Support Customized Kubernetes
>> Schedulers Proposal
>> 
>> - JIRA: SPARK-36057
>> 
>>
>> Please vote on the SPIP:
>>
>> [ ] +1: Accept the proposal as an official SPIP
>> [ ] +0
>> [ ] -1: I don’t think this is a good idea because …
>>
>> Regards,
>> Yikun
>>
>>
>>
>
> --
 Twitter: https://twitter.com/holdenkarau
 Books (Learning Spark, High Performance Spark, etc.):
 https://amzn.to/2MaRAG9  
 YouTube Live Streams: https://www.youtube.com/user/holdenkarau


 --
John Zhuge


Re: [VOTE][SPIP] Support Customized Kubernetes Schedulers Proposal

2022-01-06 Thread Chenya Zhang
+1 (non-binding)

On Thu, Jan 6, 2022 at 1:30 AM Mich Talebzadeh 
wrote:

> +1 (non-binding)
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 6 Jan 2022 at 07:03, bo yang  wrote:
>
>> +1 (non-binding)
>>
>> On Wed, Jan 5, 2022 at 11:01 PM Holden Karau 
>> wrote:
>>
>>> +1 (binding)
>>>
>>> On Wed, Jan 5, 2022 at 5:31 PM William Wang 
>>> wrote:
>>>
 +1 (non-binding)

 On Thu, Jan 6, 2022 at 09:07, Yikun Jiang wrote:

> Hi all,
>
> I’d like to start a vote for SPIP: "Support Customized Kubernetes
> Schedulers Proposal"
>
> The SPIP is to support customized Kubernetes schedulers in Spark on
> Kubernetes.
>
> Please also refer to:
>
> - Previous discussion in dev mailing list: [DISCUSSION] SPIP: Support
> Volcano/Alternative Schedulers Proposal
> 
> - Design doc: [SPIP] Spark-36057 Support Customized Kubernetes
> Schedulers Proposal
> 
> - JIRA: SPARK-36057
> 
>
> Please vote on the SPIP:
>
> [ ] +1: Accept the proposal as an official SPIP
> [ ] +0
> [ ] -1: I don’t think this is a good idea because …
>
> Regards,
> Yikun
>
>
>

 --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9  
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>>>
>>>


Re: [DISCUSSION] SPIP: Support Volcano/Alternative Schedulers Proposal

2022-01-06 Thread Mich Talebzadeh
Further to this, I think we ought to broaden the *Background and Motivation*
section of the proposed doc to give more impetus to the reasons behind this
project.

I quote

" Kubernetes is highly extendable, it natively supports running other
schedulers either as a secondary scheduler, or even replace the default
scheduler [4]
.
There are some open source K8s schedulers available today, such as CNCF
Volcano, Apache YuniKorn. Adding the ability to support these schedulers
natively will help to run Spark on K8s more easily and efficiently"


Changed to the following


"... Kubernetes is highly extendable, it natively supports running other
schedulers either as a secondary scheduler, or even replace the default
scheduler [4]
.
There are some open source K8s schedulers available today, such as CNCF
Volcano and Apache YuniKorn. The current limitation seems to be the
scalability of Spark on Kubernetes to go beyond one executor per node of
Kubernetes. This impacts performance as well. By adopting Volcano plus
Ynikorn, we envisage:



   - Scheduler throughput: 1.5k pods/s (default scheduler: 100 pods/s)
   - Spark application performance gain of 30%+, plus a minimal resource
   reservation feature in case of insufficient resources (tested with TPC-DS)
   - Full lifecycle management for jobs
   - Scheduling policies for high-performance workloads such as fair-share,
   topology, SLA, reservation, pre-emption, backfill and others
   - Support for heterogeneous hardware
   - Performance optimization for high-performance workloads

"
If we agree on the above points mentioned by @William Wang, I propose that
they should be incorporated into the doc.

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 5 Jan 2022 at 19:12, Mich Talebzadeh 
wrote:

> Hi Bo,
>
> Thanks for the info. Let me elaborate:
>
> In theory you can set the number of executors to a multiple of the number
> of nodes. For example, if you have a three-node k8s cluster (in my case
> Google GKE), you can set the number of executors to 6 and end up with six
> executors queuing to start, but ultimately you finish with two running
> executors plus the driver in a 3-node cluster, as shown below:
>
> hduser@ctpvm: /home/hduser> k get pods -n spark
>
> NAME                                         READY   STATUS    RESTARTS   AGE
> randomdatabigquery-d42d067e2b91c88a-exec-1   1/1     Running   0          33s
> randomdatabigquery-d42d067e2b91c88a-exec-2   1/1     Running   0          33s
> randomdatabigquery-d42d067e2b91c88a-exec-3   0/1     Pending   0          33s
> randomdatabigquery-d42d067e2b91c88a-exec-4   0/1     Pending   0          33s
> randomdatabigquery-d42d067e2b91c88a-exec-5   0/1     Pending   0          33s
> randomdatabigquery-d42d067e2b91c88a-exec-6   0/1     Pending   0          33s
> sparkbq-0beda77e2b919e01-driver              1/1     Running   0          45s
>
> hduser@ctpvm: /home/hduser> k get pods -n spark
>
> NAME                                         READY   STATUS    RESTARTS   AGE
> randomdatabigquery-d42d067e2b91c88a-exec-1   1/1     Running   0          38s
> randomdatabigquery-d42d067e2b91c88a-exec-2   1/1     Running   0          38s
> sparkbq-0beda77e2b919e01-driver              1/1     Running   0          50s
>
> hduser@ctpvm: /home/hduser> k get pods -n spark
>
> NAME                                         READY   STATUS    RESTARTS   AGE
> randomdatabigquery-d42d067e2b91c88a-exec-1   1/1     Running   0          40s
> randomdatabigquery-d42d067e2b91c88a-exec-2   1/1     Running   0          40s
> sparkbq-0beda77e2b919e01-driver              1/1     Running   0          52s
>
> So you end up with the extra executors dropping out. Hence the conclusion
> seems to be that, with the current model, you can fit exactly one Spark
> executor pod per Kubernetes node.
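>
> To illustrate with a rough sketch (the executor sizes below are assumptions,
> not your actual settings): each executor pod asks Kubernetes for roughly
> spark.executor.memory plus the memory overhead, and spark.executor.cores
> worth of CPU. Once one executor (or the driver) has claimed most of a node's
> allocatable resources, the remaining requested executors simply stay
> Pending, as in the listing above.
>
> import pyspark
> spark = (
>     pyspark.sql.SparkSession.builder
>     # k8s master / container image configs omitted for brevity
>     .config("spark.executor.instances", "6")  # 6 requested, but only 2 fit
>     .config("spark.executor.cores", "3")      # per-pod CPU request (assumed value)
>     .config("spark.executor.memory", "10g")   # + ~10% overhead = pod memory request (assumed value)
>     .getOrCreate()
> )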
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 5 Jan 2022 at 17:01, bo yang  wrote:
>
>> Hi Mich,
>>
>> Curious what do you mean “The constraint seems to be that you can 

Re: [VOTE][SPIP] Support Customized Kubernetes Schedulers Proposal

2022-01-06 Thread Mich Talebzadeh
+1 (non-binding)



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 6 Jan 2022 at 07:03, bo yang  wrote:

> +1 (non-binding)
>
> On Wed, Jan 5, 2022 at 11:01 PM Holden Karau  wrote:
>
>> +1 (binding)
>>
>> On Wed, Jan 5, 2022 at 5:31 PM William Wang 
>> wrote:
>>
>>> +1 (non-binding)
>>>
>>> On Thu, Jan 6, 2022 at 09:07, Yikun Jiang wrote:
>>>
 Hi all,

 I’d like to start a vote for SPIP: "Support Customized Kubernetes
 Schedulers Proposal"

 The SPIP is to support customized Kubernetes schedulers in Spark on
 Kubernetes.

 Please also refer to:

 - Previous discussion in dev mailing list: [DISCUSSION] SPIP: Support
 Volcano/Alternative Schedulers Proposal
 
 - Design doc: [SPIP] Spark-36057 Support Customized Kubernetes
 Schedulers Proposal
 
 - JIRA: SPARK-36057 

 Please vote on the SPIP:

 [ ] +1: Accept the proposal as an official SPIP
 [ ] +0
 [ ] -1: I don’t think this is a good idea because …

 Regards,
 Yikun



>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>>
>>


Re: About contribution

2022-01-06 Thread Dennis Jung
Oh, yes. I will also check on that.
I just want to know if there is a bit more detail about contributing - not
just for the sake of contributing, but also because I want to understand the
Spark project more deeply.

- To review the code base, what is a good place to start?
- A recommended blog post or document would also be great.
- The roadmap of the project.

Thanks.



On Thu, Jan 6, 2022 at 12:24 AM, Sean Owen wrote:

> (There is no project chat)
> See https://spark.apache.org/contributing.html
>
> On Tue, Jan 4, 2022 at 11:42 PM Dennis Jung  wrote:
>
>> Hello, I hope this is not a silly question.
>> (I couldn't find any chat room on spark project, so asking on mail)
>>
>> It has been about a year since I started using Spark at work, and I am
>> trying to make a contribution to this project.
>>
>> I'm currently looking at the documents in more detail and checking the
>> issues in JIRA now. Are there any suggestions on reviewing the code?
>>
>> - Which part of the code would be a good place to start?
>> - What would be most helpful for the project?
>>
>> Thanks.
>>
>