Ahh yes, thanks for mentioning data skew, I've run into that before as well. The best first step is to get statistics on the distribution of your join key. If a few key values account for a drastically larger number of rows than the rest, then the reducer task handling those keys will always be swamped, no matter how many reducer-side partitions you use.
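A quick way to eyeball that distribution (a minimal sketch against the Spark 1.4 Scala API; "events" and "joinkey" are placeholders for your own table and join column):

import org.apache.spark.sql.functions._

// Count rows per join key and look at the heaviest keys first.
val keyCounts = sqlContext.table("events")
  .groupBy("joinkey")
  .count()
  .orderBy(desc("count"))

keyCounts.show(20)

If the top handful of keys dwarf everything else, you're looking at skew rather than a plain partition-count problem.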
If this is the problem, then one solution I have used is to do a skew join manually. Something like:

SELECT *
FROM (SELECT * FROM table WHERE joinkey <> 'commonval') t1
JOIN t2 ON t1.joinkey = t2.joinkey
UNION ALL
SELECT *
FROM (SELECT * FROM table WHERE joinkey = 'commonval') t1
JOIN t2 ON t1.joinkey = t2.joinkey

On Fri, Aug 28, 2015 at 1:56 PM Thomas Dudziak <tom...@gmail.com> wrote:

> Yeah, I tried with 10k and 30k and these still failed, will try with more
> then. Though that is a little disappointing: it only writes ~7TB of shuffle
> data, which in theory shouldn't require more than 1000 reducers on my 10TB
> memory cluster (~7GB of spill per reducer).
> I'm now wondering if my shuffle partitions are uneven and I should use a
> custom partitioner. Is there a way to get stats on the partition sizes from
> Spark?
>
> On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:
>
>> I had similar problems to this (reduce-side failures for a large join,
>> 25bn rows with 9bn), and found the answer was to increase
>> spark.sql.shuffle.partitions well beyond 1000. In my case, 16k partitions
>> worked, but your tables look a little denser, so you may want to go even
>> higher.
>>
>> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" and
>>> "Missing an output location for shuffle" for a large Spark SQL join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more
>>> objects on the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>>    2:     251085327     8034730464  scala.Tuple2
>>>    3:     243694737     5848673688  java.lang.Float
>>>    4:     231198778     5548770672  java.lang.Integer
>>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>>    7:      74114058     1778737392  java.lang.Long
>>>    8:       6059103      779203840  [Ljava.lang.Object;
>>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>>   10:         34749       70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval 60s
>>> spark.executor.memory 32g
>>> spark.mesos.coarse false
>>> spark.network.timeout 600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer 1m
>>> spark.shuffle.io.maxRetries 6
>>> spark.shuffle.manager sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000,
>>> but that doesn't seem to help. Would increasing the partitions help? Is
>>> there a formula to determine an approximate number of partitions for a
>>> join?
>>> Any help with this job would be appreciated!
>>>
>>> cheers,
>>> Tom
>>>
>>
>
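Re the question above about getting stats on the partition sizes: the stage detail page in the Spark UI shows shuffle read per task, and for a rough programmatic check you can count rows per partition after the shuffle. A sketch (here "joined" is just a placeholder for the DataFrame that comes out of your join):

// Count rows in each partition of the joined result; iter.size consumes
// the iterator, which is fine since we only need the count.
val partitionSizes = joined.rdd
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()

// Print the 20 largest partitions to see how uneven the split is.
partitionSizes.sortBy(-_._2).take(20).foreach { case (idx, n) =>
  println(s"partition $idx: $n rows")
}

If one or two partitions are orders of magnitude bigger than the rest, adding more partitions won't help much; the manual skew join above (or a custom partitioner) is the way to go.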