[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19594


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157699245
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  // Histograms are propagated as unchanged. During future 
estimation, they should be
+  // truncated by the updated max/min. In this way, only pointers 
of the histograms are
+  // propagated and thus reduce memory consumption.
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

I put it here because `computeByEquiHeightHistogram` returns a single 
`ColumnStat`, whereas here we keep the histograms for leftKey and rightKey 
respectively.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157698793
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
 (ceil(card), newStats)
   }
 
+  /** Compute join cardinality using equi-height histograms. */
+  private def computeByEquiHeightHistogram(
+  leftKey: AttributeReference,
+  rightKey: AttributeReference,
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Option[Any],
+  newMax: Option[Any]): (BigInt, ColumnStat) = {
+val overlappedRanges = getOverlappedRanges(
+  leftHistogram = leftHistogram,
+  rightHistogram = rightHistogram,
+  // Only numeric values have equi-height histograms.
+  lowerBound = newMin.get.toString.toDouble,
+  upperBound = newMax.get.toString.toDouble)
+
+var card: BigDecimal = 0
+var totalNdv: Double = 0
+for (i <- overlappedRanges.indices) {
+  val range = overlappedRanges(i)
+  if (i == 0 || range.hi != overlappedRanges(i - 1).hi) {
+// If range.hi == overlappedRanges(i - 1).hi, that means the 
current range has only one
+// value, and this value is already counted in the previous range. 
So there is no need to
+// count it in this range.
+totalNdv += math.min(range.leftNdv, range.rightNdv)
+  }
+  // Apply the formula in this overlapped range.
+  card += range.leftNumRows * range.rightNumRows / 
math.max(range.leftNdv, range.rightNdv)
+}
+
+val leftKeyStat = leftStats.attributeStats(leftKey)
+val rightKeyStat = rightStats.attributeStats(rightKey)
+val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
+val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
--- End diff --

How do we use left/right numRows to calculate this? Ideally, avgLen is 
calculated as total length of keys / numRowsAfterJoin. For string types, we 
don't know the exact length of the matched keys (we don't support string 
histograms yet); for numeric types, their avgLen should be the same. So the 
equation is a fair approximation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-19 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157696227
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
 (ceil(card), newStats)
   }
 
+  /** Compute join cardinality using equi-height histograms. */
+  private def computeByEquiHeightHistogram(
+  leftKey: AttributeReference,
+  rightKey: AttributeReference,
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Option[Any],
+  newMax: Option[Any]): (BigInt, ColumnStat) = {
+val overlappedRanges = getOverlappedRanges(
+  leftHistogram = leftHistogram,
+  rightHistogram = rightHistogram,
+  // Only numeric values have equi-height histograms.
+  lowerBound = newMin.get.toString.toDouble,
+  upperBound = newMax.get.toString.toDouble)
--- End diff --

that's because we need to update the column stats' min and max at the end 
of the method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157528163
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
+  // histogram1.bins(1) overlaps t0
+  OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
+  // histogram1.bins(1) overlaps t1
+  OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, 
upperBound = 60D)))
+
+estimateByHistogram(
+  leftHistogram = histogram1,
+  rightHistogram = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value after trimming") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 
75, ndv = 3)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
50, upperBound = 75)
+assert(t0 == HistogramBin(lo = 50, hi = 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157526127
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
+  // histogram1.bins(1) overlaps t0
+  OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
+  // histogram1.bins(1) overlaps t1
+  OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, 
upperBound = 60D)))
+
+estimateByHistogram(
+  leftHistogram = histogram1,
+  rightHistogram = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157525895
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
+  // histogram1.bins(1) overlaps t0
+  OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
+  // histogram1.bins(1) overlaps t1
+  OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, 
upperBound = 60D)))
+
+estimateByHistogram(
+  leftHistogram = histogram1,
+  rightHistogram = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
--- End diff --

`expectedNdv = 10 + 20 + 8`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157525695
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
+  // histogram1.bins(1) overlaps t0
+  OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
+  // histogram1.bins(1) overlaps t1
+  OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, 
upperBound = 60D)))
--- End diff --

actually we can just write `10` right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157524477
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
+  // histogram1.bins(1) overlaps t0
+  OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
+  // histogram1.bins(1) overlaps t1
+  OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, 
upperBound = 60D)))
--- End diff --

