I had similar problems (reduce-side failures on large joins, 25bn rows
joined with 9bn) and found the answer was to raise
spark.sql.shuffle.partitions well above 1000. In my case, 16k partitions
worked, but your tables look a little denser, so you may want to go even
higher.
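
I don't know of an exact formula, but a rough starting point is to divide
the shuffle volume by a target size per partition. The sketch below does
that for the 6.5TB of shuffle writes reported in the original message. The
128 MiB target and the function name are my own assumptions, not something
from Spark itself, so treat the result as a first guess to tune from.

```python
import math

MIB = 1024 ** 2
TIB = 1024 ** 4


def estimate_shuffle_partitions(shuffle_bytes,
                                target_partition_bytes=128 * MIB,
                                minimum=200):
    """Rough partition count: shuffle volume / target bytes per partition.

    target_partition_bytes is an assumed rule of thumb, not a Spark constant.
    minimum defaults to 200, the default of spark.sql.shuffle.partitions.
    """
    if shuffle_bytes < 0 or target_partition_bytes <= 0:
        raise ValueError("sizes must be non-negative / positive")
    return max(minimum, math.ceil(shuffle_bytes / target_partition_bytes))


# Shuffle write volume from the original message (about 6.5TB).
partitions = estimate_shuffle_partitions(int(6.5 * TIB))
print(partitions)
```

The resulting number can be passed as
--conf spark.sql.shuffle.partitions=<n> to spark-submit, or set with
sqlContext.setConf("spark.sql.shuffle.partitions", "<n>"). Shuffle write
sizes are measured on serialized data, so the in-memory footprint per
reduce task is likely larger; if executors still run out of heap, a
smaller target (and so more partitions) is worth trying.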

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:

> I'm getting "Removing executor with no recent heartbeats" and
> "Missing an output location for shuffle" errors for a large Spark SQL join
> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
> configure the job to avoid them.
>
> The initial stage completes fine with some 30k tasks on a cluster with 70
> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
> the shuffle stage first waits 30min in the scheduling phase according to
> the UI, and then dies with the mentioned errors.
>
> I can see in the GC logs that the executors reach their memory limits (32g
> per executor, 2 workers per machine) and can't allocate any more stuff in
> the heap. Fwiw, the top 10 in the memory use histogram are:
>
> num     #instances         #bytes  class name
> ----------------------------------------------
>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>    2:     251085327     8034730464  scala.Tuple2
>    3:     243694737     5848673688  java.lang.Float
>    4:     231198778     5548770672  java.lang.Integer
>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>    7:      74114058     1778737392  java.lang.Long
>    8:       6059103      779203840  [Ljava.lang.Object;
>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>   10:         34749       70122104  [B
>
> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>
> spark.core.connection.ack.wait.timeout 600
> spark.executor.heartbeatInterval       60s
> spark.executor.memory                  32g
> spark.mesos.coarse                     false
> spark.network.timeout                  600s
> spark.shuffle.blockTransferService     netty
> spark.shuffle.consolidateFiles         true
> spark.shuffle.file.buffer              1m
> spark.shuffle.io.maxRetries            6
> spark.shuffle.manager                  sort
>
> The join is currently configured with spark.sql.shuffle.partitions=1000
> but that doesn't seem to help. Would increasing the partitions help? Is
> there a formula to determine an approximate partition count for a join?
> Any help with this job would be appreciated!
>
> cheers,
> Tom
>
