[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218665902
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+    val childRDD = child.execute()
+    val partitioner = LocalPartitioning(childRDD)
+    val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+      childRDD, child.output, partitioner, serializer)
+    val numberOfOutput: Seq[Long] = if (shuffleDependency.rdd.getNumPartitions != 0) {
+      // submitMapStage does not accept an RDD with 0 partitions.
+      // So, we will not submit this dependency.
+      val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency)
+      submittedStageFuture.get().recordsByPartitionId.toSeq
+    } else {
+      Nil
+    }
+
+    // During global limit, try to evenly distribute limited rows across data
+    // partitions. If disabled, scan data partitions sequentially until reaching the limit.
+    // Besides, if the child output has a certain ordering, we can't evenly pick rows from
+    // each partition.
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
+
+    val shuffled = new ShuffledRowRDD(shuffleDependency)
+
+    val sumOfOutput = numberOfOutput.sum
+    if (sumOfOutput <= limit) {
+      shuffled
+    } else if (!flatGlobalLimit) {
+      var numRowTaken = 0
+      val takeAmounts = numberOfOutput.map { num =>
+        if (numRowTaken + num < limit) {
+          numRowTaken += num.toInt
+          num.toInt
+        } else {
+          val toTake = limit - numRowTaken
+          numRowTaken += toTake
+          toTake
+        }
+      }
+      val broadMap = sparkContext.broadcast(takeAmounts)
+      shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
+        iter.take(broadMap.value(index).toInt)
+      }
+    } else {
+      // We try to evenly spread the requested limit number of rows across all the child RDD's partitions.
+      var rowsNeedToTake: Long = limit
+      val takeAmountByPartition: Array[Long] = Array.fill[Long](numberOfOutput.length)(0L)
+      val remainingRowsByPartition: Array[Long] = Array(numberOfOutput: _*)
+
+      while (rowsNeedToTake > 0) {
+        val nonEmptyParts = remainingRowsByPartition.count(_ > 0)
+        // If the rows needed are fewer than the number of non-empty partitions, take one row from
+        // each non-empty partition until we reach `limit` rows.
+        // Otherwise, evenly divide the needed rows across the non-empty partitions.
+        val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
+        remainingRowsByPartition.zipWithIndex.foreach { case (num, index) =>
+          // In case `rowsNeedToTake` < `nonEmptyParts`, we may run out of `rowsNeedToTake` during
+          // the traversal, so we need to add this check.
+          if (rowsNeedToTake > 0 && num > 0) {
+            if (num >= takePerPart) {
+              rowsNeedToTake -= takePerPart
+              takeAmountByPartition(index) += takePerPart
+              remainingRowsByPartition(index) -= takePerPart
+            } else {
+              rowsNeedToTake -= num
+              takeAmountByPartition(index) += num
+              remainingRowsByPartition(index) -= num
+            }
+          }
+        }
+      }
+      val broadMap 
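
For reference, here is a small self-contained sketch (not part of the PR; the object name and example counts are made up for illustration) of the two take-amount computations in the hunk above: the sequential mode used when the flat global limit is disabled, and the even-distribution mode used when it is enabled. Both take the per-partition record counts returned by the map stage and the requested limit as plain values.

// Standalone sketch of the two strategies in the quoted hunk (illustrative only).
object LimitDistributionSketch {

  // Sequential mode (flat global limit disabled): fill partitions in order until `limit` rows are taken.
  def sequentialTakeAmounts(counts: Seq[Long], limit: Int): Seq[Int] = {
    var taken = 0
    counts.map { num =>
      if (taken + num < limit) {
        taken += num.toInt
        num.toInt
      } else {
        val toTake = limit - taken
        taken += toTake
        toTake
      }
    }
  }

  // Even mode (flat global limit enabled): spread `limit` rows across the non-empty partitions.
  def evenTakeAmounts(counts: Seq[Long], limit: Int): Array[Long] = {
    require(counts.sum >= limit, "this code path assumes the total row count exceeds the limit")
    var rowsNeedToTake: Long = limit
    val takeAmountByPartition = Array.fill[Long](counts.length)(0L)
    val remaining = counts.toArray
    while (rowsNeedToTake > 0) {
      val nonEmptyParts = remaining.count(_ > 0)
      // Either one row per non-empty partition, or an even share of what is still needed.
      val takePerPart = math.max(1, rowsNeedToTake / nonEmptyParts)
      remaining.zipWithIndex.foreach { case (num, index) =>
        if (rowsNeedToTake > 0 && num > 0) {
          val take = math.min(num, takePerPart)
          rowsNeedToTake -= take
          takeAmountByPartition(index) += take
          remaining(index) -= take
        }
      }
    }
    takeAmountByPartition
  }

  def main(args: Array[String]): Unit = {
    val counts = Seq(5L, 20L, 1L, 8L)                         // records per map output partition
    println(sequentialTakeAmounts(counts, 10).mkString(", ")) // 5, 5, 0, 0
    println(evenTakeAmounts(counts, 10).mkString(", "))       // 3, 3, 1, 3
  }
}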

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218652707
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218651545
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218640368
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218639550
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218639483
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218631745
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218631682
  

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218630513
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala ---
@@ -22,21 +22,29 @@ import scala.collection.JavaConverters._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A set of test cases that validate partition and column pruning.
  */
 class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
 
+  private val originalLimitFlatGlobalLimit = TestHive.conf.limitFlatGlobalLimit
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     TestHive.setCacheTables(false)
+    TestHive.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
--- End diff ---

why do we set this flag here? we need to document it.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218630488
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -557,11 +557,13 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-18004 limit + aggregates") {
-    val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
-    val limit2Df = df.limit(2)
-    checkAnswer(
-      limit2Df.groupBy("id").count().select($"id"),
-      limit2Df.select($"id"))
+    withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
--- End diff ---

why do we set this flag here? we need to document it.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218630324
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -204,6 +204,13 @@ object SQLConf {
     .intConf
     .createWithDefault(4)
 
+  val LIMIT_FLAT_GLOBAL_LIMIT = buildConf("spark.sql.limit.flatGlobalLimit")
+    .internal()
+    .doc("During global limit, try to evenly distribute limited rows across data " +
+      "partitions. If disabled, scanning data partitions sequentially until reaching limit number.")
+    .booleanConf
+    .createWithDefault(true)
--- End diff ---

so i read this config doc five times, and i still couldn't figure out what 
it does, until i went ahead and read the implementation.
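
For context, the flag is an internal per-session SQL conf and can be flipped at runtime. A minimal sketch (assuming a running SparkSession named `spark`, e.g. in spark-shell; the config key is the one defined in the diff above), mirroring the test usage quoted earlier:

// Sketch only: toggle the internal flag from a session.
spark.conf.set("spark.sql.limit.flatGlobalLimit", "false") // scan partitions in order until the limit is hit
spark.conf.set("spark.sql.limit.flatGlobalLimit", "true")  // allow the limited rows to be spread across partitions

// A limit that feeds another operator exercises the global limit this flag affects,
// as in the DataFrameAggregateSuite test quoted above.
val limited = spark.range(0, 1000, 1, 10).toDF("id").limit(5)
limited.groupBy("id").count().show()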


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-09-18 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r218614872
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -44,18 +45,23 @@ private[spark] sealed trait MapStatus {
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long
+
+  /**
+   * The number of outputs for the map task.
+   */
+  def numberOfOutput: Long
--- End diff ---

what does this mean? output blocks? output files?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-26 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212844439
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
--- End diff ---

Ok. I got your point. As the SQL standard doesn't mandate that, I think we can safely remove the `child.outputPartitioning` check.

Let me open a follow-up PR for it.





---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212830045
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
--- End diff ---

`select * from table order by a limit 10` gets planned differently right? 
It should use `TakeOrderedAndProjectExec`.

There is nothing in the SQL standard that mandates that a nested order by 
followed by a limit has to respect that ordering clause. In fact, AFAIR, the 
standard does not even support nested limits (they make stuff 
non-deterministic).

If we end up supporting this, then I'd rather have an explicit flag in 
`GlobalLimitExec` (`orderedLimit` or something like that) and set that during 
planning by matching on `Limit(limit, Sort(order, true, child))`. I want the 
explicit flag because then we can figure out what limit is doing by looking at 
the physical plan. I want to explicitly check for an underlying sort to match 
the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior 
because something way down the plan has set some arbitrary ordering.
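
To make the suggestion concrete, here is a toy, self-contained sketch (illustrative names only, not the PR's code) of planning an ordered limit with an explicit flag instead of inferring intent from `child.outputOrdering`:

// Toy model of the proposal: match Limit(limit, Sort(order, global = true, child)) during
// planning and record the ordering requirement as an explicit `orderedLimit` flag.
object OrderedLimitPlanningSketch {
  sealed trait LogicalPlan
  case class Relation(name: String) extends LogicalPlan
  case class Sort(order: String, global: Boolean, child: LogicalPlan) extends LogicalPlan
  case class Limit(limit: Int, child: LogicalPlan) extends LogicalPlan

  // Simplified physical node carrying the explicit flag suggested above.
  case class GlobalLimitExecSketch(limit: Int, child: LogicalPlan, orderedLimit: Boolean)

  def planLimit(plan: LogicalPlan): Option[GlobalLimitExecSketch] = plan match {
    case Limit(n, s @ Sort(_, true, _)) => Some(GlobalLimitExecSketch(n, s, orderedLimit = true))
    case Limit(n, child)                => Some(GlobalLimitExecSketch(n, child, orderedLimit = false))
    case _                              => None
  }

  def main(args: Array[String]): Unit = {
    // An ordered limit keeps its ordering requirement visible in the physical plan.
    println(planLimit(Limit(10, Sort("a", global = true, Relation("t")))))
    // A plain limit is free to pick rows evenly across partitions.
    println(planLimit(Limit(10, Relation("t"))))
  }
}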


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212811618
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
--- End diff ---

For a query like `select * from table order by a limit 10`, I think the expected semantics is to return the top 10 elements, not any 10 elements. In order not to change this behavior, I added this check.

> Moreover checking child.outputOrdering only checks the order of the partition and not the order of the frame as a whole. You should also add the child.outputPartitioning.

I think you are correct. We need to check `child.outputPartitioning`. I think we need to check that there is a `RangePartitioning`: the check should be that the child has a range partitioning and some output ordering. WDYT?

> I am not entirely sure that we should guarantee that you should get the lowest elements of a dataset if you perform a limit in the middle of a query (a top level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this.
> I would be slightly in favor of removing the child.outputOrdering check.

I am not sure we can skip considering this for a limit in the middle of a query. When such a query has a sort, don't we need to return the top `limit` elements?

cc @cloud-fan too.




---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212805707
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
--- End diff ---

If we remove it, we may need to feature-flag it first, since people may rely on the old behavior. Anyway, all of this is up for debate.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212805327
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
--- End diff ---

@viirya dumb question, what is `child.outputOrdering` doing here? I am not 
entirely sure that we should guarantee that you should get the lowest elements 
of a dataset if you perform a limit in the middle of a query (a top level 
sort-limit does have this guarantee). I also don't think the SQL standard 
supports/mandates this.

Moreover checking `child.outputOrdering` only checks the order of the 
partition and not the order of the frame as a whole. You should also add the 
`child.outputPartitioning`.

I would be slightly in favor of removing the `child.outputOrdering` check.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212797691
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql ---
@@ -1,6 +1,9 @@
 -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
 -- It includes correlated cases.
 
+-- Disable global limit optimization
--- End diff ---

ah i see, thanks!


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212792753
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql ---
@@ -1,6 +1,9 @@
 -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
 -- It includes correlated cases.
 
+-- Disable global limit optimization
--- End diff ---

The golden result file strictly requires specific returned values for the limit queries. This disables the optimization to get the required results.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212792225
  
--- Diff: sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql ---
@@ -1,6 +1,9 @@
 -- A test suite for IN LIMIT in parent side, subquery, and both predicate subquery
 -- It includes correlated cases.
 
+-- Disable global limit optimization
--- End diff ---

do we have a problem here?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16677


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-07-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r204580788
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
 ---
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
--- End diff --

Oops, forgot to revert it. Thanks.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-07-23 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r204362254
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -231,6 +231,12 @@ object ShuffleExchangeExec {
   override def numPartitions: Int = 1
   override def getPartition(key: Any): Int = 0
 }
+  case l: LocalPartitioning =>
+new Partitioner {
+  override def numPartitions: Int = l.numPartitions
+  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+}
+
--- End diff --

nit: extra space


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-07-23 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r204361301
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
 ---
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
--- End diff --

Why is this import required?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-07-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r204221009
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -204,6 +204,13 @@ object SQLConf {
 .intConf
 .createWithDefault(4)
 
+  val LIMIT_FLAT_GLOBAL_LIMIT = 
buildConf("spark.sql.limit.flatGlobalLimit")
+.internal()
+.doc("During global limit, try to evenly distribute limited rows 
across data " +
+  "partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.")
+.booleanConf
+.createWithDefault(true)
--- End diff --

@hvanhovell Should we set it to false by default?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r198746910
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ---
@@ -145,10 +145,12 @@ public void write(Iterator> records) 
throws IOException {
 // included in the shuffle write time.
 writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
+long numOfRecords = 0;
 while (records.hasNext()) {
   final Product2 record = records.next();
   final K key = record._1();
   partitionWriters[partitioner.getPartition(key)].write(key, 
record._2());
+  numOfRecords += 1;
--- End diff --

To verify it, I ran a test locally. It looks like `writeMetrics_recordsWritten` 
is updated correctly in the `SortShuffleWriter` path too, so I will replace 
`numOfRecords` with `writeMetrics_recordsWritten`.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r198360170
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ---
@@ -145,10 +145,12 @@ public void write(Iterator> records) 
throws IOException {
 // included in the shuffle write time.
 writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
+long numOfRecords = 0;
 while (records.hasNext()) {
   final Product2 record = records.next();
   final K key = record._1();
   partitionWriters[partitioner.getPartition(key)].write(key, 
record._2());
+  numOfRecords += 1;
--- End diff --

We are introducing `numOfRecords` because (as you say) some code paths are not 
updating the metric. Instead of working around the bug and cluttering the 
code, it is better to fix the metric cleanly (unless there is some design 
issue or a more complicated problem).


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-26 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r198328639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

As you can see, this causes a test failure, because `child.outputPartitioning` 
can be an `UnknownPartitioning(0)`. To avoid using a separate field, I make 
the child RDD a field of `LocalPartitioning` instead of `orgPartition`.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-26 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r198115993
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

Ok.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r198106419
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

One more thing: can you make `LocalPartitioning` use `orgPartition.numPartitions` 
instead of adding it as a separate field?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197613554
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ---
@@ -145,10 +145,12 @@ public void write(Iterator> records) 
throws IOException {
 // included in the shuffle write time.
 writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
+long numOfRecords = 0;
 while (records.hasNext()) {
   final Product2 record = records.next();
   final K key = record._1();
   partitionWriters[partitioner.getPartition(key)].write(key, 
record._2());
+  numOfRecords += 1;
--- End diff --

Hmm, I think it is fine. However, maybe I missed it, but I can't find where 
`SortShuffleWriter` updates `writeMetrics_recordsWritten`?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197613004
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -204,6 +204,13 @@ object SQLConf {
 .intConf
 .createWithDefault(4)
 
+  val LIMIT_FLAT_GLOBAL_LIMIT = 
buildConf("spark.sql.limit.flatGlobalLimit")
+.internal()
+.doc("During global limit, try to evenly distribute limited rows 
across data " +
+  "partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.")
+.booleanConf
+.createWithDefault(true)
--- End diff --

I set this to true. One reason is to see if it can pass the existing tests. If 
we don't feel confident or are worried about the behavior change, we can set 
it to false before merging.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197607227
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,94 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
+
+val shuffled = new ShuffledRowRDD(shuffleDependency)
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!flatGlobalLimit) {
+  var numRowTaken = 0
+  val takeAmounts = numberOfOutput.map { num =>
--- End diff --

map -> foreach


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197606877
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 ---
@@ -145,10 +145,12 @@ public void write(Iterator> records) 
throws IOException {
 // included in the shuffle write time.
 writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
 
+long numOfRecords = 0;
 while (records.hasNext()) {
   final Product2 record = records.next();
   final K key = record._1();
   partitionWriters[partitioner.getPartition(key)].write(key, 
record._2());
+  numOfRecords += 1;
--- End diff --

Here and elsewhere, can we simply use `writeMetrics._recordsWritten` instead of 
adding `numOfRecords`?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197596253
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -247,6 +253,10 @@ object ShuffleExchangeExec {
 val projection = UnsafeProjection.create(h.partitionIdExpression 
:: Nil, outputAttributes)
 row => projection(row).getInt(0)
   case RangePartitioning(_, _) | SinglePartition => identity
+  case LocalPartitioning(_, _) =>
+(row: InternalRow) => {
+  TaskContext.get().partitionId()
--- End diff --

Fixed.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197462974
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -247,6 +253,10 @@ object ShuffleExchangeExec {
 val projection = UnsafeProjection.create(h.partitionIdExpression 
:: Nil, outputAttributes)
 row => projection(row).getInt(0)
   case RangePartitioning(_, _) | SinglePartition => identity
+  case LocalPartitioning(_, _) =>
+(row: InternalRow) => {
+  TaskContext.get().partitionId()
--- End diff --

I think so. Will update in next commit.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197430814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +98,95 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
+
+val shuffled = new ShuffledRowRDD(shuffleDependency)
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!flatGlobalLimit) {
+  var numRowTaken = 0
+  val takeAmounts = 
mutable.ArrayBuffer.fill[Long](numberOfOutput.length)(0L)
--- End diff --

Oh, sorry, my bad... I will change it.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197430376
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

Ah, I see, thanks for clarifying. I agree that we might need a specialized 
shuffle writing path at some point. For now, I think that even when we hit the 
sort-based shuffle, the performance should not be worse than the previous 
global limit operation. If you agree, I'd like to leave it to a follow-up.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197410930
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +98,95 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
+
+val shuffled = new ShuffledRowRDD(shuffleDependency)
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!flatGlobalLimit) {
+  var numRowTaken = 0
+  val takeAmounts = 
mutable.ArrayBuffer.fill[Long](numberOfOutput.length)(0L)
--- End diff --

I might be dumb, but why do you need an `ArrayBuffer` here?


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197410511
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

It will hit the sort-based shuffle path as soon as `numPartitions` > 200, 
right? The problem is not that it will end up in the same shuffle file (it 
will); the (small) problem is that the sort-based shuffle buffers rows and 
tries to sort them before writing them out. It is just a lot of unneeded 
complexity.
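
For reference, the 200 mentioned above is the sort-shuffle bypass threshold; a tiny sketch of reading it (the config key and its default are real, the snippet itself is only illustrative):

```scala
import org.apache.spark.SparkConf

// Above this many reduce partitions (or whenever map-side combine is needed),
// the bypass-merge writer is not used and the sort-based shuffle path kicks in.
val bypassThreshold =
  new SparkConf().getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
println(s"sort-shuffle bypass threshold: $bypassThreshold")
```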


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197388872
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

Not sure if I understand correctly. We explicitly specify this 
`LocalPartitioning` when doing the global limit and submit a map stage using 
this partitioner. Why would we possibly hit a sort-based shuffle?

> You basically only need to write to a single file and you're done.

I think this is what we want. I specify the same number of partitions for 
`LocalPartitioning` as its child RDD, and with `LocalPartitioning` all rows in 
a partition get the same partition id. Doesn't that make each task write to a 
single file?
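
A rough sketch of the behaviour being described (the class name is illustrative, not the PR's code): keep the child's partition count, and since every row of map partition i is keyed by i, each map task routes all of its rows to a single reduce partition.

```scala
import org.apache.spark.Partitioner

// Identity routing: the "key" attached to each row is its map-side partition id.
class IdentityPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
```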


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197366478
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +98,101 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+// If child output has certain ordering, we can't evenly pick up rows 
from each parititon.
+val respectChildParallelism = 
sqlContext.conf.enableParallelGlobalLimit &&
+  child.outputOrdering != Nil
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+val shuffled = new ShuffledRowRDD(shuffleDependency)
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // Some tests like hive compatibility tests assume that the rows are 
returned by a specified
+  // order that the partitions are scaned sequentially until we reach 
the required number of
+  // rows. However, logically a Limit operator should not care the row 
scan order.
+  // Thus we take the rows of each partition until we reach the 
required limit number.
+  var numTakenRow = 0
+  val takeAmounts = new mutable.HashMap[Int, Int]()
--- End diff --

Yes. Going to update it.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197284779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +98,101 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+// If child output has certain ordering, we can't evenly pick up rows 
from each parititon.
+val respectChildParallelism = 
sqlContext.conf.enableParallelGlobalLimit &&
+  child.outputOrdering != Nil
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+val shuffled = new ShuffledRowRDD(shuffleDependency)
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // Some tests like hive compatibility tests assume that the rows are 
returned by a specified
+  // order that the partitions are scaned sequentially until we reach 
the required number of
+  // rows. However, logically a Limit operator should not care the row 
scan order.
+  // Thus we take the rows of each partition until we reach the 
required limit number.
+  var numTakenRow = 0
+  val takeAmounts = new mutable.HashMap[Int, Int]()
--- End diff --

Why a hash map? An array would also work right?
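
For what it's worth, a minimal sketch (assumed shapes and names, not the PR's code) of the sequential-scan bookkeeping with a plain `Array`, where the array index is the partition id:

```scala
// Walk the per-partition record counts in order and claim rows until `limit`
// is reached; partitions after that point take nothing.
def sequentialTakeAmounts(numberOfOutput: Seq[Long], limit: Int): Array[Int] = {
  val takeAmounts = new Array[Int](numberOfOutput.length)
  var taken = 0
  numberOfOutput.zipWithIndex.foreach { case (num, i) =>
    val toTake = math.min(num, (limit - taken).toLong).toInt
    takeAmounts(i) = toTake
    taken += toTake
  }
  takeAmounts
}

// e.g. sequentialTakeAmounts(Seq(3L, 5L, 4L), limit = 6)  // Array(3, 3, 0)
```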


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197117604
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -193,6 +193,16 @@ case object SinglePartition extends Partitioning {
   }
 }
 
+/**
+ * Represents a partitioning where rows are only serialized/deserialized 
locally. The number
+ * of partitions are not changed and also the distribution of rows. This 
is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class LocalPartitioning(orgPartition: Partitioning, numPartitions: 
Int) extends Partitioning {
--- End diff --

This might be expensive as soon as we hit the sort-based shuffle. Perhaps we 
should carve out a specialized shuffle writing path for this at some point. 
You basically only need to write to a single file and you're done.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-06-21 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r197116936
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -247,6 +253,10 @@ object ShuffleExchangeExec {
 val projection = UnsafeProjection.create(h.partitionIdExpression 
:: Nil, outputAttributes)
 row => projection(row).getInt(0)
   case RangePartitioning(_, _) | SinglePartition => identity
+  case LocalPartitioning(_, _) =>
+(row: InternalRow) => {
+  TaskContext.get().partitionId()
--- End diff --

Can we try to do this once per partition instead of for each row?
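
A sketch of the suggestion (names are illustrative, not the final code): resolve the partition id once when the key extractor is built inside a task, then reuse it for every row, instead of calling `TaskContext.get()` per row.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow

// Must run inside a task, where TaskContext.get() is non-null.
def localPartitionKeyExtractor(): InternalRow => Any = {
  val partitionId = TaskContext.get().partitionId()  // evaluated once per task
  _ => partitionId                                   // reused for every row
}
```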


---
