[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19594 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157699245 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + // Histograms are propagated as unchanged. During future estimation, they should be + // truncated by the updated max/min. In this way, only pointers of the histograms are + // propagated and thus reduce memory consumption. + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- I put it here because `computeByEquiHeightHistogram` returns a single stats, here we keep the histogram for leftKey and rightKey respectively. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157698793 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) + +var card: BigDecimal = 0 +var totalNdv: Double = 0 +for (i <- overlappedRanges.indices) { + val range = overlappedRanges(i) + if (i == 0 || range.hi != overlappedRanges(i - 1).hi) { +// If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one +// value, and this value is already counted in the previous range. So there is no need to +// count it in this range. +totalNdv += math.min(range.leftNdv, range.rightNdv) + } + // Apply the formula in this overlapped range. + card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv) +} + +val leftKeyStat = leftStats.attributeStats(leftKey) +val rightKeyStat = rightStats.attributeStats(rightKey) +val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen) +val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2 --- End diff -- how do we use left/right numRows to calculate this? Ideally avgLen is calculated by total length of keys / numRowsAfterJoin. 
For string types, we don't know the exact length of the matched keys (we don't support string histograms yet); for numeric types, the avgLen of both sides should be the same. So the equation is a fair approximation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157696227 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) --- End diff -- that's because we need to update the column stats' min and max at the end of the method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157528163 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) + +estimateByHistogram( + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value after trimming") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3))) +val histogram2 = Histogram(height = 100, 
Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 50, upperBound = 75) +assert(t0 == HistogramBin(lo = 50, hi =
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157526127 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) + +estimateByHistogram( + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157525895 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) + +estimateByHistogram( + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, --- End diff -- `expectedNdv = 10 + 20 + 8`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157525695 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) --- End diff -- actually we can just write `10` right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157524477 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) --- End diff -- 10D looks weird, how about 10.0 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157523857 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + leftHistogram: Histogram, + rightHistogram: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. +Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + } + + /** Column statistics should be consistent with histograms in tests. 
*/ + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) +assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) +val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) +assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) + +val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), --- End diff -- Please add spaces around the operators (e.g. `40 * 1 / 2`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157519924 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + // Histograms are propagated as unchanged. During future estimation, they should be + // truncated by the updated max/min. In this way, only pointers of the histograms are + // propagated and thus reduce memory consumption. + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- i.e. https://github.com/apache/spark/pull/19594/files#diff-6387e7aaeb7d8e0cb1457b9d0fe5cd00R272 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157519831 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + // Histograms are propagated as unchanged. During future estimation, they should be + // truncated by the updated max/min. In this way, only pointers of the histograms are + // propagated and thus reduce memory consumption. + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- shall we do this inside `computeByEquiHeightHistogram`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157519368 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) + +var card: BigDecimal = 0 +var totalNdv: Double = 0 +for (i <- overlappedRanges.indices) { + val range = overlappedRanges(i) + if (i == 0 || range.hi != overlappedRanges(i - 1).hi) { +// If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one +// value, and this value is already counted in the previous range. So there is no need to +// count it in this range. +totalNdv += math.min(range.leftNdv, range.rightNdv) + } + // Apply the formula in this overlapped range. + card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv) +} + +val leftKeyStat = leftStats.attributeStats(leftKey) +val rightKeyStat = rightStats.attributeStats(rightKey) +val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen) +val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2 --- End diff -- shall we count left/right numRows when calculating this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157517554 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { +val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) --- End diff -- if we assume the min/max must be defined here, I think the parameter type should be `double` instead of `Option[Any]` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157517120 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( --- End diff -- I think it's ok to only say `Histogram` in method names and explain it's equi-height in comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157514899 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( +leftHistogram: Histogram, +rightHistogram: Histogram, +lowerBound: Double, +upperBound: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [lowerBound, upperBound] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +if (left.hi == right.lo) { + // The overlapped range has only one value. 
+ OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +if (right.hi == left.lo) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left.hi) { +// Case5: the left bin
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157514422 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( +leftHistogram: Histogram, +rightHistogram: Histogram, +lowerBound: Double, +upperBound: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [lowerBound, upperBound] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. 
+ val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +if (left.hi == right.lo) { --- End diff -- yea this branch is needed, otherwise we will get 0 ratio and lead to wrong result. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157513559 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( +leftHistogram: Histogram, +rightHistogram: Histogram, +lowerBound: Double, +upperBound: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [lowerBound, upperBound] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value --- End diff -- do we really need case 1 and 2? aren't they covered by branches below? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157513325 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( +leftHistogram: Histogram, +rightHistogram: Histogram, +lowerBound: Double, +upperBound: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [lowerBound, upperBound] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +if (left.hi == right.lo) { + // The overlapped range has only one value. 
+ OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +if (right.hi == left.lo) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left.hi) { +// Case5: the left bin
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157503949 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( +leftHistogram: Histogram, +rightHistogram: Histogram, +lowerBound: Double, +upperBound: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [lowerBound, upperBound] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +if (left.hi == right.lo) { + // The overlapped range has only one value. 
+ OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +if (right.hi == left.lo) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left.hi) { +// Case5: the left bin
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157331840 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => --- End diff -- We only collect `OverlappedRange` when [left part and right part intersect](https://github.com/apache/spark/pull/19594/files#diff-56eed9f23127c954d9add0f6c5c93820R237), and the decision is based on some computation, it's not very convenient to use it as guards. So it seems `yield` form is not very suitable for this case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157331711 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- ah right, we can keep it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156861642 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- Actually keeping it unchanged is more memory efficient. We just pass around pointers, but updating the histogram means creating a new one. Let's keep it, and add some comments to explain it --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156847785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- Currently we don't update histogram since min/max can help us to know which bins are valid. It doesn't affect correctness. But updating histograms helps to reduce memory usage for histogram propagation. We can do this in both filter and join estimation in following PRs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156847046 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. 
+ val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) +val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) +val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. 
+ OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left.hi) { +// Case5: the left bin contains the right bin +// left.loright.lo
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156846872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { --- End diff -- Yes, I think `upperBound`/`lowerBound` is better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156393413 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) -val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) -keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) +val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => +computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => +computeByNdv(leftKey, rightKey, newMin, newMax) +} +keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) --- End diff -- should we update the histogram after join? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156392538 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. 
+ val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) +val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) +if (leftRatio == 0) { --- End diff -- it's more understandable to write `if (right.lo == left.hi)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156389357 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => +val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) +val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) +// Only collect overlapped ranges. +if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. 
+ val range = if (left.lo == left.hi) { +// Case1: the left bin has only one value +OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv +) + } else if (right.lo == right.hi) { +// Case2: the right bin has only one value +OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight +) + } else if (right.lo >= left.lo && right.hi >= left.hi) { +// Case3: the left bin is "smaller" than the right bin +// left.loright.lo left.hi right.hi +// +--+++---> +val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) +val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. + OverlappedRange( +lo = right.lo, +hi = right.lo, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = right.lo, +hi = left.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo <= left.lo && right.hi <= left.hi) { +// Case4: the left bin is "larger" than the right bin +// right.lo left.lo right.hi left.hi +// +--+++---> +val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) +val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) +if (leftRatio == 0) { + // The overlapped range has only one value. 
+ OverlappedRange( +lo = right.hi, +hi = right.hi, +leftNdv = 1, +rightNdv = 1, +leftNumRows = leftHeight / left.ndv, +rightNumRows = rightHeight / right.ndv + ) +} else { + OverlappedRange( +lo = left.lo, +hi = right.hi, +leftNdv = left.ndv * leftRatio, +rightNdv = right.ndv * rightRatio, +leftNumRows = leftHeight * leftRatio, +rightNumRows = rightHeight * rightRatio + ) +} + } else if (right.lo >= left.lo && right.hi <= left.hi) { +// Case5: the left bin contains the right bin +// left.loright.lo
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156388807 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { --- End diff -- `max`/`min` is also fine --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156388437 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { +val overlappedRanges = new ArrayBuffer[OverlappedRange]() +// Only bins whose range intersect [newMin, newMax] have join possibility. +val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) +val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + +leftBins.foreach { lb => + rightBins.foreach { rb => --- End diff -- nit: ``` for { leftBin <- leftBins rightBin <- rightBins } yield { ... OverlappedRange ... } ``` Then we can omit `val overlappedRanges = new ArrayBuffer[OverlappedRange]()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r156387841 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +115,183 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { --- End diff -- how about `upperBound`/`lowerBound`? It's hard to understand the meaning of `new` by looking at this method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r155910267 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. 
+Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 
80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( +
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r155910232 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. 
+Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 
80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( +
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r153665547 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. 
+Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 
80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( +
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r153665092 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. 
+Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 
80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( +