[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218665902 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap =
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218652707 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218651545 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broad
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218640368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap =
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218639550 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218639483 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218631745 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap =
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218631682 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => +if (numRowTaken + num < limit) { + numRowTaken += num.toInt + num.toInt +} else { + val toTake = limit - numRowTaken + numRowTaken += toTake + toTake +} + } + val broadMap = sparkContext.broadcast(takeAmounts) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +iter.take(broadMap.value(index).toInt) + } +} else { + // We try to evenly require the asked limit number of rows across all child rdd's partitions. + var rowsNeedToTake: Long = limit + val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L) + val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*) + + while (rowsNeedToTake > 0) { +val nonEmptyParts = remainingRowsByPartition.count(_ > 0) +// If the rows needed to take are less the number of non-empty partitions, take one row from +// each non-empty partitions until we reach `limit` rows. +// Otherwise, evenly divide the needed rows to each non-empty partitions. 
+val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts) +remainingRowsByPartition.zipWithIndex.foreach { case (num, index) => + // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during + // the traversal, so we need to add this check. + if (rowsNeedToTake > 0 && num > 0) { +if (num >= takePerPart) { + rowsNeedToTake -= takePerPart + takeAmountByPartition(index) += takePerPart + remainingRowsByPartition(index) -= takePerPart +} else { + rowsNeedToTake -= num + takeAmountByPartition(index) += num + remainingRowsByPartition(index) -= num +} + } +} + } + val broadMap =
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218630513 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala --- @@ -22,21 +22,29 @@ import scala.collection.JavaConverters._ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution} +import org.apache.spark.sql.internal.SQLConf /** * A set of test cases that validate partition and column pruning. */ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { + private val originalLimitFlatGlobalLimit = TestHive.conf.limitFlatGlobalLimit + override def beforeAll(): Unit = { super.beforeAll() TestHive.setCacheTables(false) +TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false) --- End diff -- why do we set this flag here? we need to document it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218630488 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala --- @@ -557,11 +557,13 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { } test("SPARK-18004 limit + aggregates") { -val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value") -val limit2Df = df.limit(2) -checkAnswer( - limit2Df.groupBy("id").count().select($"id"), - limit2Df.select($"id")) +withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") { --- End diff -- why do we set this flag here? we need to document it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218630324 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -204,6 +204,13 @@ object SQLConf { .intConf .createWithDefault(4) + val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit") +.internal() +.doc("During global limit, try to evenly distribute limited rows across data " + + "partitions. If disabled, scanning data partitions sequentially until reaching limit number.") +.booleanConf +.createWithDefault(true) --- End diff -- so i read this config doc five times, and i still couldn't figure out what it does, until i went ahead and read the implementation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r218614872 --- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala --- @@ -44,18 +45,23 @@ private[spark] sealed trait MapStatus { * necessary for correctness, since block fetchers are allowed to skip zero-size blocks. */ def getSizeForBlock(reduceId: Int): Long + + /** + * The number of outputs for the map task. + */ + def numberOfOutput: Long --- End diff -- what does this mean? output blocks? output files? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212844439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- Ok. I got your point. As the SQL standard doesn't mandate that, I think we can safely remove the `child.outputPartitioning` check. Let me open a follow-up PR for it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212830045 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- `select * from table order by a limit 10` gets planned differently right? It should use `TakeOrderedAndProjectExec`. There is nothing in the SQL standard that mandates that a nested order by followed by a limit has to respect that ordering clause. In fact, AFAIR, the standard does not even support nested limits (they make stuff non-deterministic). If we end up supporting this, then I'd rather have an explicit flag in `GlobalLimitExec` (`orderedLimit` or something like that) and set that during planning by matching on `Limit(limit, Sort(order, true, child))`. I want the explicit flag because then we can figure out what limit is doing by looking at the physical plan. I want to explicitly check for an underlying sort to match the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior because something way down the plan has set some arbitrary ordering. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212811618 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- For a query like `select * from table order by a limit 10`, I think the expected semantics is going to return top 10 elements, not any 10 elements. In order to not change this behavior, I add this check. > Moreover checking child.outputOrdering only checks the order of the partition and not the order of the frame as a whole. You should also add the child.outputPartitioning. I think you are correct. We need to check `child.outputPartitioning`. I think we need to check there is a `RangePartitioning`. The check should be the child is a range partitioning and has some output ordering. WDYT? > I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this. > I would be slightly in favor of removing the child.outputOrdering check. I am not sure for a limit in the middle of a query, if we don't need to consider this. When such query has sort, don't we need to return top limit elements? cc @cloud-fan too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212805707 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- If we remove it, we may need to feature flag it first since people may rely on the old behavior. Anyway all of this is up for debate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212805327 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(childRDD) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil --- End diff -- @viirya dumb question, what is `child.outputOrdering` doing here? I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this. Moreover checking `child.outputOrdering` only checks the order of the partition and not the order of the frame as a whole. You should also add the `child.outputPartitioning`. I would be slightly in favor of removing the `child.outputOrdering` check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212797691 --- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql --- @@ -1,6 +1,9 @@ -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery -- It includes correlated cases. +-- Disable global limit optimization --- End diff -- ah i see, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212792753 --- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql --- @@ -1,6 +1,9 @@ -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery -- It includes correlated cases. +-- Disable global limit optimization --- End diff -- The golden results strictly require specific returned values for these limit queries. Disabling the optimization here lets the queries produce the required results. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r212792225 --- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql --- @@ -1,6 +1,9 @@ -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery -- It includes correlated cases. +-- Disable global limit optimization --- End diff -- do we have a problem here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16677 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r204580788 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala --- @@ -22,6 +22,7 @@ import scala.util.Random import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.internal.SQLConf --- End diff -- Oops, forgot to revert it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r204362254 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -231,6 +231,12 @@ object ShuffleExchangeExec { override def numPartitions: Int = 1 override def getPartition(key: Any): Int = 0 } + case l: LocalPartitioning => +new Partitioner { + override def numPartitions: Int = l.numPartitions + override def getPartition(key: Any): Int = key.asInstanceOf[Int] +} + --- End diff -- nit: extra space --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r204361301 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala --- @@ -22,6 +22,7 @@ import scala.util.Random import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.internal.SQLConf --- End diff -- Why is this import required? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r204221009 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -204,6 +204,13 @@ object SQLConf { .intConf .createWithDefault(4) + val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit") +.internal() +.doc("During global limit, try to evenly distribute limited rows across data " + + "partitions. If disabled, scanning data partitions sequentially until reaching limit number.") +.booleanConf +.createWithDefault(true) --- End diff -- @hvanhovell Should we set it false by default? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r198746910 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java --- @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { // included in the shuffle write time. writeMetrics.incWriteTime(System.nanoTime() - openStartTime); +long numOfRecords = 0; while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key, record._2()); + numOfRecords += 1; --- End diff -- To verify it, I ran a test locally. It looks like `writeMetrics_recordsWritten` is correctly updated in the `SortShuffleWriter` path too, so I will replace `numOfRecords` with `writeMetrics_recordsWritten`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r198360170 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java --- @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { // included in the shuffle write time. writeMetrics.incWriteTime(System.nanoTime() - openStartTime); +long numOfRecords = 0; while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key, record._2()); + numOfRecords += 1; --- End diff -- We are introducing numOfRecords because (as you say) some code paths are not updating the metric. Instead of working around the bug and cluttering code, it is better to fix it cleanly (unless there is some design issue or more complicated issue). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r198328639 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- As you can see, this causes a test failure, because `child.outputPartitioning` can be an `UnknownPartitioning(0)`. To avoid using a separate field, I made `childRDD` the field of `LocalPartitioning` instead of `orgPartition`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r198115993 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- Ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r198106419 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- One more thing: can you make `LocalPartitioning` use `orgPartition.numPartitions` instead of adding it as a separate field? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197613554 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java --- @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { // included in the shuffle write time. writeMetrics.incWriteTime(System.nanoTime() - openStartTime); +long numOfRecords = 0; while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key, record._2()); + numOfRecords += 1; --- End diff -- Hmm, I think it is fine. However, maybe I missed it, but I can't find where `SortShuffleWriter` updates `writeMetrics_recordsWritten`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197613004 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -204,6 +204,13 @@ object SQLConf { .intConf .createWithDefault(4) + val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit") +.internal() +.doc("During global limit, try to evenly distribute limited rows across data " + + "partitions. If disabled, scanning data partitions sequentially until reaching limit number.") +.booleanConf +.createWithDefault(true) --- End diff -- I set this as true. One reason is to see if it can pass existing tests. If we don't feel confident or worry about behavior change, we can set this to false before merging. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197607227 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +96,94 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = numberOfOutput.map { num => --- End diff -- map -> foreach --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197606877 --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java --- @@ -145,10 +145,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException { // included in the shuffle write time. writeMetrics.incWriteTime(System.nanoTime() - openStartTime); +long numOfRecords = 0; while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key, record._2()); + numOfRecords += 1; --- End diff -- Here and elsewhere, simply use `writeMetrics._recordsWritten` instead of adding `numOfRecords`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197596253 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -247,6 +253,10 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) row => projection(row).getInt(0) case RangePartitioning(_, _) | SinglePartition => identity + case LocalPartitioning(_, _) => +(row: InternalRow) => { + TaskContext.get().partitionId() --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197462974 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -247,6 +253,10 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) row => projection(row).getInt(0) case RangePartitioning(_, _) | SinglePartition => identity + case LocalPartitioning(_, _) => +(row: InternalRow) => { + TaskContext.get().partitionId() --- End diff -- I think so. Will update in next commit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197430814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +98,95 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = mutable.ArrayBuffer.fill[Long](numberOfOutput.length)(0L) --- End diff -- Oh, sorry, my bad...I will change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197430376 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- Ah. I see. Thanks for clarifying. I agree that we might need to have a specialized shuffle writing path at some point. Currently I think that when we hit the sort-based shuffle, this should not perform worse than the previous global limit operation. If you agree, I'd like to leave it to a follow-up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197410930 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +98,95 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. 
+ val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// During global limit, try to evenly distribute limited rows across data +// partitions. If disabled, scanning data partitions sequentially until reaching limit number. +// Besides, if child output has certain ordering, we can't evenly pick up rows from +// each parititon. +val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil + +val shuffled = new ShuffledRowRDD(shuffleDependency) + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!flatGlobalLimit) { + var numRowTaken = 0 + val takeAmounts = mutable.ArrayBuffer.fill[Long](numberOfOutput.length)(0L) --- End diff -- I might be dumb, but why do you need an `ArrayBuffer` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197410511 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- It will hit the sort based shuffle path as soon as the `numPartitions` > 200 right? The problem is not that it will end up in the same shuffle file (it will), the (small) problem is that the sort based shuffle buffers rows and tries to sort them before writing them out. It is just a lot of unneeded complexity. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197388872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- Not sure if I understand correctly. We explicitly specify this `LocalPartitioning` when doing global limit and submit a map stage using this partitioner. Why would we possibly hit a sort-based shuffle? > You basically only need to write to a single file and you're done. I think this is what we want. I specify the same number of partitions for `LocalPartitioning` as its child RDD has, and the rows in a partition all have the same partition id when using `LocalPartitioning`. Doesn't that make it write to a single file? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197366478 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +98,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) + + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil +// Try to keep child plan's original data parallelism or not. It is enabled by default. 
+// If child output has certain ordering, we can't evenly pick up rows from each parititon. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit && + child.outputOrdering != Nil - override def outputPartitioning: Partitioning = child.outputPartitioning +val shuffled = new ShuffledRowRDD(shuffleDependency) - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!respectChildParallelism) { + // This is mainly for tests. + // Some tests like hive compatibility tests assume that the rows are returned by a specified + // order that the partitions are scaned sequentially until we reach the required number of + // rows. However, logically a Limit operator should not care the row scan order. + // Thus we take the rows of each partition until we reach the required limit number. + var numTakenRow = 0 + val takeAmounts = new mutable.HashMap[Int, Int]() --- End diff -- Yes. Going to update it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197284779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -93,25 +98,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) + + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().recordsByPartitionId.toSeq +} else { + Nil +} - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil +// Try to keep child plan's original data parallelism or not. It is enabled by default. 
+// If child output has certain ordering, we can't evenly pick up rows from each parititon. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit && + child.outputOrdering != Nil - override def outputPartitioning: Partitioning = child.outputPartitioning +val shuffled = new ShuffledRowRDD(shuffleDependency) - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!respectChildParallelism) { + // This is mainly for tests. + // Some tests like hive compatibility tests assume that the rows are returned by a specified + // order that the partitions are scaned sequentially until we reach the required number of + // rows. However, logically a Limit operator should not care the row scan order. + // Thus we take the rows of each partition until we reach the required limit number. + var numTakenRow = 0 + val takeAmounts = new mutable.HashMap[Int, Int]() --- End diff -- Why a hash map? An array would also work right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197117604 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning { } } +/** + * Represents a partitioning where rows are only serialized/deserialized locally. The number + * of partitions are not changed and also the distribution of rows. This is mainly used to + * obtain some statistics of map tasks such as number of outputs. + */ +case class LocalPartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning { --- End diff -- This might be expensive as soon as we hit the sort based shuffle. Perhaps we should carve out some specialized shuffle writing path for this at some point. You basically only need to write to a single file and you're done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r197116936 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -247,6 +253,10 @@ object ShuffleExchangeExec { val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) row => projection(row).getInt(0) case RangePartitioning(_, _) | SinglePartition => identity + case LocalPartitioning(_, _) => +(row: InternalRow) => { + TaskContext.get().partitionId() --- End diff -- Can we try to do this once per partition instead of for each row? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org