Try increasing the shuffle memory fraction (by default it is only 16%).
Again, if you run Spark 1.5, this will probably run a lot faster,
especially if you increase the shuffle memory fraction.
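
To put the 16% figure in context, here is a rough sketch of how much heap that leaves for shuffle on the 32g executors mentioned below. It assumes the Spark 1.x behaviour where the shuffle budget is roughly heap * spark.shuffle.memoryFraction (default 0.2) * spark.shuffle.safetyFraction (default 0.8). The helper name and the raised value of 0.4 are illustrative only, not a recommendation from this thread.

```python
# Sketch only: approximate shuffle memory per executor in Spark 1.x.
# Assumed formula: heap * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction.

def shuffle_memory_gb(executor_heap_gb, memory_fraction=0.2, safety_fraction=0.8):
    """Approximate heap (in GB) available to shuffle sorts and aggregations."""
    return executor_heap_gb * memory_fraction * safety_fraction

# 32g executors, as in the settings quoted in this thread.
default_gb = shuffle_memory_gb(32)

# Hypothetical higher fraction, chosen purely for illustration.
raised_gb = shuffle_memory_gb(32, memory_fraction=0.4)

print(f"default: {default_gb:.2f} GB, raised: {raised_gb:.2f} GB")
```

The fraction can be passed at submit time, for example with --conf spark.shuffle.memoryFraction=0.4. Raising it usually means lowering spark.storage.memoryFraction so the two still fit in the heap together.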

On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak <tom...@gmail.com> wrote:

> While it works with sort-merge-join, it takes about 12h to finish (with
> 10000 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" and
>>> "Missing an output location for shuffle" for a large Spark SQL join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>> num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>>    2:     251085327     8034730464  scala.Tuple2
>>>    3:     243694737     5848673688  java.lang.Float
>>>    4:     231198778     5548770672  java.lang.Integer
>>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>>    7:      74114058     1778737392  java.lang.Long
>>>    8:       6059103      779203840  [Ljava.lang.Object;
>>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>>   10:         34749       70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval       60s
>>> spark.executor.memory                  32g
>>> spark.mesos.coarse                     false
>>> spark.network.timeout                  600s
>>> spark.shuffle.blockTransferService     netty
>>> spark.shuffle.consolidateFiles         true
>>> spark.shuffle.file.buffer              1m
>>> spark.shuffle.io.maxRetries            6
>>> spark.shuffle.manager                  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the number of partitions
>>> help? Is there a formula to determine an approximate number of partitions
>>> for a join?
>>> Any help with this job would be appreciated!
>>>
>>> cheers,
>>> Tom
>>>
>>
>>
>
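
On the partition-count question in the original message: the thread does not give a formula, but one rule of thumb is to divide the expected shuffle size by a target size per partition. The sketch below applies that to the ~6.5TB of shuffle writes reported above. The 200 MB target, the helper names, and the use of binary units are assumptions for illustration, and the estimate assumes join keys are spread evenly across partitions.

```python
import math

TB = 1024 ** 4
MB = 1024 ** 2

def partitions_for(shuffle_bytes, target_partition_bytes):
    """Smallest partition count that keeps the average partition at or below the target."""
    return math.ceil(shuffle_bytes / target_partition_bytes)

def mb_per_partition(shuffle_bytes, num_partitions):
    """Average shuffle data per partition, in MB, assuming an even spread."""
    return shuffle_bytes / num_partitions / MB

# ~6.5TB of shuffle writes, the figure reported in the original message.
shuffle_bytes = int(6.5 * TB)

# Average partition size for the two settings tried in this thread.
for n in (1000, 10000):
    print(f"{n} partitions -> ~{mb_per_partition(shuffle_bytes, n):.0f} MB each")

# Hypothetical target of 200 MB per partition (an assumption, not from the thread).
print(partitions_for(shuffle_bytes, 200 * MB))
```

With 1000 partitions each one averages several GB, which is more than the default shuffle budget of a 32g executor can hold, so heavy spilling is expected. With 10000 partitions the average drops to several hundred MB. A skewed key distribution would make the largest partitions much bigger than these averages.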
