Hi Jestin,

You already have the skewed column in the join condition, correct?

This is basically what you are doing below, assuming rs is your result set:

val rs = df1.join(df2, df1("id") === df2("id"), "fullouter")

What percentage of df1 rows have id = 0?

Can you register both DataFrames as temporary tables and use UNION ALL to
combine the two parts of the join (df1.id != 0 UNION ALL df1.id = 0)?
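
As a rough, untested sketch of what I mean (assuming a Spark 2.x SparkSession
called spark; on 1.x use registerTempTable and sqlContext instead) -- the
exact join type for the id = 0 half depends on how you want those rows
matched:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// Join the well-distributed keys in one branch and the skewed id = 0 keys in
// a separate branch, then stitch the two result sets together with UNION ALL.
val rs = spark.sql("""
  SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id = t2.id
  WHERE t1.id <> 0 OR t1.id IS NULL
  UNION ALL
  SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id
  WHERE t1.id = 0
""")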


HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 August 2016 at 15:14, Jestin Ma <jestinwith.a...@gmail.com> wrote:

> Hi Mich, do you mean using the skewed column as a join condition? I tried
> repartition(skewed column, unique column) but had no success, possibly
> because the join was still hash-partitioning on just the skewed column
> after I called repartition.
>
> On Sun, Aug 14, 2016 at 1:49 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Can you make the join more selective by using the skewed column id plus
>> another column that has valid unique values (the "repartitioning according
>> to a column I know contains unique values" that you mentioned)?
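>>
>> As a rough, untested sketch (assuming both DataFrames share a second column
>> with valid unique values; "other_id" here is just a placeholder name):
>>
>> val rs = df1.join(df2,
>>   df1("id") === df2("id") && df1("other_id") === df2("other_id"),
>>   "fullouter")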
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 14 August 2016 at 07:17, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>>
>>> Attached are screenshots mentioned, apologies for that.
>>>
>>> On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
>>> wrote:
>>>
>>>> Hi, I'm currently trying to perform an outer join between two
>>>> DataFrames/Datasets, one ~150 GB and one ~50 GB, on a column, id.
>>>>
>>>> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>>>>
>>>> df2.id is not skewed. If I filter out the df1.id = 0 rows, the join works
>>>> well. If I don't, the join does not complete for a very, very long time.
>>>>
>>>> I have diagnosed this problem as being caused by the hash partitioning on
>>>> id: because of the skew, one partition ends up holding a huge number of
>>>> rows. One executor ends up reading most of the shuffle data, and writing
>>>> all of the shuffle data, as shown below.
>>>>
>>>> [Screenshot 1, attached: the task in question, assigned to a single
>>>> executor.]
>>>>
>>>> [Screenshot 2, attached: one of the executors, showing a single thread
>>>> spilling sort data because the executor cannot hold 90%+ of the ~200 GB
>>>> result in memory.]
>>>>
>>>> Moreover, looking at the event timeline, I find that the executor running
>>>> that task spends about 20% of its time reading shuffle data, 70% on
>>>> computation, and 10% writing output data.
>>>>
>>>> I have tried the following:
>>>>
>>>>    - "Salting" the 0-value keys with monotonically_increasing_id().mod(N)
>>>>      - This doesn't seem to have an effect, since I now have
>>>>        hundreds/thousands of keys with tens of thousands of occurrences
>>>>        each.
>>>>      - Should I increase N? Is there a way to just do random.mod(N)
>>>>        instead of monotonically_increasing_id()? (See the rough sketch
>>>>        after this list.)
>>>>    - Repartitioning on a column I know contains unique values
>>>>      - This is overridden by Spark's sort-based shuffle manager, which
>>>>        hash-partitions on the skewed join column.
>>>>      - Is it possible to change this? Or will the join column always need
>>>>        to be hashed and partitioned on for joins to work?
>>>>    - Broadcasting does not work for my large tables.
>>>>    - Increasing/decreasing spark.sql.shuffle.partitions does not remedy
>>>>      the skew, as the 0-valued keys are still hashed to the same
>>>>      partition.
>>>>
>>>> ----------------------------------
>>>>
>>>> What I am currently considering is doing the join at the RDD level, but
>>>> is there any level of control there that could solve my skewed-data
>>>> problem? Other than that, see the questions embedded in the list above.
>>>>
>>>> I would appreciate any suggestions/tips/experience with this. Thank you!
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>