Hi Mich,

Your notes are really great; they brought back the old days again :)
Thanks.

Just to note a few points that I found useful related to this question:
1. cores and threads - page 5
2. executor cores and number settings - page 6..


I think the following example may be of use. Note that I have a single
driver with 8 cores, as I am running PySpark 3.1.2 in local mode, but it
shows one way to find out a bit more:

========================================================================
>>> from pyspark.sql import Row
>>> from pyspark.sql.types import IntegerType, StructField, StructType
>>> # create the filter dataframe (there are easier ways to do this)
>>> filter_schema = StructType([StructField("filter_value", IntegerType())])
>>> filter_rows = [Row(v) for v in [0, 1, 2, 4, 7, 9]]
>>> spark.createDataFrame(filter_rows, filter_schema).createOrReplaceTempView("filters")
>>> # create the main table
>>> spark.range(10000000000).createOrReplaceTempView("test_base")
>>> spark.sql("SELECT id, FLOOR(RAND() * 10) rand FROM test_base").createOrReplaceTempView("test")
>>> # see the partitions in the filters and the main table
>>> spark.sql("SELECT * FROM filters").rdd.getNumPartitions()
8
>>> spark.sql("SELECT * FROM test").rdd.getNumPartitions()
8
>>> # see the number of partitions in the filtered join output
>>> # (I am assuming implicit casting here)
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
200
>>> spark.sql("SET spark.sql.shuffle.partitions=10")
DataFrame[key: string, value: string]
>>> spark.sql("SELECT * FROM test WHERE rand IN (SELECT filter_value FROM filters)").rdd.getNumPartitions()
10
========================================================================
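
The filter table above can also be built more directly, and the same
IN-subquery filter can be written with the DataFrame API as a left semi
join. This is only a minimal sketch: it assumes the `spark` session from
the PySpark shell, and the helper names make_filters and semi_join_filter
are made up for illustration.

```python
def make_filters(spark, values, view_name="filters"):
    """Build a one-column DataFrame of filter values and register a temp view."""
    # One-element tuples become rows; the DDL string names and types the column.
    df = spark.createDataFrame([(v,) for v in values], "filter_value int")
    df.createOrReplaceTempView(view_name)
    return df


def semi_join_filter(test_df, filters_df):
    """Keep rows of test_df whose `rand` value appears in filters_df."""
    # A left semi join keeps matching rows from the left side only,
    # which has the same effect as `WHERE rand IN (SELECT filter_value ...)`.
    return test_df.join(
        filters_df, test_df["rand"] == filters_df["filter_value"], "left_semi"
    )
```

In the shell this could be used as
`semi_join_filter(spark.table("test"), make_filters(spark, [0, 1, 2, 4, 7, 9]))`,
followed by `.rdd.getNumPartitions()` to compare against the SQL version.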

Please also refer to the following page on adaptive query execution (AQE)
in Spark 3. It will be a massive help, particularly if you are handling
skewed joins: https://spark.apache.org/docs/latest/sql-performance-tuning.html
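
As a rough sketch of what switching AQE on looks like from the shell: the
three configuration keys below are the ones described on that page, while
the AQE_SETTINGS dict and the apply_settings helper are hypothetical names
used only for this example. In Spark 3.1.x AQE is off by default; from
3.2.0 it is on by default.

```python
# AQE-related settings from the Spark SQL performance tuning page.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    # merge small shuffle partitions after the shuffle
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # split oversized partitions in sort-merge joins
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def apply_settings(spark, settings=AQE_SETTINGS):
    """Set each key on a live SparkSession and return the values read back."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return {key: spark.conf.get(key) for key in settings}
```

Calling `apply_settings(spark)` and then re-running the IN-subquery example
lets you compare the partition count with and without AQE.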


Thanks and Regards,
Gourav Sengupta

On Sun, Jan 2, 2022 at 11:24 AM Bitfox <bit...@bitfox.top> wrote:

> Thanks Mich. That looks good.
>
> On Sun, Jan 2, 2022 at 7:10 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> LOL.
>>
>> Your asking these questions takes me back to summer 2016, when I started
>> writing notes on Spark. They cover earlier versions, obviously, but the
>> notions of RDD, local, standalone, YARN etc. are still valid. In those
>> days there was no k8s and the public cloud was not widely adopted. I
>> browsed through the notes and found it refreshing. Anyway, you may find
>> some points in them that address the questions you tend to ask.
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 2 Jan 2022 at 00:20, Bitfox <bit...@bitfox.top> wrote:
>>
>>> One more question: for this big filter, given my server has 4 cores,
>>> will Spark (standalone mode) split the RDD into 4 partitions automatically?
>>>
>>> Thanks
>>>
>>> On Sun, Jan 2, 2022 at 6:30 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Create a list of the values that you don't want and filter on those:
>>>>
>>>> >>> DF = spark.range(10)
>>>> >>> DF
>>>> DataFrame[id: bigint]
>>>> >>>
>>>> >>> array = [1, 2, 3, 8]  # don't want these
>>>> >>> DF.filter(DF.id.isin(array) == False).show()
>>>> +---+
>>>> | id|
>>>> +---+
>>>> |  0|
>>>> |  4|
>>>> |  5|
>>>> |  6|
>>>> |  7|
>>>> |  9|
>>>> +---+
>>>>
>>>> or use the unary NOT operator (~):
>>>>
>>>> >>> DF.filter(~DF.id.isin(array)).show()
>>>> +---+
>>>> | id|
>>>> +---+
>>>> |  0|
>>>> |  4|
>>>> |  5|
>>>> |  6|
>>>> |  7|
>>>> |  9|
>>>> +---+
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>> On Sat, 1 Jan 2022 at 20:59, Bitfox <bit...@bitfox.top> wrote:
>>>>
>>>>> Using the dataframe API I need to implement a batch filter:
>>>>>
>>>>> DF.select(..).where((col(..) != 'a') & (col(..) != 'b') & …)
>>>>>
>>>>> There are a lot of keywords that should be filtered out for the same
>>>>> column in the where clause.
>>>>>
>>>>> How can I make this smarter? With a UDF, or some other way?
>>>>>
>>>>> Thanks & Happy new Year!
>>>>> Bitfox
>>>>>
>>>>
