albertusk95 commented on a change in pull request #15297: [SPARK-9862] Handling data skew
URL: https://github.com/apache/spark/pull/15297#discussion_r305259187
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
 ##########
 @@ -184,10 +189,146 @@ class ExchangeCoordinator(
 
       i += 1
     }
-
     partitionStartIndices.toArray
   }
 
+
+
+ /**
+  * Skew-handling algorithm. Given the map output statistics of the last stage, the
+  * partitionStartIndices computed by estimatePartitionStartIndices, and the partition
+  * number of each pre-shuffle stage, return an Array of 2-item tuples that is used to
+  * create SkewShuffleRowRDD.
+  *
+  * Skewed partitions are detected from mapOutputStatistics. The partitionStartIndices
+  * provided by estimatePartitionStartIndices are then reused to generate new partition
+  * start indices, called "skewPartitionStartIndices". skewPartitionStartIndices is this
+  * function's return value and is used to generate SkewShuffleRowRDD.
+  *
+  * For example, suppose two stages have the following pre-shuffle partition sizes:
+  *   stage 1: [100 MB, 10 MB, 20000 MB, 10 MB, 30 MB], and stage 1's partition number
+  *            (prePartitionNum) is 3
+  *   stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB]
+  * and the target input size is 128 MB. Partition 3 (index 2) is clearly skewed, and
+  * partitionStartIndices is [0, 3].
+  * Because partition 3 is skewed, as described in SPARK-9862, it is not placed in a single
+  * reduce task; instead, the other stage's partition 3 is broadcast to each piece of this
+  * stage's partition 3. The skewPartitionStartIndices therefore look like this:
+  *   stage 1: (5, [(-1, 0, 1), (1, 2, 3), (-1, 3, 1)])
+  *   stage 2: (5, [(-1, 0, 1), (2, 2, 3), (-1, 3, 1)])
+  * Here 5 is the total number of partitions to generate. In each triple, the first item
+  * is -1 for no skew, 1 if this side is skewed, or 2 if the other side is skewed; the
+  * second item is a start index, as in partitionStartIndices; the third item is the
+  * number of partitions to generate (1 for an unskewed range, 3 for the skewed partition).
+  *
+  * @param mapOutputStatistics map output statistics of the pre-shuffle stages
+  * @param prePartitionNum partition number of each pre-shuffle stage
+  * @param partitionStartIndices start indices provided by estimatePartitionStartIndices
+  * @return an Array of 2-item tuples. The first item is the number of partitions that
+  *         SkewShuffleRowRDD should generate; if it is -1, ShuffledRowRDD is used instead.
+  *         The second item is an array of (isSkew, partition index, gen partition num).
+  *         isSkew == -1 means no skew. isSkew == 1 means this side is skewed, so
+  *         SkewShuffleRowRDD generates gen partition num partitions, each of which reads
+  *         only one block of a pre-shuffle partition. isSkew == 2 means the other side is
+  *         skewed, so SkewShuffleRowRDD also generates gen partition num partitions, so
+  *         that this side's partition is broadcast to every piece of the skewed side.
+  */
+ def skewPartitionIdx(
+    mapOutputStatistics: Array[MapOutputStatistics],
+    prePartitionNum: Array[Int],
+    partitionStartIndices: Option[Array[Int]] = None):
+ Array[(Int, Array[(Int, Long, Int, Int)])] = {
+
+   if (mapOutputStatistics.length != 2 || !isJoin || joinType == FullOuter) {
+     return (0 until numExchanges).map(_ =>
+       (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+   }
+   // find which partitions are skewed
+   var skewPartition = mapOutputStatistics.map(ms =>
+     ms.bytesByPartitionId.zipWithIndex
+       .filter(x => x._1 > skewThreshold)
+   )
+   // if the same partition's output size exceeds skewSize on both stages,
+   // choose the side with the larger size as the skew side.
+   skewPartition = skewPartition.zipWithIndex.map(sti => {
+     val index = if (sti._2 == 0) 1 else 0
+     sti._1.filterNot(
+       p => skewPartition(index).exists(p1 => p1._2 == p._2 && p._1 < p1._1))
+   })
+
+   if ((joinType == LeftOuter && skewPartition(1).length > 0) ||
+       (joinType == RightOuter && skewPartition(0).length > 0)) {
+     return (0 until numExchanges).map(_ =>
+       (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+   }
+   // skewSize must great than TargetPostShuffleInputSize
 
 Review comment:
   `// skewSize must be greater than TargetPostShuffleInputSize`
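   For reference, here is a minimal, self-contained Scala sketch of the logic in this hunk, written against plain arrays instead of Spark's `MapOutputStatistics`. Everything in it (`SkewJoinSketch`, `findSkewedPartitions`, `canHandleSkew`, `layout`, the local `JoinType` objects, and the 256 MB threshold) is hypothetical and not part of the PR. `findSkewedPartitions` and `canHandleSkew` mirror the threshold filter, the "larger side wins" tie-break, and the outer-join early returns shown above. `layout` is only one way to reproduce the worked example in the doc comment; it follows the doc's three-field (isSkew, index, gen partition num) description rather than the four-field tuple in the signature.

```scala
// Hypothetical sketch of the skew-detection logic; not Spark's API.
object SkewJoinSketch {
  // Hypothetical stand-in for Spark's join types; only the cases the guard inspects.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  /** Per side, the (bytes, partitionIndex) pairs above skewThreshold. If both sides
    * exceed the threshold at the same index, the strictly smaller one is dropped. */
  def findSkewedPartitions(
      bytesByPartition: Array[Array[Long]],
      skewThreshold: Long): Array[Array[(Long, Int)]] = {
    require(bytesByPartition.length == 2, "the sketch only handles two-sided joins")
    val candidates = bytesByPartition.map(_.zipWithIndex.filter(_._1 > skewThreshold))
    candidates.zipWithIndex.map { case (mine, side) =>
      val other = candidates(1 - side)
      mine.filterNot { case (bytes, idx) =>
        other.exists { case (otherBytes, otherIdx) => otherIdx == idx && bytes < otherBytes }
      }
    }
  }

  /** Mirrors the early returns in the diff: no skew handling for full outer joins,
    * left outer joins skewed on the right, or right outer joins skewed on the left. */
  def canHandleSkew(joinType: JoinType, skewed: Array[Array[(Long, Int)]]): Boolean =
    joinType match {
      case FullOuter  => false
      case LeftOuter  => skewed(1).isEmpty
      case RightOuter => skewed(0).isEmpty
      case Inner      => true
    }

  /** Hypothetical: builds (isSkew, startIndex, numGenerated) triples for one side and
    * their total. skewMarker is 1 on the skewed side and 2 on the other side; splitCount
    * is how many partitions a skewed index is expanded into. */
  def layout(
      startIndices: Array[Int],
      numPartitions: Int,
      skewedIndices: Set[Int],
      skewMarker: Int,
      splitCount: Int): (Int, Array[(Int, Int, Int)]) = {
    // Every skewed index, and the index right after it, becomes a range boundary.
    val boundaries = (startIndices.toSet ++ skewedIndices ++ skewedIndices.map(_ + 1))
      .filter(_ < numPartitions).toArray.sorted
    val triples = boundaries.map { start =>
      if (skewedIndices.contains(start)) (skewMarker, start, splitCount) else (-1, start, 1)
    }
    (triples.map(_._3).sum, triples)
  }

  def main(args: Array[String]): Unit = {
    // Sizes in MB, taken from the doc comment; the 256 MB threshold is an assumption.
    val stats = Array(Array(100L, 10L, 20000L, 10L, 30L), Array(10L, 10L, 70L, 5L, 5L))
    val skewed = findSkewedPartitions(stats, skewThreshold = 256L)
    val skewedIdx = skewed(0).map(_._2).toSet
    println(canHandleSkew(Inner, skewed))
    val (n1, side1) = layout(Array(0, 3), 5, skewedIdx, skewMarker = 1, splitCount = 3)
    val (n2, side2) = layout(Array(0, 3), 5, skewedIdx, skewMarker = 2, splitCount = 3)
    println(s"$n1 ${side1.mkString(" ")}")
    println(s"$n2 ${side2.mkString(" ")}")
  }
}
```

   With the doc comment's numbers, `findSkewedPartitions` flags only index 2 of stage 1, `canHandleSkew` allows an inner join, and `layout` returns a total of 5 with triples (-1,0,1), (1,2,3), (-1,3,1) for stage 1, matching the example in the doc comment.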

----------------------------------------------------------------
This is an automated message from the Apache Git Service.