10D looks weird, how about 10.0


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157523857
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +67,222 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, leftHistogram, expectedMin, 
expectedMax)
+val c2 = generateJoinChild(col2, rightHistogram, expectedMin, 
expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+  }
+
+  /** Column statistics should be consistent with histograms in tests. */
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
+val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 
10, upperBound = 60)
+assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
+
+val expectedRanges = Seq(
+  // histogram1.bins(0) overlaps t0
+  OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
--- End diff --

Add spaces between operators.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157519924
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  // Histograms are propagated as unchanged. During future 
estimation, they should be
+  // truncated by the updated max/min. In this way, only pointers 
of the histograms are
+  // propagated and thus reduce memory consumption.
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

i.e. 
https://github.com/apache/spark/pull/19594/files#diff-6387e7aaeb7d8e0cb1457b9d0fe5cd00R272


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157519831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  // Histograms are propagated as unchanged. During future 
estimation, they should be
+  // truncated by the updated max/min. In this way, only pointers 
of the histograms are
+  // propagated and thus reduce memory consumption.
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

shall we do this inside `computeByEquiHeightHistogram`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157519368
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
 (ceil(card), newStats)
   }
 
+  /** Compute join cardinality using equi-height histograms. */
+  private def computeByEquiHeightHistogram(
+  leftKey: AttributeReference,
+  rightKey: AttributeReference,
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Option[Any],
+  newMax: Option[Any]): (BigInt, ColumnStat) = {
+val overlappedRanges = getOverlappedRanges(
+  leftHistogram = leftHistogram,
+  rightHistogram = rightHistogram,
+  // Only numeric values have equi-height histograms.
+  lowerBound = newMin.get.toString.toDouble,
+  upperBound = newMax.get.toString.toDouble)
+
+var card: BigDecimal = 0
+var totalNdv: Double = 0
+for (i <- overlappedRanges.indices) {
+  val range = overlappedRanges(i)
+  if (i == 0 || range.hi != overlappedRanges(i - 1).hi) {
+// If range.hi == overlappedRanges(i - 1).hi, that means the 
current range has only one
+// value, and this value is already counted in the previous range. 
So there is no need to
+// count it in this range.
+totalNdv += math.min(range.leftNdv, range.rightNdv)
+  }
+  // Apply the formula in this overlapped range.
+  card += range.leftNumRows * range.rightNumRows / 
math.max(range.leftNdv, range.rightNdv)
+}
+
+val leftKeyStat = leftStats.attributeStats(leftKey)
+val rightKeyStat = rightStats.attributeStats(rightKey)
+val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
+val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
--- End diff --

Shall we take the left/right numRows into account when calculating this average length?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157517554
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
 (ceil(card), newStats)
   }
 
+  /** Compute join cardinality using equi-height histograms. */
+  private def computeByEquiHeightHistogram(
+  leftKey: AttributeReference,
+  rightKey: AttributeReference,
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Option[Any],
+  newMax: Option[Any]): (BigInt, ColumnStat) = {
+val overlappedRanges = getOverlappedRanges(
+  leftHistogram = leftHistogram,
+  rightHistogram = rightHistogram,
+  // Only numeric values have equi-height histograms.
+  lowerBound = newMin.get.toString.toDouble,
+  upperBound = newMax.get.toString.toDouble)
--- End diff --

If we assume the min/max must be defined here, I think the parameter type 
should be `Double` instead of `Option[Any]`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157517120
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
 (ceil(card), newStats)
   }
 
