[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-15 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r367209201
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+    Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+      stats: MapOutputStatistics,
+      partitionId: Int,
+      medianSize: Long): Boolean = {
+    val size = stats.bytesByPartitionId(partitionId)
+    size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+      size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+    val numPartitions = stats.bytesByPartitionId.length
+    val bytes = stats.bytesByPartitionId.sorted
+    if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
+  }
+
+  /**
+   * Get the map size of the specific reduce shuffle Id.
+   */
+  private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+    val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+    val maxSplits = math.min(conf.getConf(
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
+    val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+    val advisoryPartitionSize = math.max(avgPartitionSize,
+      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+    val partitionIndices = mapPartitionSizes.indices
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var postMapPartitionSize = mapPartitionSizes(0)
+    partitionStartIndices += 0
+    partitionIndices.drop(1).foreach { nextPartitionIndex =>
+      val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
+      if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+        partitionStartIndices += nextPartitionIndex
+        postMapPartitionSize = nextMapPartitionSize
+      } else {
+        postMapPartitionSize += nextMapPartitionSize
+      }
+    }
+
+    if (partitionStartIndices.size > maxSplits) {
+      partitionStartIndices.take(maxSplits).toArray
+    } else partitionStartIndices.toArray
+  }
+
+  private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
+    assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
+      " already be ready when executing OptimizeSkewedPartitions

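For reference, the skew-detection rule quoted above (isSkewed plus medianSize) can be reduced to the following standalone sketch. The sizes and threshold values are made up for illustration and are not the PR's defaults or its actual API:

object SkewDetectionSketch {
  // Hypothetical stand-ins for skewedPartitionFactor and skewedPartitionSizeThreshold.
  val skewedPartitionFactor = 10L
  val skewedPartitionSizeThreshold = 64L * 1024 * 1024  // 64 MB

  // Median of the per-partition shuffle sizes, floored at 1 as in the quoted code.
  def medianSize(bytesByPartitionId: Array[Long]): Long = {
    val sorted = bytesByPartitionId.sorted
    val mid = sorted(sorted.length / 2)
    if (mid > 0) mid else 1
  }

  // A partition is skewed if it exceeds both factor * median and the absolute threshold.
  def isSkewed(size: Long, median: Long): Boolean =
    size > median * skewedPartitionFactor && size > skewedPartitionSizeThreshold

  def main(args: Array[String]): Unit = {
    val sizes = Array.fill(9)(8L * 1024 * 1024) :+ 2L * 1024 * 1024 * 1024  // nine 8 MB partitions, one 2 GB
    val median = medianSize(sizes)
    sizes.zipWithIndex.foreach { case (s, i) =>
      println(s"partition $i: ${s / 1024 / 1024} MB, skewed = ${isSkewed(s, median)}")
    }
  }
}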
[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r38734
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+    val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+    val maxSplits = math.min(conf.getConf(
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
+    val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+    val advisoryPartitionSize = math.max(avgPartitionSize,
+      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+    val partitionIndices = mapPartitionSizes.indices
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var postMapPartitionSize = mapPartitionSizes(0)
+    partitionStartIndices += 0
+    partitionIndices.drop(1).foreach { nextPartitionIndex =>
+      val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
+      if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+        partitionStartIndices += nextPartitionIndex
+        postMapPartitionSize = nextMapPartitionSize
+      } else {
+        postMapPartitionSize += nextMapPartitionSize
+      }
+    }
+
+    if (partitionStartIndices.size > maxSplits) {
 
 Review comment:
   Yes, it may cause the last partition to be skewed. This approach cannot handle the extremely skewed use case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r38850
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+    val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+    val maxSplits = math.min(conf.getConf(
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
+    val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+    val advisoryPartitionSize = math.max(avgPartitionSize,
+      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+    val partitionIndices = mapPartitionSizes.indices
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var postMapPartitionSize = mapPartitionSizes(0)
+    partitionStartIndices += 0
+    partitionIndices.drop(1).foreach { nextPartitionIndex =>
 
 Review comment:
   Ok. I will update later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366672270
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r38235
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+    val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+    val maxSplits = math.min(conf.getConf(
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
 
 Review comment:
   According to feedback from @manuzhang's environment, in some use cases the split number may be very large (more than 1000). That produces too many SMJs, and it may take a long time to launch the job after optimizing the skewed join. The UI will also be choked by the huge graph. So we add this configuration as an upper limit on the split number.
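As a rough illustration of the scale involved (all numbers invented): a single 100 GB skewed partition packed against a 64 MB advisory size would otherwise be cut into about 1600 splits, each of which becomes an extra join branch to schedule and to render in the UI, while a configured upper limit keeps that bounded:

object MaxSplitsSketch {
  def main(args: Array[String]): Unit = {
    val skewedPartitionBytes = 100L * 1024 * 1024 * 1024  // one 100 GB reduce partition
    val advisoryBytes = 64L * 1024 * 1024                 // 64 MB target size per split
    val uncapped = math.ceil(skewedPartitionBytes.toDouble / advisoryBytes).toInt
    val maxSplits = 5                                     // hypothetical value of the new config
    println(s"uncapped splits = $uncapped, capped splits = ${math.min(uncapped, maxSplits)}")
    // prints: uncapped splits = 1600, capped splits = 5
  }
}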


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366671831
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r37818
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+  private def medianSize(stats: MapOutputStatistics): Long = {
+    val numPartitions = stats.bytesByPartitionId.length
+    val bytes = stats.bytesByPartitionId.sorted
+    if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
 
 Review comment:
   Good catch. I will update it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366705008
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r39522
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366704700
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366675586
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366674846
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+    Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+      stats: MapOutputStatistics,
+      partitionId: Int,
+      medianSize: Long): Boolean = {
+    val size = stats.bytesByPartitionId(partitionId)
+    size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+      size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+    val numPartitions = stats.bytesByPartitionId.length
+    val bytes = stats.bytesByPartitionId.sorted
+    if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
+  }
+
+  /**
+   * Get the map size of the specific reduce shuffle Id.
+   */
+  private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+    val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+    val maxSplits = math.min(conf.getConf(
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
+    val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+    val advisoryPartitionSize = math.max(avgPartitionSize,
+      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+    val partitionIndices = mapPartitionSizes.indices
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var postMapPartitionSize = mapPartitionSizes(0)
+    partitionStartIndices += 0
+    partitionIndices.drop(1).foreach { nextPartitionIndex =>
+      val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
+      if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+        partitionStartIndices += nextPartitionIndex
+        postMapPartitionSize = nextMapPartitionSize
+      } else {
+        postMapPartitionSize += nextMapPartitionSize
+      }
+    }
+
+    if (partitionStartIndices.size > maxSplits) {
+      partitionStartIndices.take(maxSplits).toArray
+    } else partitionStartIndices.toArray
+  }
+
+  private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
+    assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
+      " already be ready when executing OptimizeSkewedPartitions 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366671044
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+  stats: MapOutputStatistics,
+  partitionId: Int,
+  medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val numPartitions = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
+  }
+
+  /**
+   * Get the map size of the specific reduce shuffle Id.
+   */
+  private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the skewed partition based on the map size and the max split number.
+   */
+  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: 
Int): Array[Int] = {
+val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
+val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
+val maxSplits = math.min(conf.getConf(
+  SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), 
mapPartitionSizes.length)
+val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+val advisoryPartitionSize = math.max(avgPartitionSize,
+  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
+val partitionIndices = mapPartitionSizes.indices
+val partitionStartIndices = ArrayBuffer[Int]()
+var postMapPartitionSize = mapPartitionSizes(0)
+partitionStartIndices += 0
+partitionIndices.drop(1).foreach { nextPartitionIndex =>
+  val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
+  if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) 
{
+partitionStartIndices += nextPartitionIndex
+postMapPartitionSize = nextMapPartitionSize
+  } else {
+postMapPartitionSize += nextMapPartitionSize
+  }
+}
+
+if (partitionStartIndices.size > maxSplits) {
+  partitionStartIndices.take(maxSplits).toArray
+} else partitionStartIndices.toArray
+  }
+
+  private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics 
= {
+assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
+  " already be ready when executing OptimizeSkewedPartitions 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-14 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r366679204
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
 ReuseAdaptiveSubquery(conf, context.subqueryCache),
+// Here the 'OptimizeSkewedPartitions' rule should be executed
 
 Review comment:
   Ok. I will update it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-12 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r365626441
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##
 @@ -145,39 +147,36 @@ case class ReduceNumShufflePartitions(conf: SQLConf) 
extends Rule[SparkPlan] {
   distinctNumPreShufflePartitions.length == 1,
   "There should be only one distinct value of the number pre-shuffle 
partitions " +
 "among registered Exchange operator.")
-val numPreShufflePartitions = distinctNumPreShufflePartitions.head
 
 val partitionStartIndices = ArrayBuffer[Int]()
-// The first element of partitionStartIndices is always 0.
-partitionStartIndices += 0
-
-var postShuffleInputSize = 0L
-
-var i = 0
-while (i < numPreShufflePartitions) {
-  // We calculate the total size of ith pre-shuffle partitions from all 
pre-shuffle stages.
-  // Then, we add the total size to postShuffleInputSize.
-  var nextShuffleInputSize = 0L
-  var j = 0
-  while (j < mapOutputStatistics.length) {
-nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
-j += 1
-  }
-
-  // If including the nextShuffleInputSize would exceed the target 
partition size, then start a
-  // new partition.
-  if (i > 0 && postShuffleInputSize + nextShuffleInputSize > 
targetPostShuffleInputSize) {
-partitionStartIndices += i
-// reset postShuffleInputSize.
-postShuffleInputSize = nextShuffleInputSize
-  } else {
-postShuffleInputSize += nextShuffleInputSize
-  }
-
-  i += 1
+    val partitionEndIndices = ArrayBuffer[Int]()
+    val numPartitions = distinctNumPreShufflePartitions.head
+    val includedPartitions = (0 until numPartitions).filter(!excludedPartitions.contains(_))
+    val firstStartIndex = includedPartitions(0)
+    partitionStartIndices += firstStartIndex
+    var postShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId(firstStartIndex)).sum
+    var i = firstStartIndex
 
 Review comment:
   Here `i` stands for the `partitionStartIndex`, not the `partitionEndIndex`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-08 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r364160266
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
 
 Review comment:
   Here the cross join behaves the same as the inner join: both extend `InnerLike`, so it is already handled in `handleSkewJoin`.
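
   Illustrative sketch only (assuming spark-catalyst is on the classpath, not part of the PR): because `Inner` and `Cross` both extend `InnerLike`, a single `InnerLike` case covers the cross join as well.

```scala
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, JoinType}

object InnerLikeSketch {
  def describe(joinType: JoinType): String = joinType match {
    case _: InnerLike => "treated like an inner join"   // matches both Inner and Cross
    case other        => s"not inner-like: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Inner))   // treated like an inner join
    println(describe(Cross))   // treated like an inner join
  }
}
```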


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2020-01-08 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r364141533
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
 
 Review comment:
   The cross join is already handled in `handleSkewJoin`. We do not change the join behavior when we split a skewed partition, so when a skewed cross-join partition is split, each split still joins with all the partitions on the other side. I will also add a cross join test case to the UT later. Thanks.
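
   A toy illustration of that argument (plain Scala collections with made-up rows, not Spark code): splitting one side of a partition and unioning the per-split results yields the same pairs as joining the partition unsplit.

```scala
object SplitJoinSketch {
  def main(args: Array[String]): Unit = {
    val left  = Seq(1, 1, 1, 1)   // a skewed left-side partition (hypothetical rows)
    val right = Seq("a", "b")     // the matching right-side partition

    // Join the partition unsplit: plain pairing within the partition.
    val unsplit = for (l <- left; r <- right) yield (l, r)

    // Split the left side into chunks of 2; each chunk still joins with the
    // whole right side, and the union of the per-chunk results is identical.
    val split = left.grouped(2).toSeq.flatMap(chunk => for (l <- chunk; r <- right) yield (l, r))

    println(unsplit.sorted == split.sorted)   // true
  }
}
```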


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-25 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r361361009
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+  stats: MapOutputStatistics,
+  partitionId: Int,
+  medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val numPartitions = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def getMapStartIndices(
+  stage: QueryStageExec,
+  partitionId: Int,
+  medianSize: Long): Array[Int] = {
+val dependency = 
ShuffleQueryStageExec.getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+// TODO: split the partition based on the size
+(0 until numMappers).toArray
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = ShuffleQueryStageExec.getShuffleStage(queryStage)
+val metrics = shuffleStage.plan.mapOutputStatisticsFuture
+assert(metrics.isCompleted,
+  "ShuffleQueryStageExec should already be ready when executing 
OptimizeSkewedPartitions rule")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on which side is skewed.
+   */
+  private def supportOptimization(
+  joinType: JoinType,
+  leftStage: QueryStageExec,
+  rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 
ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
+  ShuffleQueryStageExec.isShuffleQueryStageExec(rightStage)
+joinTypeSupported && shuffleStageCheck
+  }
+
+  private def supportSplitOnLeftPartition(joinType: JoinType) = {
+joinType == Inner || joinType == Cross || joinType == LeftSemi ||
+  joinType == LeftAnti || joinType == LeftOuter
+  }
+
+  private def supportSplitOnRightPartition(joinType: JoinType) = {
+joinType == Inner || joinType == Cross || joinType == RightOuter
+  }
+
+  private def getNumMappers(stage: QueryStageExec): Int = {
+

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-17 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358657998
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -552,4 +577,155 @@ class AdaptiveQueryExecSuite
   spark.sparkContext.removeSparkListener(listener)
 }
   }
+
+  test("adaptive skew join both in left and right for inner join ") {
+withSQLConf(
+  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+  SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR.key -> "1",
+  SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
+  SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
+  val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+"SELECT * FROM skewData1 join skewData2 ON key1 = key2")
+  val smj = findTopLevelSortMergeJoin(plan)
+  assert(smj.size == 1)
+  // left stats: [4403, 0, 1927, 1927, 1927]
 
 Review comment:
   It may be caused by the size calculation in `ShuffleExchangeExec`. I will also update the test data.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-16 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358626209
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
 ##
 @@ -132,7 +133,8 @@ class ReduceNumShufflePartitionsSuite extends 
SparkFunSuite with BeforeAndAfterA
 Array(
   new MapOutputStatistics(0, bytesByPartitionId1),
   new MapOutputStatistics(1, bytesByPartitionId2))
-  
intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
+ intercept[AssertionError](rule.estimatePartitionStartAndEndIndices(
+   mapOutputStatistics, (0 until bytesByPartitionId1.length).toSet))
 
 Review comment:
   Yes, we can delete it now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-16 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358626105
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
 ##
 @@ -59,8 +59,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite 
with BeforeAndAfterA
   case (bytesByPartitionId, index) =>
 new MapOutputStatistics(index, bytesByPartitionId)
 }
+val length = mapOutputStatistics.map(_.bytesByPartitionId.length).head
 val estimatedPartitionStartIndices =
-  rule.estimatePartitionStartIndices(mapOutputStatistics)
+  rule.estimatePartitionStartAndEndIndices(mapOutputStatistics).unzip._1
 
 Review comment:
   Do you mean we need to check the case with excluded partitions? Currently the check already uses no excluded partitions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-16 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358584117
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+  stats: MapOutputStatistics,
+  partitionId: Int,
+  medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val numPartitions = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
+  }
+
+  /**
+   * Get all the map data size for specific reduce partitionId.
+   */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+mapOutputTracker.shuffleStatuses.get(shuffleId).
+  get.mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def estimateMapStartIndices(
+  stage: QueryStageExec,
+  partitionId: Int,
+  medianSize: Long): Array[Int] = {
+val dependency = 
ShuffleQueryStageExec.getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+// TODO: split the partition based on the size
+(0 until numMappers).toArray
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = ShuffleQueryStageExec.getShuffleStage(queryStage)
+val metrics = shuffleStage.plan.mapOutputStatisticsFuture
+assert(metrics.isCompleted,
+  "ShuffleQueryStageExec should already be ready when executing 
OptimizeSkewedPartitions rule")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on the filed isSkewAndSupportsSplit.
+   */
+  private def supportOptimization(
+  joinType: JoinType,
+  leftStage: QueryStageExec,
+  rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 
ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
+  ShuffleQueryStageExec.isShuffleQueryStageExec(rightStage)
+joinTypeSupported && shuffleStageCheck
+  }
+
+  private def supportSplitOnLeftPartition(joinType: 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-16 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r358583953
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
 ##
 @@ -145,39 +156,36 @@ case class ReduceNumShufflePartitions(conf: SQLConf) 
extends Rule[SparkPlan] {
   distinctNumPreShufflePartitions.length == 1,
   "There should be only one distinct value of the number pre-shuffle 
partitions " +
 "among registered Exchange operator.")
-val numPreShufflePartitions = distinctNumPreShufflePartitions.head
 
 val partitionStartIndices = ArrayBuffer[Int]()
-// The first element of partitionStartIndices is always 0.
-partitionStartIndices += 0
-
-var postShuffleInputSize = 0L
-
-var i = 0
-while (i < numPreShufflePartitions) {
-  // We calculate the total size of ith pre-shuffle partitions from all 
pre-shuffle stages.
-  // Then, we add the total size to postShuffleInputSize.
-  var nextShuffleInputSize = 0L
-  var j = 0
-  while (j < mapOutputStatistics.length) {
-nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
-j += 1
-  }
-
-  // If including the nextShuffleInputSize would exceed the target 
partition size, then start a
-  // new partition.
-  if (i > 0 && postShuffleInputSize + nextShuffleInputSize > 
targetPostShuffleInputSize) {
-partitionStartIndices += i
-// reset postShuffleInputSize.
-postShuffleInputSize = nextShuffleInputSize
-  } else {
-postShuffleInputSize += nextShuffleInputSize
-  }
-
-  i += 1
+    val partitionEndIndices = ArrayBuffer[Int]()
+    val numPartitions = mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).head
+    val includedPartitions = (0 until numPartitions).filter(!excludedPartitions.contains(_))
+    val firstStartIndex = includedPartitions(0)
+    partitionStartIndices += firstStartIndex
+    var postShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId(firstStartIndex)).sum
+    var i = firstStartIndex
+    includedPartitions.filter(_ != firstStartIndex).foreach {
+      nextPartitionIndices =>
+        var nextShuffleInputSize =
+          mapOutputStatistics.map(_.bytesByPartitionId(nextPartitionIndices)).sum
+        // If nextPartitionIndices is skewed and omitted, or including
+        // the nextShuffleInputSize would exceed the target partition size,
+        // then start a new partition.
+        if (nextPartitionIndices != i + 1 ||
 
 Review comment:
   The `nextPartitionIndices` is excluded.
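
   An illustrative sketch of that exclusion handling (assumed inputs, not the PR's `estimatePartitionStartAndEndIndices`): a coalesced partition is forced to end whenever the next included partition id is not contiguous, i.e. a skewed partition was excluded in between, or when the size target would be exceeded.

```scala
object CoalesceSketch {
  // sizes: per-reduce-partition shuffle sizes; excluded: skewed partition ids handled elsewhere.
  def startIndices(sizes: Array[Long], excluded: Set[Int], target: Long): List[Int] = {
    val included = sizes.indices.filterNot(excluded.contains)
    val starts = scala.collection.mutable.ArrayBuffer(included.head)
    var running = sizes(included.head)
    var prev = included.head
    included.tail.foreach { p =>
      // A gap (p != prev + 1) means a skewed partition was excluded in between,
      // so the current coalesced partition ends here; exceeding the target does the same.
      if (p != prev + 1 || running + sizes(p) > target) {
        starts += p
        running = sizes(p)
      } else {
        running += sizes(p)
      }
      prev = p
    }
    starts.toList
  }

  def main(args: Array[String]): Unit = {
    // Partition 2 is skewed and excluded; the coalescing target is 100 bytes.
    println(startIndices(Array(40L, 30L, 500L, 20L, 20L), excluded = Set(2), target = 100L))
    // prints List(0, 3): partitions 0-1 merge, 3-4 merge, and 2 is split by the skew rule instead.
  }
}
```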


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-10 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r356386884
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+ stats: MapOutputStatistics,
+ partitionId: Int,
+ medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val bytesLen = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /**
+   * Get all the map data size for specific reduce partitionId.
+   */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+mapOutputTracker.shuffleStatuses.get(shuffleId).
+  get.mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def estimateMapIdStartIndices(
+stage: QueryStageExec,
+partitionId: Int,
+medianSize: Long): Array[Int] = {
+val dependency = getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+(0 until numMappers).toArray
+  }
+
+  private def getShuffleStage(queryStage: QueryStageExec): 
ShuffleQueryStageExec = {
+queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = getShuffleStage(queryStage)
+val metrics = shuffleStage.mapOutputStatisticsFuture
+assert(metrics.isCompleted,
+  "ShuffleQueryStageExec should already be ready when executing 
OptimizeSkewedPartitions rule")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on the filed isSkewAndSupportsSplit.
+   */
+  private def supportOptimization(
+  joinType: JoinType,
+  leftStage: QueryStageExec,
+  rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 
ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
+  

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-09 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r355856580
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+ stats: MapOutputStatistics,
+ partitionId: Int,
+ medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val bytesLen = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /*
+  * Get all the map data size for specific reduce partitionId.
+  */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+mapOutputTracker.shuffleStatuses.get(shuffleId).
+  get.mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def estimateMapIdStartIndices(
+stage: QueryStageExec,
+partitionId: Int,
+medianSize: Long): Array[Int] = {
+val dependency = getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+(0 until numMappers).toArray
+  }
+
+  private def getShuffleStage(queryStage: QueryStageExec): 
ShuffleQueryStageExec = {
+queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+val metrics = shuffleStage.mapOutputStatisticsFuture
+assert(metrics.isCompleted, "ShuffleQueryStageExec should already be 
ready")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported and plan 
statistics is available.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on the filed isSkewAndSupportsSplit.
+   */
+  private def supportOptimization(
+joinType: JoinType,
+leftStage: QueryStageExec,
+rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-09 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r355856349
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+ stats: MapOutputStatistics,
+ partitionId: Int,
+ medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val bytesLen = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /*
+  * Get all the map data size for specific reduce partitionId.
+  */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+mapOutputTracker.shuffleStatuses.get(shuffleId).
+  get.mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def estimateMapIdStartIndices(
+stage: QueryStageExec,
+partitionId: Int,
+medianSize: Long): Array[Int] = {
+val dependency = getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+(0 until numMappers).toArray
+  }
+
+  private def getShuffleStage(queryStage: QueryStageExec): 
ShuffleQueryStageExec = {
+queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+val metrics = shuffleStage.mapOutputStatisticsFuture
+assert(metrics.isCompleted, "ShuffleQueryStageExec should already be 
ready")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported and plan 
statistics is available.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on the filed isSkewAndSupportsSplit.
+   */
+  private def supportOptimization(
+joinType: JoinType,
+leftStage: QueryStageExec,
+rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-09 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r355843804
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger 
than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+ stats: MapOutputStatistics,
+ partitionId: Int,
+ medianSize: Long): Boolean = {
+val size = stats.bytesByPartitionId(partitionId)
+size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
+  size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+val bytesLen = stats.bytesByPartitionId.length
+val bytes = stats.bytesByPartitionId.sorted
+if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /*
+  * Get all the map data size for specific reduce partitionId.
+  */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): 
Array[Long] = {
+val mapOutputTracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+mapOutputTracker.shuffleStatuses.get(shuffleId).
+  get.mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Split the partition into the number of mappers. Each split read data from 
each mapper.
+   */
+  private def estimateMapIdStartIndices(
+stage: QueryStageExec,
+partitionId: Int,
+medianSize: Long): Array[Int] = {
+val dependency = getShuffleStage(stage).plan.shuffleDependency
+val numMappers = dependency.rdd.partitions.length
+(0 until numMappers).toArray
+  }
+
+  private def getShuffleStage(queryStage: QueryStageExec): 
ShuffleQueryStageExec = {
+queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+  }
+
+  private def getStatistics(queryStage: QueryStageExec): MapOutputStatistics = 
{
+val shuffleStage = queryStage match {
+  case stage: ShuffleQueryStageExec => stage
+  case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+}
+val metrics = shuffleStage.mapOutputStatisticsFuture
+assert(metrics.isCompleted, "ShuffleQueryStageExec should already be 
ready")
+ThreadUtils.awaitResult(metrics, Duration.Zero)
+  }
+
+  /**
+   * Base optimization support check: the join type is supported and plan 
statistics is available.
+   * Note that for some join types(like left outer), whether a certain 
partition can be optimized
+   * also depends on the filed isSkewAndSupportsSplit.
+   */
+  private def supportOptimization(
+joinType: JoinType,
+leftStage: QueryStageExec,
+rightStage: QueryStageExec): Boolean = {
+val joinTypeSupported = supportedJoinTypes.contains(joinType)
+val shuffleStageCheck = 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-08 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r355270674
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+    Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+      stats: MapOutputStatistics,
+      partitionId: Int,
+      medianSize: Long): Boolean = {
+    val size = stats.bytesByPartitionId(partitionId)
+    size > medianSize * conf.adaptiveSkewedFactor &&
+      size > conf.adaptiveSkewedSizeThreshold
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+    val bytesLen = stats.bytesByPartitionId.length
+    val bytes = stats.bytesByPartitionId.sorted
+    if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /*
+   * Get all the map output sizes for the specific reduce partitionId.
+   */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses.get(shuffleId)
+      .get.mapStatuses.map { _.getSizeForBlock(partitionId) }
+  }
+
+  /*
+   * Split the mappers based on the map size of the specific skewed reduce partitionId.
+   */
+  def splitMappersBasedDataSize(mapPartitionSize: Array[Long], numMappers: Int): Array[Int] = {
+    val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var i = 0
+    var postMapPartitionSize: Long = mapPartitionSize(i)
+    partitionStartIndices += i
+    while (i < numMappers && i + 1 < numMappers) {
+      val nextIndex = if (i + 1 < numMappers) {
 
 Review comment:
   @manuzhang Thanks for your review. After an offline discussion with wenchen, we decided to 
remove this method and instead split the skewed partition by the number of mappers.
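
To make the revised approach concrete, a hedged sketch of what "split by the number of mappers"
means for one skewed reduce partition (MapRange and splitsByMapper are illustrative names, not the
PR's API): split i reads only mapper i's block.

    object PerMapperSplitSketch {
      // endMapId is exclusive, matching the "startMapId to endMapId (exclusive)" wording in the PR.
      final case class MapRange(startMapId: Int, endMapId: Int)

      // One split per mapper: split i reads only map output i of the skewed reduce partition.
      def splitsByMapper(numMappers: Int): Seq[MapRange] =
        (0 until numMappers).map(i => MapRange(i, i + 1))

      def main(args: Array[String]): Unit =
        println(splitsByMapper(4)) // Vector(MapRange(0,1), MapRange(1,2), MapRange(2,3), MapRange(3,4))
    }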





[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-12-04 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r354111510
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedPartitions.scala
 ##
 @@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  private val supportedJoinTypes =
+    Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil
+
+  /**
+   * A partition is considered as a skewed partition if its size is larger than the median
+   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
+   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   */
+  private def isSkewed(
+      stats: MapOutputStatistics,
+      partitionId: Int,
+      medianSize: Long): Boolean = {
+    val size = stats.bytesByPartitionId(partitionId)
+    size > medianSize * conf.adaptiveSkewedFactor &&
+      size > conf.adaptiveSkewedSizeThreshold
+  }
+
+  private def medianSize(stats: MapOutputStatistics): Long = {
+    val bytesLen = stats.bytesByPartitionId.length
+    val bytes = stats.bytesByPartitionId.sorted
+    if (bytes(bytesLen / 2) > 0) bytes(bytesLen / 2) else 1
+  }
+
+  /*
+   * Get all the map output sizes for the specific reduce partitionId.
+   */
+  def getMapSizeForSpecificPartition(partitionId: Int, shuffleId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses.get(shuffleId)
+      .get.mapStatuses.map { _.getSizeForBlock(partitionId) }
+  }
+
+  /*
+   * Split the mappers based on the map size of the specific skewed reduce partitionId.
+   */
+  def splitMappersBasedDataSize(mapPartitionSize: Array[Long], numMappers: Int): Array[Int] = {
+    val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
+    val partitionStartIndices = ArrayBuffer[Int]()
+    var i = 0
+    var postMapPartitionSize: Long = mapPartitionSize(i)
+    partitionStartIndices += i
+    while (i < numMappers && i + 1 < numMappers) {
+      val nextIndex = if (i + 1 < numMappers) {
+        i + 1
+      } else numMappers - 1
+
+      if (postMapPartitionSize + mapPartitionSize(nextIndex) > advisoryTargetPostShuffleInputSize) {
+        postMapPartitionSize = mapPartitionSize(nextIndex)
+        partitionStartIndices += nextIndex
+      } else {
+        postMapPartitionSize += mapPartitionSize(nextIndex)
+      }
+      i += 1
+    }
+    partitionStartIndices.toArray
+  }
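
For context on the size-based variant quoted above: it walks the per-mapper block sizes and starts
a new split whenever adding the next mapper would push the running total past the advisory target.
A standalone sketch of that greedy packing with an assumed 64 MB target (illustrative only; as the
2019-12-08 comment above notes, this method was eventually removed in favour of one split per mapper):

    object GreedyMapperPackingSketch {
      // Start a new split whenever adding the next mapper's bytes would exceed the target.
      // Assumes at least one mapper.
      def startIndices(mapPartitionSize: Array[Long], targetSize: Long): Array[Int] = {
        val indices = scala.collection.mutable.ArrayBuffer(0)
        var running = mapPartitionSize(0)
        var i = 1
        while (i < mapPartitionSize.length) {
          if (running + mapPartitionSize(i) > targetSize) {
            indices += i
            running = mapPartitionSize(i)
          } else {
            running += mapPartitionSize(i)
          }
          i += 1
        }
        indices.toArray
      }

      def main(args: Array[String]): Unit = {
        val mb = 1L << 20
        val sizes = Array(30 * mb, 30 * mb, 50 * mb, 10 * mb, 70 * mb)
        // With a 64 MB target this prints "0, 2, 4": split 0 covers mappers 0-1 (60 MB),
        // split 1 covers mappers 2-3 (60 MB), split 2 covers mapper 4 (70 MB).
        println(startIndices(sizes, 64 * mb).mkString(", "))
      }
    }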
+
+  /**
+   * We split the partition into several splits. Each split reads the data from several map outputs
+   * ranging from startMapId to endMapId (exclusive). This method calculates the split number and
+   * the startMapId for all splits.
+   */
+  private def estimateMapIdStartIndices(
+      stage: QueryStageExec,
+      partitionId: Int,
+      medianSize: Long): Array[Int] = {
+    val dependency = getShuffleStage(stage).plan.shuffleDependency
+    val shuffleId = dependency.shuffleHandle.shuffleId
+    val mapSize = getMapSizeForSpecificPartition(partitionId, shuffleId)
+    val numMappers = dependency.rdd.partitions.length
+    splitMappersBasedDataSize(mapSize, numMappers)
+  }
+
+  private def getShuffleStage(queryStage: QueryStageExec): ShuffleQueryStageExec = {
+ 

[GitHub] [spark] JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] optimize skewed partition based on data size

2019-11-12 Thread GitBox
JkSelf commented on a change in pull request #26434: [SPARK-29544] [SQL] 
optimize skewed partition based on data size
URL: https://github.com/apache/spark/pull/26434#discussion_r345544902
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
 ##
 @@ -116,7 +116,8 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
 class ShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
     metrics: Map[String, SQLMetric],
-    specifiedPartitionStartIndices: Option[Array[Int]] = None)
+    specifiedPartitionStartIndices: Option[Array[Int]] = None,
 
 Review comment:
   Maybe separate definitions for `specifiedPartitionStartIndices` and 
`specifiedPartitionEndIndices` would be clearer?
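
A sketch of the shape that suggestion would take, with the two arrays kept as separate optional
parameters (the class and parameter names here are illustrative; the other constructor parameters
such as dependency and metrics are omitted):

    // Hypothetical signature only, not the constructor that was merged.
    class ShuffledRowRDDSketch(
        specifiedPartitionStartIndices: Option[Array[Int]] = None,
        specifiedPartitionEndIndices: Option[Array[Int]] = None) {
      // When both arrays are supplied, post-shuffle partition i would cover the
      // pre-shuffle partition range [start(i), end(i)).
    }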

