albertusk95 commented on a change in pull request #15297: [SPARK-9862]Handling data skew
URL: https://github.com/apache/spark/pull/15297#discussion_r305259187
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
##########
@@ -184,10 +189,146 @@ class ExchangeCoordinator(
i += 1
}
-
partitionStartIndices.toArray
}
+
+
+  /**
+   * The skew algorithm. Given the last stage's map output statistics, the partitionStartIndices
+   * provided by the estimatePartitionStartIndices function, and the partition numbers of the
+   * pre-shuffle stages, it returns an Array of 2-item tuples that is used to create
+   * SkewShuffleRowRDD.
+   *
+   * We find the data-skew partitions from mapOutputStatistics and reuse the
+   * partitionStartIndices provided by estimatePartitionStartIndices to generate new partition
+   * start indices, called "skewPartitionStartIndices". skewPartitionStartIndices is the return
+   * value of this function and is used to generate SkewShuffleRowRDD.
+   *
+   * For example, suppose we have two stages with the following pre-shuffle partition sizes:
+   * stage 1: [100 MB, 10 MB, 20000 MB, 10 MB, 30 MB], and the stage 1 partition num is 3
+   * stage 2: [10 MB, 10 MB, 70 MB, 5 MB, 5 MB]
+   * and assume the target input size is 128 MB.
+   * Partition 3 (index 2, 20000 MB) is obviously the data-skew partition, so the skewed
+   * partitions are [2]. The partitionStartIndices is [0, 3].
+   * In this case partition 3 is skewed, so, as SPARK-9862 describes, we don't put this
+   * partition into a single reduce task; instead, we broadcast the other stage's partition 3
+   * to this stage's skewed partition.
+   * So the skewPartitionStartIndices looks like this:
+   * (5 /* partition num */,
+   *   ((-1 /* no skew */, 0 /* start index, as in partitionStartIndices */, 1 /* 1 partition */),
+   *    (1 /* this side is skewed */, 2 /* index */, 3 /* generate 3 partitions */),
+   *    (-1, 3, 1)))  // used to generate the SkewShuffleRowRDD of stage 1
+   * (5 /* partition num */,
+   *   ((-1 /* no skew */, 0 /* start index, as in partitionStartIndices */, 1 /* 1 partition */),
+   *    (2 /* the other side is skewed */, 2 /* index */, 3 /* generate 3 partitions */),
+   *    (-1, 3, 1)))  // used to generate the SkewShuffleRowRDD of stage 2
+   *
+   * @param mapOutputStatistics map output statistics of the pre-shuffle stages
+   * @param prePartitionNum partition num of the pre-shuffle stages
+   * @param partitionStartIndices provided by the estimatePartitionStartIndices function
+   * @return an Array of 2-item tuples. The first item of each tuple is the number of
+   *         partitions SkewShuffleRowRDD should generate; if the value is -1, ShuffledRowRDD
+   *         is used instead. The second item is an array of
+   *         (isSkew, partition index, gen partition num) tuples, where:
+   *         isSkew = -1 means no skew;
+   *         isSkew = 1 means this side is skewed, so SkewShuffleRowRDD should generate
+   *         gen-partition-num partitions, each reading only one block of the pre-shuffle
+   *         partition;
+   *         isSkew = 2 means the other side is skewed, so SkewShuffleRowRDD should also
+   *         generate multiple partitions for this partition.
+   */
+  def skewPartitionIdx(
+      mapOutputStatistics: Array[MapOutputStatistics],
+      prePartitionNum: Array[Int],
+      partitionStartIndices: Option[Array[Int]] = None):
+    Array[(Int, Array[(Int, Long, Int, Int)])] = {
+
+    if (mapOutputStatistics.length != 2 || !isJoin || joinType == FullOuter) {
+      return (0 until numExchanges).map(_ =>
+        (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+    }
+    // find which partitions are skewed
+    var skewPartition = mapOutputStatistics.map(ms =>
+      ms.bytesByPartitionId.zipWithIndex
+        .filter(x => x._1 > skewThreshold)
+    )
+    // if the same partition's output size exceeds skewSize in both stages,
+    // choose the side with the larger size as the skew side.
+    skewPartition = skewPartition.zipWithIndex.map(sti => {
+      val index = if (sti._2 == 0) 1 else 0
+      sti._1.filterNot(
+        p => skewPartition(index).exists(p1 => p1._2 == p._2 && p._1 < p1._1))
+    })
+
+    if (joinType == LeftOuter && skewPartition(1).length > 0 ||
+        joinType == RightOuter && skewPartition(0).length > 0) {
+      return (0 until numExchanges).map(_ =>
+        (0, Array[(Int, Long, Int, Int)]((-1, 0, 0, 0)))).toArray
+    }
+    // skewSize must great than TargetPostShuffleInputSize
Review comment:
`// skewSize must be greater than TargetPostShuffleInputSize`
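The skew-detection step of `skewPartitionIdx` (filter each side's partitions by a size threshold, then, when both sides are skewed on the same partition index, keep only the larger side) can be sketched standalone. This is a minimal sketch under stated assumptions, not the PR's code: `SkewSketch`, `findSkewedPartitions`, and the plain `Array[Array[Long]]` input (standing in for each stage's `MapOutputStatistics.bytesByPartitionId`) are hypothetical, and the threshold is passed in rather than read from the coordinator's `skewThreshold`.

```scala
// Hypothetical, standalone sketch of the skew-detection step in skewPartitionIdx.
// Array[Array[Long]] stands in for MapOutputStatistics.bytesByPartitionId per stage.
object SkewSketch {

  /** Returns, per stage, the (bytes, partitionIndex) pairs whose size exceeds
    * skewThreshold. If the same partition index is skewed on both stages, the
    * side with the strictly smaller size drops it (equal sizes keep both). */
  def findSkewedPartitions(
      bytesByPartitionId: Array[Array[Long]],
      skewThreshold: Long): Array[Array[(Long, Int)]] = {
    require(bytesByPartitionId.length == 2, "this sketch assumes a two-sided join")

    // Step 1: every partition whose size exceeds the threshold is a candidate.
    val candidates: Array[Array[(Long, Int)]] = bytesByPartitionId.map { sizes =>
      sizes.zipWithIndex.filter { case (bytes, _) => bytes > skewThreshold }
    }

    // Step 2: drop a candidate when the other side has the same partition index
    // with a larger size, so only the bigger side is treated as skewed.
    candidates.zipWithIndex.map { case (side, i) =>
      val other = candidates(1 - i)
      side.filterNot { case (bytes, idx) =>
        other.exists { case (otherBytes, otherIdx) => otherIdx == idx && bytes < otherBytes }
      }
    }
  }
}
```

With the sizes from the doc comment's example and a hypothetical threshold of 1000 MB, only index 2 of stage 1 is reported and stage 2 reports nothing; if both sides exceeded the threshold on the same index, only the larger side would keep it.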