[GitHub] spark pull request #18055: [Core][WIP] Make the object in TorrentBroadcast a...

2017-05-21 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/18055#discussion_r117666471
  
--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -54,7 +54,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea
  * @param obj object to broadcast
  * @param id A unique identifier for the broadcast variable.
  */
-private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
+private[spark] class TorrentBroadcast[T: ClassTag](@transient val obj: T, id: Long)
--- End diff --

In the old code, `obj` is not a field; it is just a constructor argument.
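
The difference between a plain constructor argument and a `@transient val` can be seen in a minimal sketch. `PlainParam`, `TransientField`, and `roundTrip` are hypothetical names made up for illustration; this is not Spark's `TorrentBroadcast`. It assumes plain Java serialization:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical classes for illustration only; not Spark code.
class PlainParam(obj: String, val id: Long) extends Serializable {
  // `obj` is read only while the constructor runs, so it is not kept as a field.
  val length: Int = obj.length
}

// `val` turns `obj` into a field; `@transient` tells Java serialization to skip it.
class TransientField(@transient val obj: String, val id: Long) extends Serializable

object CtorParamSketch {
  // Serialize and deserialize a value with plain Java serialization.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

After a round trip, the `obj` member of `TransientField` is `null` while `id` survives, which is why code that reads a transient field after deserialization has to tolerate the missing value.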


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-25 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97783672
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Int] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().numberOfOutput.toSeq
+} else {
+  Nil
+}
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+val shuffled = new ShuffledRowRDD(shuffleDependency)
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  shuffled
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // We take the rows of each partition until we reach the required 
limit number.
--- End diff --

Hmm, I think the old single-partition path did not behave like this. Why do we 
need this branch?





[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97701247
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = FakePartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Int] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().numberOfOutput.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  childRDD
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // We take the rows of each partition until we reach the required 
limit number.
+  var countForRows = 0
+  val takeAmounts = new mutable.HashMap[Int, Int]()
+  numberOfOutput.zipWithIndex.foreach { case (num, index) =>
+if (countForRows + num < limit) {
+  countForRows += num
+  takeAmounts += ((index, num))
+} else {
+  val toTake = limit - countForRows
+  countForRows += toTake
+  takeAmounts += ((index, toTake))
+}
+  }
+  val shuffled = new ShuffledRowRDD(shuffleDependency)
+  shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
+takeAmounts.get(index).map { size =>
+  iter.take(size)
+}.getOrElse(iter)
+  }
+} else {
+  // We try to distribute the required limit number of rows across all 
child rdd's partitions.
+  var numToReduce = (sumOfOutput - limit)
+  val reduceAmounts = new mutable.HashMap[Int, Int]()
+  val nonEmptyParts = numberOfOutput.filter(_ > 0).size
+  val reducePerPart = numToReduce / nonEmptyParts
+  numberOfOutput.zipWithIndex.foreach { case (num, index) =>
+if (num >= reducePerPart) {
+  numToReduce -= reducePerPart
+  reduceAmounts += ((index, reducePerPart))
+} else {
+  numToReduce -= num
+  reduceAmounts += ((index, num))
+}
+  }
+  while (numToReduce > 0) {
+numberOfOutput.zipWithIndex.foreach { case (num, index) =>
+  val toReduce = if (numToReduce / nonEmptyParts > 0) {
+numToReduce / nonEmptyParts
+  } else {
+numToReduce
+  }
+  if (num - reduceAmounts(index) >= toReduce) {
+reduceAmounts(index) = reduceAmounts(index) + toReduce
+numToReduce -= toReduce
+  } else if (num - reduceAmounts(index) > 0) {
+reduceAmounts(index) = reduceAmounts(index) + 1
+numToReduce -= 1
+  }
+}
+  }
+
+  val shuffled = new ShuffledRowRDD(shuffleDependency)
+  shuffled.mapPartitio

[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97700863
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = FakePartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Int] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().numberOfOutput.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  childRDD
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // We take the rows of each partition until we reach the required 
limit number.
+  var countForRows = 0
+  val takeAmounts = new mutable.HashMap[Int, Int]()
+  numberOfOutput.zipWithIndex.foreach { case (num, index) =>
+if (countForRows + num < limit) {
+  countForRows += num
+  takeAmounts += ((index, num))
+} else {
+  val toTake = limit - countForRows
+  countForRows += toTake
+  takeAmounts += ((index, toTake))
+}
+  }
+  val shuffled = new ShuffledRowRDD(shuffleDependency)
+  shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
+takeAmounts.get(index).map { size =>
+  iter.take(size)
+}.getOrElse(iter)
+  }
+} else {
+  // We try to distribute the required limit number of rows across all 
child rdd's partitions.
+  var numToReduce = (sumOfOutput - limit)
+  val reduceAmounts = new mutable.HashMap[Int, Int]()
--- End diff --

It would be better to broadcast `reduceAmounts`.





[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97700723
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = FakePartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Int] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().numberOfOutput.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  childRDD
+} else if (!respectChildParallelism) {
+  // This is mainly for tests.
+  // We take the rows of each partition until we reach the required 
limit number.
+  var countForRows = 0
+  val takeAmounts = new mutable.HashMap[Int, Int]()
+  numberOfOutput.zipWithIndex.foreach { case (num, index) =>
+if (countForRows + num < limit) {
+  countForRows += num
+  takeAmounts += ((index, num))
+} else {
+  val toTake = limit - countForRows
+  countForRows += toTake
+  takeAmounts += ((index, toTake))
+}
+  }
+  val shuffled = new ShuffledRowRDD(shuffleDependency)
+  shuffled.mapPartitionsWithIndexInternal { case (index, iter) =>
+takeAmounts.get(index).map { size =>
+  iter.take(size)
+}.getOrElse(iter)
--- End diff --

Should this be `getOrElse` with an empty iterator?
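
A minimal sketch of the suggested fallback in plain Scala, without Spark. `TakeAmountsSketch`, `takeAmounts`, and `limitPartition` are hypothetical names for illustration, and ordinary iterators stand in for RDD partitions:

```scala
// Hypothetical helper for illustration; plain iterators stand in for RDD partitions.
object TakeAmountsSketch {
  // Walk the per-partition row counts in order and decide how many rows
  // to keep from each partition until `limit` rows are covered.
  def takeAmounts(counts: Seq[Int], limit: Int): Map[Int, Int] = {
    var taken = 0
    counts.zipWithIndex.map { case (num, index) =>
      val toTake = math.min(num, limit - taken)
      taken += toTake
      index -> toTake
    }.toMap
  }

  // A partition with no recorded amount yields no rows instead of all of its rows.
  def limitPartition[A](amounts: Map[Int, Int], index: Int, rows: Iterator[A]): Iterator[A] =
    amounts.get(index).map(n => rows.take(n)).getOrElse(Iterator.empty)
}
```

Falling back to the full iterator would let a partition without an entry emit every row and exceed the limit; an empty iterator keeps the result within the limit.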





[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97700670
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = FakePartitioning(child.outputPartitioning,
+  childRDD.getNumPartitions)
+val shuffleDependency = ShuffleExchange.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Int] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().numberOfOutput.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// Try to keep child plan's original data parallelism or not. It is 
enabled by default.
+val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit
+
+val sumOfOutput = numberOfOutput.sum
+if (sumOfOutput <= limit) {
+  childRDD
--- End diff --

I think we should use the shuffled RDD here to read the data directly from 
disk.





[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-01-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r97700568
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -230,6 +230,21 @@ case object SinglePartition extends Partitioning {
 }
 
 /**
+ * Represents a partitioning where rows are only serialized/deserialized locally. The number
+ * of partitions are not changed and also the distribution of rows. This is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
--- End diff --

How about `LocalPartitioning`?





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
@viirya I suggest fixing issue 2 in this PR and waiting for comments on 
issue 1. /cc @rxin and @wzhfy, who may comment on the first case.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
For 1, my suggestion is not to use the proposal in this PR, for two reasons:
1. How do you determine that `total rows in all partitions are (much) more than 
limit number` before taking this code path, and how do you decide what counts 
as `much more than`? I cannot use CBO estimated stats here, because the local 
limit plan may be complex and we cannot ensure the accuracy of the estimated 
row count.
2. As @rxin suggested, this breaks the RDD chain.

So for 1, I think it needs some improvements to Spark core and the scheduler, 
as I mentioned above.

For 2, it is OK with me. The solution is the same as the one I described above 
(still shuffle + shuffle to multiple partitions + modified map output 
statistics), right?





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
The partitions after the local limit hold about (nearly) 100,000,000 rows in total.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
Again, to be clear, I am against the performance regression in the following case:
0.  the limit number is 100,000,000
1.  the original table is very big, much larger than 100,000,000 rows
2.  after the local limit stage, the output row count is about (nearly) 
100,000,000 rows







[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
I think the shuffle itself is OK, but shuffling to one partition causes the 
performance issue.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
Assume the local limit outputs 100,000,000 rows. The global limit will then 
take them in a single partition, so it is very slow and cannot use other free 
cores to improve parallelism.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
@viirya My team member posted to the mailing list. We actually mean the case I 
listed above: the main issue is the single-partition issue in global limit. 
If you fall back to the old global limit in that case, it is still unresolved.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
I think the local limit cost is important. Let m be the number of recomputed 
partitions and n the total number of partitions.
m = 1, n = 100 is a positive case, but there are also cases where m is very 
close to n (even m = n).

Our customers have this scenario, so I care a lot about this issue.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
Your proposal avoids the cost of computing and shuffling all partitions for 
local limit, but it introduces recomputation of some partitions in the local 
limit stage.

We cannot decide which cost is cheaper (in most cases). Note that the 
computation logic of the local limit stage may be very complex and costly.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
I think that before comparing our proposals, we should first make sure they 
will not bring a performance regression.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
I don't quite get you, but let me explain more.
Suppose we use map output statistics to decide how many elements each global 
limit task should take:
1.  local limit shuffles with the partitioner from the mailing list and returns 
the map output statistics
2.  in global limit, each partition takes or drops some rows (just like what 
you do in this PR) based on the statistics

Then:
1. the shuffle cost is almost the same as now
2. global limit has no single-partition issue when the limit number is big




[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
We need to define a new map output statistics to do this.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
Yes, you are right, we cannot ensure a uniform distribution for global limit.
One idea is not to use a special partitioner: after the shuffle we get the map 
output statistics for the row count of each bucket, and decide how many 
elements each global limit task should take.






[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
Referring to the mailing list:
>One issue left is how to decide shuffle partition number. 
We can have a config of the maximum number of elements for each GlobalLimit 
task to process, 
then do a factorization to get a number most close to that config. 
E.g. the config is 2000: 
if limit=1,  1 = 2000 * 5, we shuffle to 5 partitions 
if limit=,  =  * 9, we shuffle to 9 partitions 
if limit is a prime number, we just fall back to single partition 

You mean for the prime number case?
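
For reference, the factorization scheme quoted above could be sketched roughly as follows. This is only an interpretation of the quoted text: `LimitPartitionsSketch`, `numShufflePartitions` and `maxPerTask` are hypothetical names, and "most close" is read here as the proper divisor of the limit with the smallest absolute distance to the config value.

```scala
object LimitPartitionsSketch {
  /**
   * Hypothetical helper: pick the number of shuffle partitions for a global limit.
   * `maxPerTask` stands for the assumed config (maximum rows per GlobalLimit task).
   */
  def numShufflePartitions(limit: Int, maxPerTask: Int): Int = {
    require(limit > 0 && maxPerTask > 0, "limit and maxPerTask must be positive")
    if (limit <= maxPerTask) {
      1 // a single task can handle the whole limit
    } else {
      // Collect all divisors d of limit with 1 < d < limit.
      val small = (2 to math.sqrt(limit.toDouble).toInt).filter(limit % _ == 0)
      val divisors = (small ++ small.map(limit / _)).distinct
      if (divisors.isEmpty) {
        1 // limit is prime: fall back to a single partition
      } else {
        // Rows per task: the divisor closest to the configured maximum.
        val perTask = divisors.minBy(d => math.abs(d.toLong - maxPerTask.toLong))
        limit / perTask
      }
    }
  }
}
```

The prime-number branch is exactly the case asked about here: no proper divisor exists, so the sketch returns 1 and the single-partition bottleneck remains for such limits.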





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
To be clear, we currently have these issues:
1.  Local limit computes all partitions, which means it launches many tasks
when a small number of tasks may actually be enough.
2.  Global limit has a single-partition issue: it currently shuffles all the
data to one partition, so if the limit number is very big, it causes a
performance bottleneck.

It would be ideal to combine the global limit and local limit into one
stage and avoid the shuffle, but for now I cannot find a very good solution
(one with no performance regression) that does this without changing Spark
core/scheduler. Your solution is trying to do that, but as I suggested, there
are some cases where the performance may be worse.

@wzhfy's idea only resolves the single-partition issue: it still shuffles and
still runs the local limit on all the partitions, but it does not bring
performance down in those cases compared with the current code path.

> Another issue is, how do you make sure you create a uniform distribution
> of the result of local limit. Each local limit can produce different number of
> rows.

It uses a special partitioner to do this. The partitioner works like
`row_number` in SQL: it gives each row a uniformly distributed partition id, so
in the reduce stage each task handles a very similar number of rows.
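
As an illustration only (the actual partitioner from the proposal is not shown in this thread), a row-number style assignment can be sketched as numbering the rows and taking the number modulo the partition count. `RoundRobinSketch` and its methods are hypothetical names.

```scala
object RoundRobinSketch {
  /** Assigns the i-th row (0-based) to partition i % numPartitions. */
  def partitionIds(numRows: Int, numPartitions: Int): Seq[Int] = {
    require(numPartitions > 0, "numPartitions must be positive")
    (0 until numRows).map(_ % numPartitions)
  }

  /** Counts how many rows land in each partition under that assignment. */
  def rowsPerPartition(numRows: Int, numPartitions: Int): Map[Int, Int] =
    partitionIds(numRows, numPartitions)
      .groupBy(identity)
      .map { case (partition, rows) => partition -> rows.size }
}
```

Under this assignment the partition sizes differ by at most one row, which is the "very similar number of rows" property described above.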






[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96784321
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
+ * Take the first `limit` elements of the child's partitions.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-}
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
+  override def output: Seq[Attribute] = child.output
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+  protected override def doExecute(): RDD[InternalRow] = {
+    // This logic is mainly copyed from `SparkPlan.executeTake`.
+    // TODO: combine this with `SparkPlan.executeTake`, if possible.
+    val childRDD = child.execute()
+    val totalParts = childRDD.partitions.length
+    var partsScanned = 0
+    var totalNum = 0
+    var resultRDD: RDD[InternalRow] = null
+    while (totalNum < limit && partsScanned < totalParts) {
+      // The number of partitions to try in this iteration. It is ok for this number to be
+      // greater than totalParts because we actually cap it at totalParts in runJob.
+      var numPartsToTry = 1L
+      if (partsScanned > 0) {
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
+        // Otherwise, interpolate the number of partitions we need to try, but overestimate
+        // it by 50%. We also cap the estimation in the end.
+        val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
+        if (totalNum == 0) {
+          numPartsToTry = partsScanned * limitScaleUpFactor
+        } else {
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
+        }
+      }
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val sc = sqlContext.sparkContext
+      val res = sc.runJob(childRDD,
+        (it: Iterator[InternalRow]) => Array[Int](it.size), p)
+
+      totalNum += res.map(_.head).sum
+      partsScanned += p.size
+
+      if (totalNum >= limit) {
+        // If we scan more rows than the limit number, we need to reduce that from scanned.
+        // We calculate how many rows need to be reduced for each partition,
+        // until all redunant rows are reduced.
+        var numToReduce = (totalNum - limit)
+        val reduceAmounts = new HashMap[Int, Int]()
+        val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) =>
+          val toReduce = if (size > numToReduce) numToReduce else size
+          reduceAmounts += ((part, toReduce))
+          numToReduce -= toReduce
+        }
+        resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) =>
+          if (index < partsScanned) {
--- End diff --

Can you explain more? I think that without the job triggered by
```
val res = sc.runJob(childRDD,
 (it: Iterator[InternalRow]) => Array[Int](it.size), p)
```
the job chain would not be broken.





[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...

2017-01-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/16633
  
@viirya @rxin I support @wzhfy's idea from the mailing list thread
http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-td20570.html:
it solves the single-partition issue in the global limit without breaking the
job chain.

The local limit still computes all the partitions. I think we can consider
resolving the local limit issue with some changes to the core scheduler in the
future: we could provide a mechanism that skips computing the remaining tasks
in a stage once some condition is satisfied for that stage.

What do you think?





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96782626
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

It will be consumed by the parent plan.





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96782094
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

In that case there are no rows to drop, so it is a full scan, right?





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96781278
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

In that case,
first scan:
```
 val res = sc.runJob(childRDD,
 (it: Iterator[InternalRow]) => Array[Int](it.size), p)
```

second scan:
```
mapPartitionsWithIndexInternal { case (index, iter) =>
if (index < partsScanned) {
   ...
```
Correct me if I am wrong.
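
A minimal, Spark-free sketch of the concern: if computing a partition is not cached, a counting pass followed by a separate result pass computes the same data twice. `DoubleScanSketch`, `computePartition` and `run` are hypothetical stand-ins, not the PR's code; the counter just records how often the pretend source is scanned.

```scala
object DoubleScanSketch {
  // How many times the pretend "big table" partition has been computed.
  var scans = 0

  // Stand-in for computing one uncached partition: scan the source and apply a filter.
  def computePartition(): Iterator[Int] = {
    scans += 1
    Iterator.range(0, 100000).filter(_ > 99990)
  }

  // First pass only counts rows (like the runJob with it.size);
  // second pass recomputes the partition to produce the limited output.
  def run(limit: Int): List[Int] = {
    val count = computePartition().size
    val toTake = math.min(count, limit)
    computePartition().take(toTake).toList
  }
}
```

Calling `DoubleScanSketch.run(5)` increments `scans` twice, once per pass, which mirrors the first scan / second scan split described above.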





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96780810
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+  val sc = sqlContext.sparkContext
+  val res = sc.runJob(childRDD,
+(it: Iterator[InternalRow]) => Array[Int](it.size), p)
--- End diff --

Yes, I know, but for the partitions selected in each iteration, it will
trigger the computation of those partitions, right?





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96780571
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

Actually, we do have this scenario. I don't think the case where the number
of output rows after a filter is less than the limit is a corner case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96779648
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+  val sc = sqlContext.sparkContext
+  val res = sc.runJob(childRDD,
+(it: Iterator[InternalRow]) => Array[Int](it.size), p)
--- End diff --

@viirya `it.size` will trigger a scan of the big table, right?
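
For context, a small plain-Scala sketch of why `it.size` is not free: counting an iterator has to pull every element through it, so any upstream work (here a stand-in `map` with a counter) runs for all rows. `IteratorSizeSketch` and `countAndPulled` are hypothetical names used only for illustration.

```scala
object IteratorSizeSketch {
  /** Returns (number of rows counted, number of rows actually pulled from the source). */
  def countAndPulled(numRows: Int): (Int, Int) = {
    var pulled = 0
    // The map is lazy: the counter only moves when a consumer asks for elements.
    val it = Iterator.range(0, numRows).map { row => pulled += 1; row }
    val n = it.size // walks the entire iterator to count it
    (n, pulled)
  }
}
```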





[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96773557
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

An example: `select xxx from table where xxx > 99 limit 1000`
If the table is big and fewer than 100 rows actually satisfy `xxx > 99`, 
you still have to compute all the partitions, so you run the filter and 
scan the big table twice.
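
To make the cost concrete, here is a minimal sketch in plain Scala (no Spark dependency). The object `TwoPassLimitSketch`, the `computations` counter, and the toy data are all assumptions made for illustration; it only models the count-then-take shape discussed here, not the actual `GlobalLimitExec` code.

```scala
object TwoPassLimitSketch {
  // Counts how many times a partition is evaluated (hypothetical cost metric).
  var computations = 0

  // Stand-in for an expensive child plan: scan a partition and apply `xxx > 99`.
  def computePartition(rows: Seq[Int]): Seq[Int] = {
    computations += 1
    rows.filter(_ > 99)
  }

  def twoPassLimit(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] = {
    // Pass 1: count matching rows partition by partition until the limit is met.
    var total = 0
    var scanned = 0
    while (total < limit && scanned < partitions.length) {
      total += computePartition(partitions(scanned)).size
      scanned += 1
    }
    // Pass 2: recompute the scanned partitions to actually produce the rows.
    partitions.take(scanned).flatMap(computePartition).take(limit)
  }

  def main(args: Array[String]): Unit = {
    // 10 partitions holding the values 0..149; only 100..149 pass the filter.
    val partitions = Seq.tabulate(10)(i => Seq.range(i * 15, i * 15 + 15))
    val rows = twoPassLimit(partitions, limit = 1000)
    println(s"rows=${rows.size}, partition computations=$computations")
  }
}
```

Because the filter never yields 1000 rows, the first pass visits every partition and the second pass visits every partition again, so this toy run evaluates 20 partitions for 10 partitions of data.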






[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96773174
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the first `limit` elements of the child's partitions.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-}
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
+  override def output: Seq[Attribute] = child.output
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  protected override def doExecute(): RDD[InternalRow] = {
+// This logic is mainly copyed from `SparkPlan.executeTake`.
+// TODO: combine this with `SparkPlan.executeTake`, if possible.
+val childRDD = child.execute()
+val totalParts = childRDD.partitions.length
+var partsScanned = 0
+var totalNum = 0
+var resultRDD: RDD[InternalRow] = null
+while (totalNum < limit && partsScanned < totalParts) {
+  // The number of partitions to try in this iteration. It is ok for 
this number to be
+  // greater than totalParts because we actually cap it at totalParts 
in runJob.
+  var numPartsToTry = 1L
+  if (partsScanned > 0) {
+// If we didn't find any rows after the previous iteration, 
quadruple and retry.
+// Otherwise, interpolate the number of partitions we need to try, 
but overestimate
+// it by 50%. We also cap the estimation in the end.
+val limitScaleUpFactor = 
Math.max(sqlContext.conf.limitScaleUpFactor, 2)
+if (totalNum == 0) {
+  numPartsToTry = partsScanned * limitScaleUpFactor
+} else {
+  // the left side of max is >=1 whenever partsScanned >= 2
+  numPartsToTry = Math.max((1.5 * limit * partsScanned / 
totalNum).toInt - partsScanned, 1)
+  numPartsToTry = Math.min(numPartsToTry, partsScanned * 
limitScaleUpFactor)
+}
+  }
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  val p = partsScanned.until(math.min(partsScanned + numPartsToTry, 
totalParts).toInt)
+  val sc = sqlContext.sparkContext
+  val res = sc.runJob(childRDD,
+(it: Iterator[InternalRow]) => Array[Int](it.size), p)
+
+  totalNum += res.map(_.head).sum
+  partsScanned += p.size
+
+  if (totalNum >= limit) {
+// If we scan more rows than the limit number, we need to reduce 
that from scanned.
+// We calculate how many rows need to be reduced for each 
partition,
+// until all redunant rows are reduced.
+var numToReduce = (totalNum - limit)
+val reduceAmounts = new HashMap[Int, Int]()
+val partitionsToReduce = p.zip(res.map(_.head)).foreach { case 
(part, size) =>
+  val toReduce = if (size > numToReduce) numToReduce else size
+  reduceAmounts += ((part, toReduce))
+  numToReduce -= toReduce
+}
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

Previously: compute all partitions of the child RDD and shuffle all of them.
Now: compute some partitions of the child RDD twice, once to get 
`partsScanned` and `totalNum`, and once to take the `limit` elements from 
those partitions.

My point is that the second approach is not always better than the first 
when the recompute cost is high.
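
For reference, the second pass has to decide how many rows each scanned partition gives up so that exactly `limit` rows remain. The sketch below adapts the `reduceAmounts` logic from the quoted diff to plain Scala collections; `LimitTrimSketch` and `rowsToDrop` are hypothetical names, and unlike the diff it walks over all scanned partitions rather than only the last batch.

```scala
object LimitTrimSketch {
  /** For each scanned partition (index -> row count), returns how many rows
    * to drop so that the rows kept add up to `limit` when enough rows exist. */
  def rowsToDrop(counts: Seq[(Int, Int)], limit: Int): Map[Int, Int] = {
    // Surplus rows that were counted beyond the limit.
    var remaining = math.max(counts.map(_._2).sum - limit, 0)
    counts.map { case (part, size) =>
      // Drop as much of the surplus as this partition can absorb.
      val drop = math.min(size, remaining)
      remaining -= drop
      part -> drop
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val counts = Seq(0 -> 40, 1 -> 30, 2 -> 50) // 120 rows scanned in total
    println(rowsToDrop(counts, limit = 100))    // partition 0 gives up the 20 surplus rows
  }
}
```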






[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...

2017-01-18 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/16633#discussion_r96673145
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the first `limit` elements of the child's partitions.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-}
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
+  override def output: Seq[Attribute] = child.output
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  protected override def doExecute(): RDD[InternalRow] = {
+// This logic is mainly copyed from `SparkPlan.executeTake`.
+// TODO: combine this with `SparkPlan.executeTake`, if possible.
+val childRDD = child.execute()
+val totalParts = childRDD.partitions.length
+var partsScanned = 0
+var totalNum = 0
+var resultRDD: RDD[InternalRow] = null
+while (totalNum < limit && partsScanned < totalParts) {
+  // The number of partitions to try in this iteration. It is ok for 
this number to be
+  // greater than totalParts because we actually cap it at totalParts 
in runJob.
+  var numPartsToTry = 1L
+  if (partsScanned > 0) {
+// If we didn't find any rows after the previous iteration, 
quadruple and retry.
+// Otherwise, interpolate the number of partitions we need to try, 
but overestimate
+// it by 50%. We also cap the estimation in the end.
+val limitScaleUpFactor = 
Math.max(sqlContext.conf.limitScaleUpFactor, 2)
+if (totalNum == 0) {
+  numPartsToTry = partsScanned * limitScaleUpFactor
+} else {
+  // the left side of max is >=1 whenever partsScanned >= 2
+  numPartsToTry = Math.max((1.5 * limit * partsScanned / 
totalNum).toInt - partsScanned, 1)
+  numPartsToTry = Math.min(numPartsToTry, partsScanned * 
limitScaleUpFactor)
+}
+  }
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  val p = partsScanned.until(math.min(partsScanned + numPartsToTry, 
totalParts).toInt)
+  val sc = sqlContext.sparkContext
+  val res = sc.runJob(childRDD,
+(it: Iterator[InternalRow]) => Array[Int](it.size), p)
+
+  totalNum += res.map(_.head).sum
+  partsScanned += p.size
+
+  if (totalNum >= limit) {
+// If we scan more rows than the limit number, we need to reduce 
that from scanned.
+// We calculate how many rows need to be reduced for each 
partition,
+// until all redunant rows are reduced.
+var numToReduce = (totalNum - limit)
+val reduceAmounts = new HashMap[Int, Int]()
+val partitionsToReduce = p.zip(res.map(_.head)).foreach { case 
(part, size) =>
+  val toReduce = if (size > numToReduce) numToReduce else size
+  reduceAmounts += ((part, toReduce))
+  numToReduce -= toReduce
+}
+resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, 
iter) =>
+  if (index < partsScanned) {
--- End diff --

Actually, the partitions of `childRDD` with `index < partsScanned` are 
computed twice, so it is hard to say this is always better than the old version.





[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...

2017-01-03 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15240
  
retest this please






[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...

2017-01-03 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15240
  
retest this please





[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...

2017-01-02 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15240
  
retest this please





[GitHub] spark issue #15297: [SPARK-9862]Handling data skew

2016-11-08 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15297
  
@YuhuWang2002 
We should limit the use cases for outer joins: 
For a left outer join such as A left join B, this implementation currently 
cannot handle skew in table B. That is because the join result depends on 
all of B's data in the same reduce partition, so you cannot split it across 
multiple tasks.

Similarly, for a right outer join such as A right join B, this 
implementation currently cannot handle skew in table A. For a full outer 
join, we cannot use the optimization at all.
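
A small sketch of why the restriction exists, using plain Scala collections rather than Spark. The names (`OuterJoinSkewSketch`, `leftOuterJoin`, the toy tables `a` and `b`) are assumptions for illustration: splitting the preserved side A across tasks still gives the right answer, while splitting B makes every task emit its own null-padded row for an unmatched A row.

```scala
object OuterJoinSkewSketch {
  type Row = (Int, String)
  type Joined = (Int, String, Option[String])

  // Left outer join on the Int key over the slice of data one task sees.
  def leftOuterJoin(a: Seq[Row], b: Seq[Row]): Seq[Joined] =
    a.flatMap { case (k, av) =>
      val matches = b.filter(_._1 == k).map(_._2)
      if (matches.isEmpty) Seq((k, av, Option.empty[String]))
      else matches.map(bv => (k, av, Option(bv)))
    }

  val a: Seq[Row] = Seq(1 -> "a1", 2 -> "a2")
  val b: Seq[Row] = Seq(1 -> "b1", 1 -> "b2", 1 -> "b3", 1 -> "b4") // key 1 is skewed in B

  // Reference result computed by a single task.
  def whole: Seq[Joined] = leftOuterJoin(a, b)
  // Split A across tasks, each task sees all of B: same result as `whole`.
  def splitA: Seq[Joined] = a.grouped(1).flatMap(slice => leftOuterJoin(slice, b)).toList
  // Split B across tasks, each task sees all of A: (2, a2, None) is emitted once per task.
  def splitB: Seq[Joined] = b.grouped(2).flatMap(slice => leftOuterJoin(a, slice)).toList

  def main(args: Array[String]): Unit = {
    println(s"whole=${whole.size} rows, splitA=${splitA.size} rows, splitB=${splitB.size} rows")
  }
}
```

A task that holds only part of B cannot tell whether an A row is unmatched overall or only unmatched in its own slice, which is the dependency on "all of B's data" described above.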





[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-20 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15481
  
`CoarseGrainedSchedulerBackend.removeExecutor` also uses ask, but that does 
not matter, right? It sends the message only once and logs the error on failure.
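
For context, the pattern this PR is about (a blocking ask issued while holding a lock that the handler also needs) can be sketched outside Spark as follows. Everything here is a stand-in: `AskUnderLockSketch`, `handleRemove`, and the single-threaded pool playing the role of the endpoint's message loop are hypothetical, and the 500 ms timeout exists only so the sketch does not hang.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AskUnderLockSketch {
  private val lock = new Object
  // Single thread standing in for the endpoint that handles messages.
  private val pool = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // The handler needs the same lock as the caller.
  private def handleRemove(id: String): Boolean = lock.synchronized { true }

  /** Blocking "ask" while holding the lock: the handler cannot enter its
    * synchronized block until we return, so the wait can only time out. */
  def resetWithAsk(): Boolean = lock.synchronized {
    val reply = Future(handleRemove("exec-1"))
    try Await.result(reply, 500.millis)
    catch { case _: TimeoutException => false }
  }

  /** Fire-and-forget "send": the caller releases the lock right away and
    * the handler runs afterwards. */
  def resetWithSend(): Unit = lock.synchronized {
    Future(handleRemove("exec-1"))
    ()
  }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    println(s"ask under lock got a reply: ${resetWithAsk()}")
    resetWithSend()
    shutdown()
  }
}
```

Without the timeout, the ask variant would wait indefinitely, which is the deadlock named in the PR title; the send variant corresponds to the "use send" commit.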





[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15481
  
Updated, can you review again?





[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-18 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15481
  
OK, I will revert to the initial commit.





[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-17 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15481
  
retest this please.





[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...

2016-10-17 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15481
  
retest this please.





[GitHub] spark pull request #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrai...

2016-10-14 Thread scwf
GitHub user scwf opened a pull request:

https://github.com/apache/spark/pull/15481

[SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-17929

Currently, `CoarseGrainedSchedulerBackend.reset` acquires the lock,
```
  protected def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executorDataMap.toMap.foreach { case (eid, _) =>
      driverEndpoint.askWithRetry[Boolean](
        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
    }
  }
```
but `removeExecutor` also needs the lock 
`CoarseGrainedSchedulerBackend.this.synchronized`, which causes a deadlock.

```
  private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
    logDebug(s"Asked to remove executor $executorId with reason $reason")
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        val killed = CoarseGrainedSchedulerBackend.this.synchronized {
          addressToExecutorId -= executorInfo.executorAddress
          executorDataMap -= executorId
          executorsPendingLossReason -= executorId
          executorsPendingToRemove.remove(executorId).getOrElse(false)
        }
        ...
```

## How was this patch tested?

manual test.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KirinKing/spark spark-17929

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15481


commit 3681fae6b5364a5cf55700e1510473d8d9b77cd3
Author: w00228970 <wangf...@huawei.com>
Date:   2016-10-14T09:24:30Z

use send







[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-07 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15297
  
retest this please





[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew

2016-10-07 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15297
  
retest this please.





[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...

2016-10-07 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15240
  
/cc @rxin can you help review this?





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-28 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
retest this please





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-28 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
@kayousterhout Thanks for your comments; I have updated the PR to address 
all of them.





[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Do not add failedStages when...

2016-09-28 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/15213#discussion_r80865465
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1256,11 +1257,13 @@ class DAGScheduler(
   if (disallowStageRetryForTest) {
 abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
   None)
+abortedStage = true
   } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
 abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
   s"has failed the maximum allowable number of " +
   s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
   s"Most recent failure reason: ${failureMessage}", None)
+abortedStage = true
   } else if (failedStages.isEmpty) {
--- End diff --

That makes sense to me; updated.





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-26 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
@markhamstra In my fix I just wanted to keep the changes to the 
DAGScheduler minimal. Your fix is also fine with me, and I can update this 
PR according to your comment. Thanks :)
/cc @zsxwing, who may also have comments on this.





[GitHub] spark pull request #15240: [SPARK-17556] Executor side broadcast for broadca...

2016-09-26 Thread scwf
GitHub user scwf opened a pull request:

https://github.com/apache/spark/pull/15240

[SPARK-17556] Executor side broadcast for broadcast joins

## What changes were proposed in this pull request?
Design doc : 

https://issues.apache.org/jira/secure/attachment/12830286/executor%20broadcast.pdf

## How was this patch tested?

added unit test and manual test.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KirinKing/spark SPARK-17556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15240


commit df723bb138f4c01b507aed7f3d0a80613b2b2f30
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T04:10:11Z

draft

commit cf4adbe8754cf56dceb2c2ad750a24520ef49d4a
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T04:19:19Z

ununsed changes

commit 77057366e69cb659626b4039d35c87fe91a6d668
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T04:59:41Z

comment improvement

commit 81da4e33314015eb044c54e4ef7e834519c15f12
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T06:06:44Z

fix tests

commit 63a5dbf5123b458a52a22e1e03aea152995e55cb
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T06:37:55Z

fix npe

commit c284c2cfbf3c3094db8b95ecbae331f8586128a2
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T07:33:00Z

new endpoint for broadcast

commit cc535ed775efcd228c22c3b1dbbc7e10201f3da6
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T07:43:14Z

unused changes

commit 2caafbf8eea238791335ff4a30c1d797a400b540
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T10:45:36Z

use broadcast in sql

commit 89336e7f0cf535484798536c27e3d98d5ff0a358
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-26T11:09:43Z

key not found issue







[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
> actual problem is not in abortStage but rather in improper additions to failedStages

Correct. I think a more accurate description of this issue is "do not add 
to `failedStages` when calling abortStage for a fetch failure".





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
Actually, this is the only place in Spark where stages are added to `failedStages`.





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/15213
  
Thanks @zsxwing for explaining this. 
@markhamstra The issue happens in the case described in my PR description. 
It usually depends on multiple threads submitting jobs and on the order of 
the fetch failures, which is why I called it a race condition.

If you find that confusing, how about changing the title to "do not add 
failedStages when abort stage"?





[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Fix the race condition when ...

2016-09-23 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/15213#discussion_r80274817
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2105,6 +2109,54 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, 
shuffleDepC))
   }
 
+  test("The failed stage never resubmitted due to abort stage in another 
thread") {
+implicit val executorContext = ExecutionContext
+  .fromExecutorService(Executors.newFixedThreadPool(5))
+val duration = 60.seconds
+
+val f1 = Future {
+  try {
+val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 
1)).groupByKey()
+val shuffleHandle =
+  rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, 
_]].shuffleHandle
+rdd1.map { x =>
--- End diff --

ok, thanks





[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Fix the race condition when ...

2016-09-23 Thread scwf
GitHub user scwf opened a pull request:

https://github.com/apache/spark/pull/15213

[SPARK-17644] [CORE]  Fix the race condition when DAGScheduler handle the 
FetchFailed event

## What changes were proposed in this pull request?
| Time|Thread 1 ,  Job1  | Thread 2 ,  Job2  |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed |  |
| 2 | failedStages += failedStage ||
| 3 |  |  task failed due to  FetchFailed |

Then Job2 in Thread 2 never resubmits the failed stage and hangs.

We should not add the stage to failedStages when aborting it.
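
The timeline above can be illustrated with a toy model. This is only a sketch under an assumption: that a resubmission is scheduled only when the failed-stage set goes from empty to non-empty. The class `ToyScheduler` and the flag `skipAddOnAbort` are hypothetical names for illustration and are not part of the DAGScheduler.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not the real DAGScheduler code.
final class ToyScheduler(skipAddOnAbort: Boolean) {
  val failedStages: mutable.Set[String] = mutable.Set.empty
  var resubmitsScheduled: Int = 0

  def onFetchFailed(stage: String, abort: Boolean): Unit = {
    if (abort) {
      // The stage is aborted, so no resubmission is scheduled for it.
      // Without the proposed change it is still recorded as failed.
      if (!skipAddOnAbort) failedStages += stage
    } else {
      // Assumption: a resubmit is scheduled only if the set was empty.
      if (failedStages.isEmpty) resubmitsScheduled += 1
      failedStages += stage
    }
  }
}
```

With `skipAddOnAbort = false`, an abort followed by a normal fetch failure leaves `resubmitsScheduled` at 0, which corresponds to the hang described above. With `skipAddOnAbort = true`, the second failure schedules one resubmission.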

## How was this patch tested?

Added a unit test.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KirinKing/spark dag-resubmit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15213


commit 8e667f532fa4509386ff6a6173b75a8e24cab40a
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-23T07:17:46Z

test case

commit 2bfa05b172c68d6aa52e66359e83e2e7c6033662
Author: w00228970 <wangf...@huawei.com>
Date:   2016-09-23T07:30:15Z

The failed stage never resubmitted







[GitHub] spark pull request #14712: [SPARK-17072] [SQL] support table-level statistic...

2016-08-31 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/14712#discussion_r77014850
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -168,6 +169,107 @@ class StatisticsSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils
   TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = 
false)
   }
 
+  private def checkMetastoreRelationStats(
+  tableName: String,
+  expectedTotalSize: Long,
+  expectedRowCount: Option[BigInt]): Unit = {
+val df = sql(s"SELECT * FROM $tableName")
+val relations = df.queryExecution.analyzed.collect { case rel: 
MetastoreRelation =>
+  assert(rel.statistics.sizeInBytes === expectedTotalSize)
+  assert(rel.statistics.rowCount === expectedRowCount)
+  rel
+}
+assert(relations.size === 1)
+  }
+
+  test("test table-level statistics for hive tables created in 
HiveExternalCatalog") {
+val textTable = "textTable"
+withTable(textTable) {
+  sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS 
TEXTFILE")
+  checkMetastoreRelationStats(textTable,
+expectedTotalSize = spark.sessionState.conf.defaultSizeInBytes, 
expectedRowCount = None)
+
+  sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+  // don't have our statistics, MetastoreRelation uses hive's 
`totalSize`
+  checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, 
expectedRowCount = None)
+
+  // noscan won't count the number of rows
+  sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+  checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, 
expectedRowCount = None)
+
+  // without noscan, we count the number of rows
+  sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+  checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, 
expectedRowCount = Some(500))
+}
+  }
+
+  test("test whether the old stats are removed") {
+val textTable = "textTable"
+withTable(textTable) {
+  sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS 
TEXTFILE")
+  sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+  sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
+  checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, 
expectedRowCount = Some(500))
+
+  sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
+  sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
+  // update total size and remove the old and invalid row count
+  checkMetastoreRelationStats(textTable, expectedTotalSize = 11624, 
expectedRowCount = None)
+}
+  }
+
+  private def checkLogicalRelationStats(
+  tableName: String,
+  expectedRowCount: Option[BigInt]): Unit = {
+val df = sql(s"SELECT * FROM $tableName")
+val relations = df.queryExecution.analyzed.collect { case rel: 
LogicalRelation =>
+  // TODO: We don't have an expected value here because parquet size 
is different on Windows
+  // and Linux, we need to find the reason and fix this.
+  assert(rel.statistics.sizeInBytes === rel.relation.sizeInBytes)
+  assert(rel.statistics.rowCount === expectedRowCount)
+  rel
+}
+assert(relations.size === 1)
+  }
+
+  test("test statistics of LogicalRelation inherited from 
MetastoreRelation") {
+val parquetTable = "parquetTable"
+val orcTable = "orcTable"
+withTable(parquetTable, orcTable) {
+  sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) STORED 
AS PARQUET")
+  sql(s"CREATE TABLE $orcTable (key STRING, value STRING) STORED AS 
ORC")
+  sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+  sql(s"INSERT INTO TABLE $orcTable SELECT * FROM src")
+  sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
+  sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
+
+  checkLogicalRelationStats(parquetTable, expectedRowCount = Some(500))
--- End diff --

For the parquet table, we'd better also set 
`spark.sql.hive.convertMetastoreParquet` explicitly, as is done for the ORC 
table below. This avoids failures caused by other test cases changing this config.
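
The idea of pinning a config for the duration of a check and restoring it afterwards can be sketched without Spark. This is a minimal illustration, not the helper used in the suite: `ConfSketch`, its `conf` map, and `withConf` are hypothetical stand-ins for a session's SQL configuration.

```scala
import scala.collection.mutable

object ConfSketch {
  // Hypothetical stand-in for a session's mutable SQL configuration.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Set the given keys while `body` runs, then restore the previous values,
  // so a value left behind by another test case cannot leak into this one.
  def withConf[A](pairs: (String, String)*)(body: => A): A = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None) => conf.remove(k)
    }
  }
}
```

Wrapping the parquet check in such a block makes its result independent of whatever value an earlier test left in the config.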



[GitHub] spark issue #14712: [SPARK-17072] [SQL] support table-level statistics gener...

2016-08-19 Thread scwf
Github user scwf commented on the issue:

https://github.com/apache/spark/pull/14712
  
/cc @cloud-fan @rxin 





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-20 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-173435459
  
@yhuai  thanks





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-20 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/7336#discussion_r50269169
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala ---
@@ -198,33 +241,99 @@ private[spark] class 
SparkHiveDynamicPartitionWriterContainer(
 conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }
 
-  override def getLocalFileWriter(row: InternalRow, schema: StructType)
-: FileSinkOperator.RecordWriter = {
-def convertToHiveRawString(col: String, value: Any): String = {
-  val raw = String.valueOf(value)
-  schema(col).dataType match {
-case DateType => DateTimeUtils.dateToString(raw.toInt)
-case _: DecimalType => BigDecimal(raw).toString()
-case _ => raw
-  }
+  // this function is executed on executor side
+  override def writeToFile(context: TaskContext, iterator: 
Iterator[InternalRow]): Unit = {
+val serializer = newSerializer(fileSinkConf.getTableInfo)
+val standardOI = ObjectInspectorUtils
+  .getStandardObjectInspector(
+fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+ObjectInspectorCopyOption.JAVA)
+  .asInstanceOf[StructObjectInspector]
+
+val fieldOIs = 
standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+val dataTypes = inputSchema.map(_.dataType)
+val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => 
wrapperFor(f, dt) }
+val outputData = new Array[Any](fieldOIs.length)
--- End diff --

yes, extracted a common method for it.





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-20 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-173241662
  
retest this please





[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...

2016-01-15 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/5827#issuecomment-172144781
  
@rxin Our parser is an extended version of the `SqlParser`. The main 
difference is that we add support for subqueries (both correlated and 
uncorrelated), EXISTS, IN, and some minor improvements such as grouping, TOP, 
and CUBE/ROLLUP. It supports the TPC-DS generated ANSI SQL syntax without any change.

I noticed that there are some PRs for these features. I will take a look at 
those PRs when I have time and see what I can do.





[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...

2016-01-15 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/5827#issuecomment-172151467
  
Actually, we tried to contribute these improvements in the past; unfortunately, 
the community did not want them for maintenance (or HiveQL compatibility) 
reasons :).

I am glad that Spark SQL uses a single parser, so that people can contribute 
to it and make it more and more powerful.





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-15 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-172144917
  
Ping @rxin 





[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...

2016-01-14 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/5827#issuecomment-171667688
  
@rxin, yes, we used this and implemented a new SQL parser based on this 
interface to support ANSI TPC-DS SQL.





[GitHub] spark pull request: [SPARK-12742] [SQL] org.apache.spark.sql.hive....

2016-01-11 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/10682#discussion_r49404076
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala 
---
@@ -24,6 +24,9 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with 
SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
+sql("DROP TABLE IF EXISTS t0")
+sql("DROP TABLE IF EXISTS t1")
+sql("DROP TABLE IF EXISTS t2")
--- End diff --

@liancheng the t1 table must have already been created by another test suite, 
so here I just drop these tables before running this suite.





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-10 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-170359121
  
I'm back to update this PR. @marmbrus @rxin, please help review it when you have time.





[GitHub] spark pull request: [SPARK-12742] [SQL] org.apache.spark.sql.hive....

2016-01-10 Thread scwf
GitHub user scwf opened a pull request:

https://github.com/apache/spark/pull/10682

[SPARK-12742] [SQL] org.apache.spark.sql.hive.LogicalPlanToSQLSuite failure 
due to Table already exists exception

```
[info] Exception encountered when attempting to run a suite with class 
name: org.apache.spark.sql.hive.LogicalPlanToSQLSuite *** ABORTED *** (325 
milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Table `t1` already exists.;
[info]   at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:296)
[info]   at 
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:285)
[info]   at 
org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:33)
[info]   at 
org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
[info]   at 
org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:23)
[info]   at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
[info]   at 
org.apache.spark.sql.hive.LogicalPlanToSQLSuite.run(LogicalPlanToSQLSuite.scala:23)
[info]   at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]   at java.lang.Thread.run(Thread.java:745)
```

/cc @liancheng 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/scwf/spark fix-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/10682.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #10682


commit 62dd2f4e46c9fff010fd231fdc2e8c5c1a63c86b
Author: wangfei <wangfei_he...@126.com>
Date:   2016-01-10T16:30:58Z

fix table already exists error







[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2016-01-10 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-170412209
  
@rxin, yes, this PR tries to fix the same issue on the Hive support side.





[GitHub] spark pull request: [SPARK-6190][core] create LargeByteBuffer for ...

2016-01-08 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/5400#issuecomment-170037893
  
 >>The cached size cannot be greater than 2GB.

@rxin how should I understand `cached size`? Is it the partition size of a 
cached RDD?





[GitHub] spark pull request: [SPARK-6190][core] create LargeByteBuffer for ...

2016-01-06 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/5400#issuecomment-169517123
  
Hi @squito, can you explain in which situations users will hit the 2 GB limit? 
Will a job that processes very large data (such as PB-level data) reach this 
limit?





[GitHub] spark pull request: [SPARK-12321][SQL] JSON format for TreeNode (u...

2015-12-22 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/10311#issuecomment-166772051
  
Got it, thanks @marmbrus :)





[GitHub] spark pull request: [SPARK-12321][SQL] JSON format for TreeNode (u...

2015-12-22 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/10311#issuecomment-166611132
  
Hi @cloud-fan can you explain in which cases we can use this feature or the 
motivation for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...

2015-12-10 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/10253#issuecomment-163590407
  
LGTM





[GitHub] spark pull request: [SPARK-11016] Move RoaringBitmap to explicit K...

2015-12-08 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/9748#issuecomment-162860246
  
@davies there is a problem when deserializing a RoaringBitmap. See the 
example below. Run this piece of code:
```
import java.io.{DataInput, DataOutput, FileInputStream, FileOutputStream}

import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.roaringbitmap.RoaringBitmap

// Adapts a Kryo Input to java.io.DataInput so RoaringBitmap can read from it.
class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
  override def readLong(): Long = input.readLong()
  override def readChar(): Char = input.readChar()
  override def readFloat(): Float = input.readFloat()
  override def readByte(): Byte = input.readByte()
  override def readShort(): Short = input.readShort()
  override def readUTF(): String = input.readString() // readString in kryo does utf8
  override def readInt(): Int = input.readInt()
  override def readUnsignedShort(): Int = input.readShortUnsigned()
  override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
  override def readFully(b: Array[Byte]): Unit = input.read(b)
  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
  override def readLine(): String = throw new UnsupportedOperationException("readLine")
  override def readBoolean(): Boolean = input.readBoolean()
  override def readUnsignedByte(): Int = input.readByteUnsigned()
  override def readDouble(): Double = input.readDouble()
}

// Adapts a Kryo Output to java.io.DataOutput so RoaringBitmap can write to it.
class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
  override def writeFloat(v: Float): Unit = output.writeFloat(v)
  // There is no "readChars" counterpart, except maybe "readLine", which is not supported
  override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
  override def writeDouble(v: Double): Unit = output.writeDouble(v)
  override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
  override def writeShort(v: Int): Unit = output.writeShort(v)
  override def writeInt(v: Int): Unit = output.writeInt(v)
  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
  override def write(b: Int): Unit = output.write(b)
  override def write(b: Array[Byte]): Unit = output.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
  override def writeBytes(s: String): Unit = output.writeString(s)
  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
  override def writeLong(v: Long): Unit = output.writeLong(v)
  override def writeByte(v: Int): Unit = output.writeByte(v)
}

// Serialize a small bitmap to a file through the output bridge.
val outStream = new FileOutputStream("D:\\wfserde")
val output = new KryoOutput(outStream)
val bitmap = new RoaringBitmap
bitmap.add(1)
bitmap.add(3)
bitmap.add(5)
bitmap.serialize(new KryoOutputDataOutputBridge(output))
output.flush()
output.close()

// Read it back through the input bridge; this is where the error shows up.
val inStream = new FileInputStream("D:\\wfserde")
val input = new KryoInput(inStream)
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))

println(ret)
```

This throws a `Buffer underflow` error:
```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
at 
org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
```

After some investigation, I found this is caused by a bug in kryo's 
`Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119), 
which we call in `KryoInputDataInputBridge`.

So I think we can fix this issue in one of two ways:
1) Upgrade kryo to 2.23.0 or 2.24.0, which fixed this bug (I am not sure the 
upgrade is safe in Spark; can you check it, @davies?)

2) Bypass kryo's `Input.skip(long count)` by directly calling the other 
`skip` method in kryo's Input.java 
(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), 
i.e. write the bug-fixed version of `Input.skip(long count)` in 
KryoInputDataInputBridge's `skipBytes` method:
```
class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
  ...
  override def skipBytes(n: Int): Int = {
    var remaining: Long = n
    while (remaining > 0) {
      val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
      input.s
```

[GitHub] spark pull request: [SPARK-11016] Move RoaringBitmap to explicit K...

2015-12-08 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/9748#issuecomment-163074120
  
OK, should I send a PR to both master and branch-1.6?





[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...

2015-12-08 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/10213#issuecomment-163089761
  
/cc @davies 





[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...

2015-12-08 Thread scwf
GitHub user scwf opened a pull request:

https://github.com/apache/spark/pull/10213

[SPARK-12222] [Core] Deserialize RoaringBitmap using Kryo serializer throw 
Buffer underflow exception

Deserializing a RoaringBitmap with the Kryo serializer throws a Buffer 
underflow exception:
```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
at 
org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
```

This is caused by a bug in kryo's `Input.skip(long count)` 
(https://github.com/EsotericSoftware/kryo/issues/119), which we call in 
`KryoInputDataInputBridge`.

Instead of upgrading kryo, this PR bypasses kryo's `Input.skip(long count)` 
by directly calling the other `skip` method in kryo's Input.java 
(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), 
i.e. it writes the bug-fixed version of `Input.skip(long count)` in 
KryoInputDataInputBridge's `skipBytes` method.
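
As a rough illustration of skipping in Int-sized chunks, here is a minimal sketch that does not depend on kryo. `FakeInput` is a hypothetical stand-in that only offers an Int-based `skip`, and `SkipSketch.skipBytes` is an illustrative name; neither is taken from the patch itself.

```scala
// Hypothetical stand-in for a buffered input exposing only an Int-sized skip.
final class FakeInput(bytes: Array[Byte]) {
  private var pos = 0
  def position: Int = pos
  def skip(count: Int): Unit = {
    if (count < 0 || count > bytes.length - pos) {
      throw new IllegalStateException("Buffer underflow.")
    }
    pos += count
  }
}

object SkipSketch {
  // Skip `n` bytes by repeatedly calling the Int-based skip, never asking
  // for more than Int.MaxValue at once, and return the number skipped.
  def skipBytes(input: FakeInput, n: Long): Long = {
    var remaining = n
    while (remaining > 0) {
      val step = math.min(Int.MaxValue.toLong, remaining).toInt
      input.skip(step)
      remaining -= step
    }
    math.max(n, 0L)
  }
}
```

The loop keeps the Long-to-Int narrowing in one place, so the caller never relies on a Long-based skip.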

For more detail, see 
https://github.com/apache/spark/pull/9748#issuecomment-162860246

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/scwf/spark patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/10213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #10213


commit edf438443b35e131490e8652d6382143180b9e26
Author: Fei Wang <wangf...@huawei.com>
Date:   2015-12-09T02:41:42Z

fix skipBytes

commit 01d37b0e1833dd17aedd27427102edef3ffdba79
Author: Fei Wang <wangf...@huawei.com>
Date:   2015-12-09T02:45:19Z

added test suite







[GitHub] spark pull request: [SPARK-11253][SQL] reset all accumulators in p...

2015-10-26 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/9215#issuecomment-151041137
  
Should this be merged to branch-1.5?





[GitHub] spark pull request: [SPARK-9281] [SQL] use decimal or double when ...

2015-10-13 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7642#issuecomment-147906001
  
Hi @davies, this does not seem compatible with HiveQl: HiveQl still parses 
float numbers as double. 
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala#L1670-L1689

Or did I miss something?





[GitHub] spark pull request: [SPARK-4226][SQL]Add subquery (not) in/exists ...

2015-10-11 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/9055#issuecomment-147273499
  
OK, does this support multiple `exists` and `in` subqueries in the where clause?





[GitHub] spark pull request: [SPARK-4226][SQL]Add subquery (not) in/exists ...

2015-10-11 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/9055#issuecomment-147272550
  
What's the difference between this and #4812?





[GitHub] spark pull request: [SQL] Add toString to DataFrame/Column

2015-09-24 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/4436#discussion_r40289134
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] {
   def childrenResolved = !children.exists(!_.resolved)
 
   /**
+   * Returns a string representation of this expression that does not have developer centric
+   * debugging information like the expression id.
+   */
+  def prettyString: String = {
+    transform {
--- End diff --

No, I am going through the code, and I think we'd better not use transform 
here. In which case would we need to change the expression?
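
For context on the transform question, here is a minimal sketch of a transform over an immutable tree. It uses a hypothetical toy tree (`Expr`, `Attr`, `PrettyAttr`, `Add`), not Catalyst's `TreeNode`; the point it illustrates is that a transform of this style builds a new tree and leaves the receiver untouched.

```scala
object TransformSketch {
  // Toy expression tree; all names here are illustrative only.
  sealed trait Expr {
    // Apply `rule` to this node if it matches, then recurse into children.
    // Returns a new tree; `this` is never modified.
    def transform(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterRule = if (rule.isDefinedAt(this)) rule(this) else this
      afterRule match {
        case Add(l, r) => Add(l.transform(rule), r.transform(rule))
        case leaf      => leaf
      }
    }
  }
  case class Attr(name: String, id: Int) extends Expr // carries a debugging id
  case class PrettyAttr(name: String) extends Expr    // id stripped for display
  case class Add(left: Expr, right: Expr) extends Expr

  def main(args: Array[String]): Unit = {
    val original = Add(Attr("a", 1), Attr("b", 2))
    val pretty = original.transform { case Attr(n, _) => PrettyAttr(n) }
    println(original) // still contains the ids
    println(pretty)   // a separate tree without ids
  }
}
```

Under this assumption the cost of using a transform for pretty printing is the extra tree copy, not a change to the original expression.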





[GitHub] spark pull request: [SQL] Add toString to DataFrame/Column

2015-09-23 Thread scwf
Github user scwf commented on a diff in the pull request:

https://github.com/apache/spark/pull/4436#discussion_r40283225
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] {
   def childrenResolved = !children.exists(!_.resolved)
 
   /**
+   * Returns a string representation of this expression that does not have developer centric
+   * debugging information like the expression id.
+   */
+  def prettyString: String = {
+    transform {
--- End diff --

Why use transform here? This will change the expression.





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-09-20 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-141853176
  
@litao-buptsse, I will update this soon, thanks.





[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-09-08 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7417#issuecomment-138544481
  
@Sephiroth-Lin can you rebase this?





[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...

2015-09-05 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7417#issuecomment-138037270
  
@zsxwing putting the small table on the left side of `RDD.cartesian` 
definitely improves performance. You can run a simple test that does a 
cartesian of a big data set and a small data set: performance with the small 
data set on the right is much, much worse.
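
The effect described above can be sketched without Spark. The example below assumes the cartesian product is computed as a nested loop in which the right-hand iterator is requested again for every left-hand element; `CountingSource` is a hypothetical stand-in for a partition whose iterator is recomputed on each request, and the sizes are arbitrary.

```scala
object CartesianSketch {
  // Hypothetical partition stand-in: counts how often it is (re)scanned.
  final class CountingSource(data: Seq[Int]) {
    var scans = 0
    def iterator: Iterator[Int] = { scans += 1; data.iterator }
  }

  // Nested-loop product: the right side is re-iterated once per left element.
  def cartesian(left: CountingSource, right: CountingSource): Iterator[(Int, Int)] =
    for (x <- left.iterator; y <- right.iterator) yield (x, y)

  def main(args: Array[String]): Unit = {
    val smallA = new CountingSource(1 to 3)
    val bigA = new CountingSource(1 to 1000)
    cartesian(smallA, bigA).size // force evaluation
    println(s"small on the left: big side scanned ${bigA.scans} times")

    val smallB = new CountingSource(1 to 3)
    val bigB = new CountingSource(1 to 1000)
    cartesian(bigB, smallB).size // force evaluation
    println(s"small on the right: small side scanned ${smallB.scans} times")
  }
}
```

Both orders yield the same number of pairs, but the number of rescans of the right side equals the size of the left side, so any fixed cost per rescan is paid far more often when the big data set is on the left.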





[GitHub] spark pull request: [SPARK-8813][SQL] Combine files when there're ...

2015-08-16 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/8125#issuecomment-131669839
  
@liancheng we have this case:
The production system produces small text/csv files every five minutes, and 
we use Spark SQL to do some ETL work (such as aggregation) on these small 
files to produce aggregated parquet/orc files for analysis. In this case the 
small files are only read once, so we do not want to compact them periodically 
and hope that Spark SQL can merge the small files while doing the ETL work. We 
have tested this case with this patch, and performance improved by about 20%.





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-08-12 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-130185516
  
retest this please





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2015-08-12 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-130316086
  
retest this please






[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2015-08-12 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-130312583
  
retest this please





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-08-11 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-130153626
  
Retest this please





[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2015-08-11 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-130127052
  
/cc @marmbrus can you take a look at this?





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-08-11 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-130152470
  
Yes, since we upgraded the Hive version to 1.2.1, we should adapt the token 
tree in HiveQl; the old one is not correct in 1.2.1. Updated.






[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...

2015-08-09 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7336#issuecomment-129266361
  
/cc @marmbrus 





[GitHub] spark pull request: [SPARK-7190] [SPARK-8804] [SPARK-7815] [SQL] u...

2015-08-06 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7197#issuecomment-128555369
  
@davies https://issues.apache.org/jira/browse/SPARK-9725





[GitHub] spark pull request: [SPARK-7190] [SPARK-8804] [SPARK-7815] [SQL] u...

2015-08-06 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/7197#issuecomment-128352195
  
@davies there is a bug once this PR is in: when executor memory is set to 
32g, all queries on string fields have problems. They seem to return 
empty/garbled strings to the user.

I am not familiar with the unsafe UTF8String and have not found the root cause.





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-07-29 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-125853720
  
retest this please





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-07-28 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-125565512
  
retest this please





[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...

2015-07-23 Thread scwf
Github user scwf commented on the pull request:

https://github.com/apache/spark/pull/4380#issuecomment-124285788
  
/cc @marmbrus 