+  /** Compute join cardinality using equi-height histograms. */
+  private def computeByEquiHeightHistogram(
--- End diff --

I think it's ok to only say `Histogram` in method names and explain it's 
equi-height in comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157514899
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -212,4 +213,186 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range
+   * [lowerBound, upperBound].
+   */
+  def getOverlappedRanges(
+leftHistogram: Histogram,
+rightHistogram: Histogram,
+lowerBound: Double,
+upperBound: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo            right.lo     left.hi          right.hi
+// --------+------------------+------------+----------------+------->
+if (left.hi == right.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.lo,
+hi = right.lo,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+  val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = right.lo,
+hi = left.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo <= left.lo && right.hi <= left.hi) {
+// Case4: the left bin is "larger" than the right bin
+//      right.lo           left.lo      right.hi         left.hi
+// --------+------------------+------------+----------------+------->
+if (right.hi == left.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.hi,
+hi = right.hi,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
+  val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = left.lo,
+hi = right.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo >= left.lo && right.hi <= left.hi) {
+// Case5: the left bin 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157514422
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -212,4 +213,186 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range
+   * [lowerBound, upperBound].
+   */
+  def getOverlappedRanges(
+leftHistogram: Histogram,
+rightHistogram: Histogram,
+lowerBound: Double,
+upperBound: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo            right.lo     left.hi          right.hi
+// --------+------------------+------------+----------------+------->
+if (left.hi == right.lo) {
--- End diff --

Yes, this branch is needed; otherwise we will get a ratio of 0, which leads to 
a wrong result.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157513559
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -212,4 +213,186 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range
+   * [lowerBound, upperBound].
+   */
+  def getOverlappedRanges(
+leftHistogram: Histogram,
+rightHistogram: Histogram,
+lowerBound: Double,
+upperBound: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
--- End diff --

Do we really need cases 1 and 2? Aren't they covered by the branches below?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157513325
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -212,4 +213,186 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range
+   * [lowerBound, upperBound].
+   */
+  def getOverlappedRanges(
+leftHistogram: Histogram,
+rightHistogram: Histogram,
+lowerBound: Double,
+upperBound: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo            right.lo     left.hi          right.hi
+// --------+------------------+------------+----------------+------->
+if (left.hi == right.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.lo,
+hi = right.lo,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+  val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = right.lo,
+hi = left.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo <= left.lo && right.hi <= left.hi) {
+// Case4: the left bin is "larger" than the right bin
+//      right.lo           left.lo      right.hi         left.hi
+// --------+------------------+------------+----------------+------->
+if (right.hi == left.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.hi,
+hi = right.hi,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
+  val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = left.lo,
+hi = right.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo >= left.lo && right.hi <= left.hi) {
+// Case5: the left bin 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157503949
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -212,4 +213,186 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range
+   * [lowerBound, upperBound].
+   */
+  def getOverlappedRanges(
+leftHistogram: Histogram,
+rightHistogram: Histogram,
+lowerBound: Double,
+upperBound: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo            right.lo     left.hi          right.hi
+// --------+------------------+------------+----------------+------->
+if (left.hi == right.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.lo,
+hi = right.lo,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+  val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = right.lo,
+hi = left.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo <= left.lo && right.hi <= left.hi) {
+// Case4: the left bin is "larger" than the right bin
+//      right.lo           left.lo      right.hi         left.hi
+// --------+------------------+------------+----------------+------->
+if (right.hi == left.lo) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.hi,
+hi = right.hi,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
+  val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
+  OverlappedRange(
+lo = left.lo,
+hi = right.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo >= left.lo && right.hi <= left.hi) {
+// Case5: the left bin 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-15 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157331840
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [newMin, newMax] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
--- End diff --

We only collect an `OverlappedRange` when the [left part and right part 
intersect](https://github.com/apache/spark/pull/19594/files#diff-56eed9f23127c954d9add0f6c5c93820R237), 
and that decision is based on some computation, so it is not very convenient to 
express it as a guard. The `yield` form therefore does not seem very suitable for this case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-15 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r157331711
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

ah right, we can keep it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156861642
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

Actually keeping it unchanged is more memory efficient. We just pass around 
pointers, but updating the histogram means creating a new one.

Let's keep it, and add some comments to explain it


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-13 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156847785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

Currently we don't update the histogram, since min/max can help us to know which 
bins are valid. It doesn't affect correctness. But updating histograms helps to 
reduce memory usage for histogram propagation. We can do this in both filter 
and join estimation in follow-up PRs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-13 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156847046
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [newMin, newMax] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, 
newMax)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
newMin, newMax)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo            right.lo     left.hi          right.hi
+// --------+------------------+------------+----------------+------->
+val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+if (leftRatio == 0) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.lo,
+hi = right.lo,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  OverlappedRange(
+lo = right.lo,
+hi = left.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo <= left.lo && right.hi <= left.hi) {
+// Case4: the left bin is "larger" than the right bin
+//      right.lo           left.lo      right.hi         left.hi
+// --------+------------------+------------+----------------+------->
+val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
+val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
+if (leftRatio == 0) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.hi,
+hi = right.hi,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  OverlappedRange(
+lo = left.lo,
+hi = right.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo >= left.lo && right.hi <= left.hi) {
+// Case5: the left bin contains the right bin
+//  left.loright.lo 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-13 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156846872
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
--- End diff --

yea I think `upperBound/lowerBound` is better.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156393413
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
   val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, 
rightKey.dataType)
   if (ValueInterval.isIntersected(lInterval, rInterval)) {
 val (newMin, newMax) = ValueInterval.intersect(lInterval, 
rInterval, leftKey.dataType)
-val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, 
newMax)
-keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
+val (card, joinStat) = (leftKeyStat.histogram, 
rightKeyStat.histogram) match {
+  case (Some(l: Histogram), Some(r: Histogram)) =>
+computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, 
newMax)
+  case _ =>
+computeByNdv(leftKey, rightKey, newMin, newMax)
+}
+keyStatsAfterJoin += (
+  leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
+  rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
--- End diff --

Should we update the histogram after the join?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156392538
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [newMin, newMax] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, 
newMax)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
newMin, newMax)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo           right.lo      left.hi         right.hi
+// --------+------------------+------------+----------------+------->
+val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+if (leftRatio == 0) {
--- End diff --

It's more understandable to write `if (right.lo == left.hi)` here (`leftRatio` is zero exactly when `left.hi == right.lo`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156389357
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [newMin, newMax] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
+val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, 
newMax)
+val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
newMin, newMax)
+// Only collect overlapped ranges.
+if (left.lo <= right.hi && left.hi >= right.lo) {
+  // Collect overlapped ranges.
+  val range = if (left.lo == left.hi) {
+// Case1: the left bin has only one value
+OverlappedRange(
+  lo = left.lo,
+  hi = left.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight,
+  rightNumRows = rightHeight / right.ndv
+)
+  } else if (right.lo == right.hi) {
+// Case2: the right bin has only one value
+OverlappedRange(
+  lo = right.lo,
+  hi = right.lo,
+  leftNdv = 1,
+  rightNdv = 1,
+  leftNumRows = leftHeight / left.ndv,
+  rightNumRows = rightHeight
+)
+  } else if (right.lo >= left.lo && right.hi >= left.hi) {
+// Case3: the left bin is "smaller" than the right bin
+//      left.lo           right.lo      left.hi         right.hi
+// --------+------------------+------------+----------------+------->
+val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
+val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
+if (leftRatio == 0) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.lo,
+hi = right.lo,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  OverlappedRange(
+lo = right.lo,
+hi = left.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo <= left.lo && right.hi <= left.hi) {
+// Case4: the left bin is "larger" than the right bin
+//     right.lo            left.lo     right.hi          left.hi
+// --------+------------------+------------+----------------+------->
+val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
+val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
+if (leftRatio == 0) {
+  // The overlapped range has only one value.
+  OverlappedRange(
+lo = right.hi,
+hi = right.hi,
+leftNdv = 1,
+rightNdv = 1,
+leftNumRows = leftHeight / left.ndv,
+rightNumRows = rightHeight / right.ndv
+  )
+} else {
+  OverlappedRange(
+lo = left.lo,
+hi = right.hi,
+leftNdv = left.ndv * leftRatio,
+rightNdv = right.ndv * rightRatio,
+leftNumRows = leftHeight * leftRatio,
+rightNumRows = rightHeight * rightRatio
+  )
+}
+  } else if (right.lo >= left.lo && right.hi <= left.hi) {
+// Case5: the left bin contains the right bin
+//  left.loright.lo 

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156388807
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
--- End diff --

`max`/`min` would also be fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156388437
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
+val overlappedRanges = new ArrayBuffer[OverlappedRange]()
+// Only bins whose range intersect [newMin, newMax] have join 
possibility.
+val leftBins = leftHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+val rightBins = rightHistogram.bins
+  .filter(b => b.lo <= newMax && b.hi >= newMin)
+
+leftBins.foreach { lb =>
+  rightBins.foreach { rb =>
--- End diff --

nit:
```
for {
  leftBin <- leftBins
  rightBin <- rightBins
} yield {
  ...
  OverlappedRange ...
}
```
Then we can omit `val overlappedRanges = new ArrayBuffer[OverlappedRange]()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r156387841
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +115,183 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns overlapped ranges between two histograms, in the given value 
range [newMin, newMax].
+   */
+  def getOverlappedRanges(
+  leftHistogram: Histogram,
+  rightHistogram: Histogram,
+  newMin: Double,
+  newMax: Double): Seq[OverlappedRange] = {
--- End diff --

How about `upperBound`/`lowerBound`? It's hard to understand the meaning of 
`new` just by looking at this method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-08 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r155910267
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax))))
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-12-08 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r155910232
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax))))
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-11-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r153665547
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax))))
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+   

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-11-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r153665092
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax))))
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+