Ahh yes, thanks for mentioning data skew, I've run into that before as
well. The best thing to do there is to get statistics on the distribution of
your join key. If a few key values account for a drastically larger number
of rows than the rest, then the reducer task handling those keys will always
be swamped, no matter how many reducer-side partitions you use.
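
To check for that, a query along these lines (just a sketch; substitute your
actual table and join key column) shows which key values dominate:

SELECT joinkey, COUNT(*) AS cnt
FROM table
GROUP BY joinkey
ORDER BY cnt DESC
LIMIT 20

If the top few counts are orders of magnitude larger than the rest, skew is
likely the culprit.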

If this is the problem, then one solution I have used is to do a skew join
manually. Something like:

SELECT * FROM (SELECT * FROM table WHERE joinkey <> 'commonval') t1 JOIN t2
  ON t1.joinkey = t2.joinkey
UNION ALL
SELECT * FROM (SELECT * FROM table WHERE joinkey = 'commonval') t1 JOIN t2
  ON t1.joinkey = t2.joinkey
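
The idea is to isolate the hot key: the first branch then shuffles evenly
across reducers, and the second branch only has to deal with 'commonval', so
it can be handled (and tuned) separately instead of dragging the whole shuffle
down. If a handful of values are hot rather than just one, the same pattern
works with an IN list (again a sketch, with placeholder values):

SELECT * FROM (SELECT * FROM table WHERE joinkey NOT IN ('commonval1', 'commonval2')) t1 JOIN t2
  ON t1.joinkey = t2.joinkey
UNION ALL
SELECT * FROM (SELECT * FROM table WHERE joinkey IN ('commonval1', 'commonval2')) t1 JOIN t2
  ON t1.joinkey = t2.joinkey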


On Fri, Aug 28, 2015 at 1:56 PM Thomas Dudziak <tom...@gmail.com> wrote:

> Yeah, I tried with 10k and 30k and those still failed, so I will try with
> more then. Though that is a little disappointing; it only writes ~7TB of
> shuffle data, which in theory shouldn't require more than 1000 reducers on my
> 10TB memory cluster (~7GB of spill per reducer).
> I'm now wondering if my shuffle partitions are uneven and I should use a
> custom partitioner. Is there a way to get stats on the partition sizes from
> Spark?
>
> On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:
>
>> I had similar problems to this (reduce-side failures for large joins
>> (25bn rows with 9bn)), and found the answer was to increase
>> spark.sql.shuffle.partitions beyond 1000. In my case, 16k partitions worked
>> for me, but your tables look a little denser, so you may want to go even higher.
>>
>> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>>
>>> I'm getting "Removing executor with no recent heartbeats" &
>>> "Missing an output location for shuffle" errors for a large Spark SQL join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>> num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>>    2:     251085327     8034730464  scala.Tuple2
>>>    3:     243694737     5848673688  java.lang.Float
>>>    4:     231198778     5548770672  java.lang.Integer
>>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>>    7:      74114058     1778737392  java.lang.Long
>>>    8:       6059103      779203840  [Ljava.lang.Object;
>>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>>   10:         34749       70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval       60s
>>> spark.executor.memory                  32g
>>> spark.mesos.coarse                     false
>>> spark.network.timeout                  600s
>>> spark.shuffle.blockTransferService     netty
>>> spark.shuffle.consolidateFiles         true
>>> spark.shuffle.file.buffer              1m
>>> spark.shuffle.io.maxRetries            6
>>> spark.shuffle.manager                  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000,
>>> but that doesn't seem to help. Would increasing the number of partitions
>>> help? Is there a formula to determine an approximate partition count for a
>>> join?
>>> Any help with this job would be appreciated!
>>>
>>> cheers,
>>> Tom
>>>
>>
>
