[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19952#discussion_r156555044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -147,65 +139,76 @@ object EstimationUtils { } /** - * Returns a percentage of a bin holding values for column value in the range of - * [lowerValue, higherValue] - * - * @param higherValue a given upper bound value of a specified column value range - * @param lowerValue a given lower bound value of a specified column value range - * @param bin a single histogram bin - * @return the percentage of a single bin holding values in [lowerValue, higherValue]. + * Returns the possibility of the given histogram bin holding values within the given range + * [lowerBound, upperBound]. */ - private def getOccupation( - higherValue: Double, - lowerValue: Double, + private def binHoldingRangePossibility( + upperBound: Double, + lowerBound: Double, bin: HistogramBin): Double = { -assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound <= bin.hi) if (bin.hi == bin.lo) { // the entire bin is covered in the range 1.0 -} else if (higherValue == lowerValue) { +} else if (upperBound == lowerBound) { // set percentage to 1/NDV 1.0 / bin.ndv.toDouble } else { // Use proration since the range falls inside this bin. - math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) + math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0) } } /** - * Returns the number of bins for column values in [lowerValue, higherValue]. - * The column value distribution is saved in an equi-height histogram. The return values is a - * double value is because we may return a portion of a bin. For example, a predicate - * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. 
+ * Returns the number of histogram bins holding values within the given range + * [lowerBound, upperBound]. + * + * Note that the returned value is double type, because the range boundaries usually occupy a + * portion of a bin. An extrema case is [value, value] which is generated by equal predicate --- End diff -- typo: extreme --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
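For reference, the renamed helper under discussion can be sketched as a self-contained Scala snippet. `HistogramBin` here is a minimal stand-in for Spark's catalyst class, modeling only the `lo`/`hi`/`ndv` fields quoted in the diff:

```scala
// Minimal stand-in for the catalyst HistogramBin; only the fields used by
// the estimation logic are modeled here.
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Possibility (as a fraction) that a bin holds values in [lowerBound, upperBound],
// mirroring the binHoldingRangePossibility logic in the diff above.
def binHoldingRangePossibility(upperBound: Double, lowerBound: Double,
    bin: HistogramBin): Double = {
  assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound <= bin.hi)
  if (bin.hi == bin.lo) {
    1.0  // single-value bin: the entire bin is covered by the range
  } else if (upperBound == lowerBound) {
    1.0 / bin.ndv.toDouble  // point range: assume 1/NDV of the bin
  } else {
    // prorate the range over the bin's width
    math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0)
  }
}
```

For example, a bin covering [0, 10] with 5 distinct values gives 0.5 for the range [2, 7] and 0.2 for the point range [4, 4].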
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19783 retest this please.
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19783 For the past 2 test builds #84725 and #84732, I checked the test results on the web. Actually, there were no failures. See https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84725/testReport/. It appears that there is a bug in the Jenkins test system.
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19783 retest this please.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155964396 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,171 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i +=1 +} +i + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -=1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param binId a given bin id in a specified histogram + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. 
+ */ + private def getOccupation( + binId: Int, + higherValue: Double, + lowerValue: Double, + histogram: Histogram): Double = { +val curBin = histogram.bins(binId) +if (curBin.hi == curBin.lo) { + // the entire bin is covered in the range + 1.0 +} else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / curBin.ndv.toDouble +} else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (curBin.hi - curBin.lo), 1.0) +} + } + + /** + * Returns the number of bins for column values in [lowerValue, higherValue]. --- End diff -- OK. Done.
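The two bin-search helpers in this diff can be sketched as a self-contained snippet, using the early-exit while loops the review settled on. `HistogramBin` is a minimal stand-in for the catalyst class:

```scala
// Minimal stand-in for the catalyst HistogramBin.
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Index of the first bin whose range may contain `value`: scan forward,
// skipping bins whose upper bound is below the value.
def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
  var i = 0
  while (i < bins.length && value > bins(i).hi) i += 1
  i
}

// Index of the last bin whose range may contain `value`: scan backward,
// skipping bins whose lower bound is above the value.
def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
  var i = bins.length - 1
  while (i >= 0 && value < bins(i).lo) i -= 1
  i
}
```

A value that sits exactly on a shared bin boundary (e.g. 3 for bins [1,3] and [3,5]) gets a first bin different from its last bin, which is why both helpers exist.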
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155963930 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -359,7 +371,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { test("cbool > false") { validateEstimatedStats( Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)), - Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true), + Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true), --- End diff -- Agreed with wzhfy. Today's logic is: for these 2 conditions, (column > x) and (column >= x), we set the min value to x. We do not distinguish these 2 cases, because we do not know the exact next value larger than x if x is a continuous data type like double. We could add special handling for discrete data types such as Boolean or integer, but, as wzhfy said, it does not justify the complexity.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155963740 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -471,37 +508,47 @@ case class FilterEstimation(plan: Filter) extends Logging { percent = 1.0 } else { // This is the partial overlap case: - // Without advanced statistics like histogram, we assume uniform data distribution. - // We just prorate the adjusted range over the initial range to compute filter selectivity. - assert(max > min) - percent = op match { -case _: LessThan => - if (numericLiteral == max) { -// If the literal value is right on the boundary, we can minus the part of the -// boundary value (1/ndv). -1.0 - 1.0 / ndv - } else { -(numericLiteral - min) / (max - min) - } -case _: LessThanOrEqual => - if (numericLiteral == min) { -// The boundary value is the only satisfying value. -1.0 / ndv - } else { -(numericLiteral - min) / (max - min) - } -case _: GreaterThan => - if (numericLiteral == min) { -1.0 - 1.0 / ndv - } else { -(max - numericLiteral) / (max - min) - } -case _: GreaterThanOrEqual => - if (numericLiteral == max) { -1.0 / ndv - } else { -(max - numericLiteral) / (max - min) - } + + if (colStat.histogram.isEmpty) { --- End diff -- We cannot move the if statement to the upper level. This is because, for the partial overlap case, we need to update the [min, max] range for a given column. For the no-overlap and complete-overlap cases, we do not need to do so. I think the current code is modular for this reason.
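The four-way `op match` quoted above can be condensed into a hypothetical self-contained sketch of the no-histogram partial-overlap selectivity. `rangeSelectivity` and the `CmpOp` hierarchy are illustrative names, not Spark's actual API; uniform distribution over [min, max] is assumed:

```scala
// Hypothetical sketch of the no-histogram partial-overlap branch.
sealed trait CmpOp
case object LessThan extends CmpOp
case object LessThanOrEqual extends CmpOp
case object GreaterThan extends CmpOp
case object GreaterThanOrEqual extends CmpOp

def rangeSelectivity(op: CmpOp, lit: Double, min: Double, max: Double,
    ndv: Long): Double = {
  require(max > min)
  op match {
    // literal sits exactly on a range boundary: the boundary value itself
    // accounts for 1/ndv of the rows, so exclude or keep just that part
    case LessThan if lit == max           => 1.0 - 1.0 / ndv
    case LessThanOrEqual if lit == min    => 1.0 / ndv
    case GreaterThan if lit == min        => 1.0 - 1.0 / ndv
    case GreaterThanOrEqual if lit == max => 1.0 / ndv
    // otherwise prorate the adjusted range over the initial range
    case LessThan | LessThanOrEqual       => (lit - min) / (max - min)
    case GreaterThan | GreaterThanOrEqual => (max - lit) / (max - min)
  }
}
```

For a column on [0, 10] with ndv = 10, `column < 5` yields 0.5 and `column >= 10` yields 0.1.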
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155963622 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,41 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + if (colStat.histogram.isEmpty) { +// returns 1/ndv if there is no histogram +Some(1.0 / BigDecimal(ndv)) + } else { +// We compute filter selectivity using Histogram information. --- End diff -- OK, will do.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155683828 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,144 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -= 1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param bin a single histogram bin + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. 
+ */ + private def getOccupation( + higherValue: Double, + lowerValue: Double, + bin: HistogramBin): Double = { +assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +if (bin.hi == bin.lo) { + // the entire bin is covered in the range + 1.0 +} else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / bin.ndv.toDouble +} else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) +} + } + + /** + * Returns the number of bins for column values in [lowerValue, higherValue]. + * This is an overloaded method. The column value distribution is saved in an + * equi-height histogram. + * + * @param higherId id of the high end bin holding the high end value of a column range + * @param lowerId id of the low end bin holding the low end value of a column range + * @param higherEnd a given upper bound value of a specified column value range + * @param lowerEnd a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the number of bins for column values in [lowerEnd, higherEnd]. + */ + def getOccupationBins( + higherId: Int, + lowerId: Int, + higherEnd: Double, + lowerEnd: Double, + histogram: Histogram): Double = { +if (lowerId == higherId) { + val curBin = histogram.bins(lowerId) + getOccupation(higherEnd, lowerEnd, curBin) +} else { + // compute how much lowerEnd/higherEnd occupies its bin + val lowerCurBin = histogram.bins(lowerId) + val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin) + + val higherCurBin = histogram.bins(higherId) + val higherPart = getOccupation(higherEnd, higherCurBin.lo, higherCurBin) + + // the total length is lowerPart + higherPart + bins between them + lowerPart + higherPart + higherId - lowerId - 1 +} + } + + /** + * Returns the number of distinct values, ndv, for column values in [lowerEnd, higherEnd]. 
+ * The column value distribution is saved in an equi-height histogram. + * + * @param higherId id of the high end bin holding the high end value of a column range + * @param lowerId id of the low end bin holding the low end value of a column range + * @param higherEnd a given upper bound value of a specified column value range + * @param lowerEnd a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the number of distinct values, ndv, for column values in [lowerEn
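Putting the pieces of this diff together, here is a hypothetical self-contained sketch of `getOccupationBins`. `HistogramBin` is a minimal stand-in for the catalyst class, and the bin array is passed directly instead of a `Histogram` wrapper:

```scala
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Fraction of a single bin covered by [lowerValue, higherValue] (the
// single-bin getOccupation helper from the diff, minus its assert).
def getOccupation(higherValue: Double, lowerValue: Double,
    bin: HistogramBin): Double =
  if (bin.hi == bin.lo) 1.0
  else if (higherValue == lowerValue) 1.0 / bin.ndv.toDouble
  else math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)

// Number of (possibly fractional) bins covered by [lowerEnd, higherEnd]:
// the partial low-end bin + the partial high-end bin + the whole bins
// strictly between them.
def getOccupationBins(higherId: Int, lowerId: Int, higherEnd: Double,
    lowerEnd: Double, bins: Array[HistogramBin]): Double = {
  if (lowerId == higherId) {
    getOccupation(higherEnd, lowerEnd, bins(lowerId))
  } else {
    val lowerPart = getOccupation(bins(lowerId).hi, lowerEnd, bins(lowerId))
    val higherPart = getOccupation(higherEnd, bins(higherId).lo, bins(higherId))
    lowerPart + higherPart + higherId - lowerId - 1
  }
}
```

With three bins [0,10], [10,20], [20,30], the range [5, 25] covers half of bin 0, all of bin 1, and half of bin 2, i.e. 2.0 bins.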
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155343962 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,171 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i +=1 +} +i + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -=1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param binId a given bin id in a specified histogram + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. 
+ */ + private def getOccupation( + binId: Int, + higherValue: Double, + lowerValue: Double, + histogram: Histogram): Double = { +val curBin = histogram.bins(binId) +if (curBin.hi == curBin.lo) { + // the entire bin is covered in the range + 1.0 +} else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / curBin.ndv.toDouble +} else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (curBin.hi - curBin.lo), 1.0) +} + } + + /** + * Returns the number of bins for column values in [lowerValue, higherValue]. --- End diff -- This is because we may return a percentage of a bin. For example, a predicate column=5 may return the number of bins 0.2 if the holding bin has 5 distinct values. Hence, we cannot return an integer type value.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155125157 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,194 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var binId = 0 +bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { --- End diff -- Good point. We can simplify the logic by iterating from bins.length-1 to 0.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r155125029 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,194 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var binId = 0 +bins.foreach { bin => + if (value > bin.hi) binId += 1 --- End diff -- Good point. Actually, a while loop is better because it can exit early once the condition no longer holds.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154255068 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -578,6 +590,112 @@ class FilterEstimationSuite extends StatsEstimationTestBase { expectedRowCount = 5) } + // The following test cases have histogram information collected for the test column + test("Not(cintHgm < 3 AND null)") { +val condition = Not(And(LessThan(attrIntHgm, Literal(3)), Literal(null, IntegerType))) +validateEstimatedStats( + Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 6)), + expectedRowCount = 9) + } + + test("cintHgm = 5") { +validateEstimatedStats( + Filter(EqualTo(attrIntHgm, Literal(5)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 4) + } + + test("cintHgm = 0") { +// This is an out-of-range case since 0 is outside the range [min, max] +validateEstimatedStats( + Filter(EqualTo(attrIntHgm, Literal(0)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm < 3") { +validateEstimatedStats( + Filter(LessThan(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 2) + } + + test("cintHgm < 0") { +// This is a corner case since literal 0 is smaller than min. 
+validateEstimatedStats( + Filter(LessThan(attrIntHgm, Literal(0)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm <= 3") { +validateEstimatedStats( + Filter(LessThanOrEqual(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 2) + } + + test("cintHgm > 6") { +validateEstimatedStats( + Filter(GreaterThan(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 2) + } + + test("cintHgm > 10") { +// This is a corner case since max value is 10. +validateEstimatedStats( + Filter(GreaterThan(attrIntHgm, Literal(10)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm >= 6") { +validateEstimatedStats( + Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(6), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 4) + } + + test("cintHgm IS NULL") { +validateEstimatedStats( + Filter(IsNull(attrIntHgm), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Nil, + expectedRowCount = 0) + } + + test("cintHgm IS NOT NULL") { +validateEstimatedStats( + Filter(IsNotNull(attrIntHgm), childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 6, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 10) + } + + test("cintHgm > 3 AND cintHgm <= 6") { +val condition = And(GreaterThan(attrIntHgm, + Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6))) +validateEstimatedStats( + Filter(condition, 
childStatsTestPlan(Seq(attrIntHgm), 10L)), + Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(3), max = Some(6), +nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))), + expectedRowCount = 8) + } + + test("cintHgm = 3 OR cintHgm = 6") { --- End diff -- We have added histogram test cases for sk
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154254419 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -359,7 +371,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { test("cbool > false") { validateEstimatedStats( Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)), - Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true), + Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true), --- End diff -- My earlier comment mentioned this test case.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154254145 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -784,11 +879,16 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt) : AttributeMap[ColumnStat] = { val newColumnStats = originalMap.map { case (attr, oriColStat) => - // Update ndv based on the overall filter selectivity: scale down ndv if the number of rows - // decreases; otherwise keep it unchanged. - val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter, -newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount) val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat) + val newNdv = if (colStat.distinctCount > 1) { --- End diff -- The old code does not work well for a couple of the new skewed-distribution tests. For example, test("cintHgm < 3") would fail, because it still computes newNdv via the updateNdv() method even though, in reality, we have already scaled it down to 1.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154252063 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -513,10 +560,9 @@ case class FilterEstimation(plan: Filter) extends Logging { op match { case _: GreaterThan | _: GreaterThanOrEqual => -// If new ndv is 1, then new max must be equal to new min. -newMin = if (newNdv == 1) newMax else newValue +newMin = newValue case _: LessThan | _: LessThanOrEqual => -newMax = if (newNdv == 1) newMin else newValue +newMax = newValue --- End diff -- Previously I coded that way because of a corner test case: test("cbool > false"). At that time, I set newMin to newMax since newNdv = 1. However, this logic does not work well for the skewed distribution test case: test("cintHgm < 3"). In this test, newMin=1 and newMax=3. I think the revised code makes better sense.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154250197 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,45 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + // We compute filter selectivity using Histogram information + attr.dataType match { +case StringType | BinaryType => + Some(1.0 / BigDecimal(ndv)) + +case _ => + // returns 1/ndv if there is no histogram + if (colStat.histogram.isEmpty) return Some(1.0 / BigDecimal(ndv)) + + // We traverse histogram bins to locate the literal value + val hgmBins = colStat.histogram.get.bins + val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble + // find the interval where this datum locates + var lowerId, higherId = -1 + for (i <- hgmBins.indices) { +// if datum > upperBound, just move to next bin +if (datum <= hgmBins(i).hi && lowerId < 0) lowerId = i +if (higherId < 0) { + if ((datum < hgmBins(i).hi || i == hgmBins.length - 1) || +((datum == hgmBins(i).hi) && (datum < hgmBins(i + 1).hi))) { +higherId = i + } +} + } + assert(lowerId <= higherId) + val lowerBinNdv = hgmBins(lowerId).ndv + val higherBinNdv = hgmBins(higherId).ndv + // assume uniform distribution in each bin + val percent = if (lowerId == higherId) { +(1.0 / hgmBins.length) / math.max(lowerBinNdv, 1) + } else { +1.0 / hgmBins.length * (higherId - lowerId - 1) + + (1.0 / hgmBins.length) / math.max(lowerBinNdv, 1) + + (1.0 / hgmBins.length) / math.max(higherBinNdv, 1) + } + Some(percent) --- End diff -- Good point.
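The bin-locating and percent computation in this diff can be sketched end to end. `equalitySelectivity` and `HistogramBin` below are illustrative stand-ins, not Spark's actual API, and simple early-exit while loops replace the exact traversal quoted above:

```scala
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Hypothetical sketch of equality-predicate selectivity from an equi-height
// histogram. A skewed value may span several bins, so we locate the first
// and last bin whose range contains the datum.
def equalitySelectivity(datum: Double, bins: Array[HistogramBin]): Double = {
  var lowerId = 0
  while (lowerId < bins.length - 1 && datum > bins(lowerId).hi) lowerId += 1
  var higherId = bins.length - 1
  while (higherId > 0 && datum < bins(higherId).lo) higherId -= 1
  assert(lowerId <= higherId)
  // each bin holds 1/numBins of the rows; assume uniform distribution in a bin
  val binFrac = 1.0 / bins.length
  if (lowerId == higherId) {
    binFrac / math.max(bins(lowerId).ndv, 1)
  } else {
    // whole bins in between, plus the two partially matching boundary bins
    binFrac * (higherId - lowerId - 1) +
      binFrac / math.max(bins(lowerId).ndv, 1) +
      binFrac / math.max(bins(higherId).ndv, 1)
  }
}
```

For bins [1,5] (ndv 5), [5,5] (ndv 1), [5,10] (ndv 3), the skewed value 5 spans all three bins, so its selectivity is 1/3 + (1/3)/5 + (1/3)/3 = 23/45, far more than a plain 1/ndv estimate would give.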
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154249995 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +histogram.bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + + def findLastBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +for (i <- 0 until histogram.bins.length) { + if (value > histogram.bins(i).hi) { +// increment binId to point to next bin +binId += 1 + } + if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1)) { +if (value == histogram.bins(i + 1).lo) { --- End diff -- No. I meant the upper bound for the array of bins in a histogram. The default length of the histogram bin array is 254. When i is equal to 253 (the last bin), then i+1 is 254, leading to an out-of-bounds error.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154225069 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +histogram.bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + + def findLastBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +for (i <- 0 until histogram.bins.length) { + if (value > histogram.bins(i).hi) { +// increment binId to point to next bin +binId += 1 + } + if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 1)) { +if (value == histogram.bins(i + 1).lo) { --- End diff -- I used two statements instead of one because, when i points to the last bin, the condition "value == histogram.bins(i + 1).lo" may go out of bounds. By separating the conditions into two statements, we can be sure that the out-of-bounds error will not happen.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154223769 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 +histogram.bins.foreach { bin => + if (value > bin.hi) binId += 1 +} +binId + } + + /** + * Returns the number of the last bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the last bin into which a column values falls. + */ + + def findLastBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 --- End diff -- same comment as in my last reply.
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r154223705 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin into which a column values falls. + */ + + def findFirstBinForValue(value: Double, histogram: Histogram): Int = { +var binId = 0 --- End diff -- I hesitate to add an assert statement here, because an assert like this may cause the Spark system to crash if a user does not refresh his data statistics promptly. In the real world, a user may load data, collect statistics, and then add more incremental data without collecting statistics immediately. He may issue a SQL query against the newly added data, such as "WHERE column=xxx", where xxx is a new value in the incremental load. After all, statistics are auxiliary; a query should still run even if the statistics are not up to date.
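As an illustration of the alternative to asserting, a hypothetical helper could clamp an out-of-range predicate value to the histogram's known bounds, so that stale statistics degrade the estimate instead of crashing the query. The classes below are simplified stand-ins for the Catalyst ones:

```scala
// Simplified stand-ins for Catalyst's histogram classes (assumption).
case class HistogramBin(lo: Double, hi: Double, ndv: Long)
case class Histogram(height: Double, bins: Array[HistogramBin])

// Hypothetical sketch: instead of asserting that `value` lies inside the
// histogram's range -- which would fail on stale statistics after an
// incremental load -- clamp it to the known [lo, hi] bounds first.
def clampToHistogramRange(value: Double, histogram: Histogram): Double = {
  val lo = histogram.bins.head.lo
  val hi = histogram.bins.last.hi
  math.max(lo, math.min(hi, value))
}
```

A value introduced by an incremental load that falls past the last bin is then simply treated as the histogram's maximum for estimation purposes.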
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r153902382 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,197 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin/bucket into which a column values falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param histogram a numeric equi-height histogram + * @return the number of the first bin/bucket into which a column values falls. + */ + + def findFirstBucketForValue(value: Double, histogram: Histogram): Int = { --- End diff -- We had used bucket(s) and bin(s) interchangeably. To avoid confusion, I will unify them to use only bin/bins.
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r153665547 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala --- @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { +val col1 = attr("key1") +val col2 = attr("key2") +val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) +val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + +val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) +val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) +val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( +col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), +col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax +) + +// Join order should not affect estimation result. 
+Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) +} + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { +val colStat = inferColumnStat(histogram) +val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + +val filterCondition = new ArrayBuffer[Expression]() +if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) +} +if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) +} +if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { +var ndv = 0L +for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { +ndv += bin.ndv + } +} +ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) +val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) +// test bin trimming +val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) +assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) +val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) +assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + +val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 
80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) +) +assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + +estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { +val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), Hi
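The trimming arithmetic asserted in the test above (e.g. `HistogramBin(lo = 0, hi = 50, ndv = 50)` trimmed to `[10, 60]` yielding `HistogramBin(10, 50, 40)` with height 80) can be reconstructed as a standalone sketch. This is a hypothetical version of `trimBin`, not the actual Spark implementation:

```scala
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Hypothetical sketch of bin trimming: intersect the bin with [min, max] and
// scale its ndv and height by the surviving fraction of its value range.
// Assumes lo < hi; a single-value bin would need a special case.
def trimBin(bin: HistogramBin, height: Double,
            min: Double, max: Double): (HistogramBin, Double) = {
  val newLo = math.max(bin.lo, min)
  val newHi = math.min(bin.hi, max)
  val fraction = (newHi - newLo) / (bin.hi - bin.lo)
  (HistogramBin(newLo, newHi, (bin.ndv * fraction).toLong), height * fraction)
}
```

Plugging in the test's inputs reproduces the asserted outputs: the first bin keeps 80% of its range (ndv 40, height 80), the second keeps 20% (ndv 8, height 20).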
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19783 cc @sameeragarwal @cloud-fan @gatorsmile @wzhfy
[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19357 This pull request was created while several of its dependencies had not yet been defined. As a result, it caused many conflicts. I decided to close this pull request and start a clean one. Please review pull request 19783, which replaces this one.
[GitHub] spark pull request #19357: [SPARK-21322][SQL][WIP] support histogram in filt...
Github user ron8hu closed the pull request at: https://github.com/apache/spark/pull/19357
[GitHub] spark pull request #19783: support histogram in filter cardinality estimatio...
GitHub user ron8hu opened a pull request: https://github.com/apache/spark/pull/19783 support histogram in filter cardinality estimation ## What changes were proposed in this pull request? Histograms are effective in dealing with skewed distributions. After we generate histogram information for column statistics, we need to adjust filter estimation based on the histogram data structure. ## How was this patch tested? We revised all the unit test cases to include the histogram data structure. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ron8hu/spark supportHistogram Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19783.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19783 commit dd5b975dafdf9fc4edd94cf6e369f5e899db74e2 Author: Ron Hu <ron...@huawei.com> Date: 2017-11-19T19:37:47Z support histogram in filter cardinality estimation
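As a rough illustration of what "adjust filter estimation based on histogram data structure" means, the sketch below estimates the selectivity of a predicate like `col <= value` from an equi-height histogram. The classes and the function are simplified assumptions, not the PR's actual code:

```scala
// Simplified stand-ins for Catalyst's histogram classes (assumption).
case class HistogramBin(lo: Double, hi: Double, ndv: Long)
case class Histogram(height: Double, bins: Array[HistogramBin])

// Hypothetical sketch: estimate the selectivity of "col <= value".
// In an equi-height histogram every bin holds roughly `height` rows, so the
// estimate is (fully covered bins + fractional coverage of the boundary bin)
// divided by the total number of bins.
def lessThanOrEqualSelectivity(value: Double, histogram: Histogram): Double = {
  val covered = histogram.bins.map { bin =>
    if (value >= bin.hi) 1.0
    else if (value <= bin.lo) 0.0
    else (value - bin.lo) / (bin.hi - bin.lo) // proration inside the boundary bin
  }.sum
  covered / histogram.bins.length
}
```

Because equi-height bins adapt their widths to the data, this estimate stays accurate even for skewed distributions, which is the motivation stated in the PR description.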
[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19479#discussion_r148724807 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -275,6 +327,64 @@ object ColumnStat extends Logging { avgLen = row.getLong(4), maxLen = row.getLong(5) ) +if (row.isNullAt(6)) { + cs +} else { + val ndvs = row.getArray(6).toLongArray() + assert(percentiles.get.length == ndvs.length + 1) + val endpoints = percentiles.get.map(_.toString.toDouble) + // Construct equi-height histogram + val buckets = ndvs.zipWithIndex.map { case (ndv, i) => +EquiHeightBucket(endpoints(i), endpoints(i + 1), ndv) + } + val nonNullRows = rowCount - cs.nullCount + val ehHistogram = EquiHeightHistogram(nonNullRows.toDouble / ndvs.length, buckets) + cs.copy(histogram = Some(ehHistogram)) +} } } + +/** + * There are a few types of histograms in state-of-the-art estimation methods. E.g. equi-width + * histogram, equi-height histogram, frequency histogram (value-frequency pairs) and hybrid + * histogram, etc. + * Currently in Spark, we support equi-height histogram since it is good at handling skew + * distribution, and also provides reasonable accuracy in other cases. + * We can add other histograms in the future, which will make estimation logic more complicated. + * This is because we will have to deal with computation between different types of histograms in + * some cases, e.g. for join columns. + */ +trait Histogram + +/** + * Equi-height histogram represents column value distribution by a sequence of buckets. Each bucket + * has a value range and contains approximately the same number of rows. + * @param height number of rows in each bucket + * @param ehBuckets equi-height histogram buckets + */ +case class EquiHeightHistogram(height: Double, ehBuckets: Seq[EquiHeightBucket]) extends Histogram { --- End diff -- Please declare ehBuckets: Array[EquiHeightBucket]) instead of ehBuckets: Seq[EquiHeightBucket]). 
This is because we need to access a bucket directly and randomly. For random access, a Scala Array performs better because it provides constant-time indexed access to its elements.
[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19479#discussion_r145840719 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -216,65 +218,61 @@ object ColumnStat extends Logging { } } - /** - * Constructs an expression to compute column statistics for a given column. - * - * The expression should create a single struct column with the following schema: - * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long - * - * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and - * as a result should stay in sync with it. - */ - def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = { -def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => - expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } -}) -val one = Literal(1, LongType) + private def convertToHistogram(s: String): EquiHeightHistogram = { +val idx = s.indexOf(",") +if (idx <= 0) { + throw new AnalysisException("Failed to parse histogram.") +} +val height = s.substring(0, idx).toDouble +val pattern = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r +val buckets = pattern.findAllMatchIn(s).map { m => + EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, m.group(3).toLong) +}.toSeq +EquiHeightHistogram(height, buckets) + } -// the approximate ndv (num distinct value) should never be larger than the number of rows -val numNonNulls = if (col.nullable) Count(col) else Count(one) -val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls)) -val numNulls = Subtract(Count(one), numNonNulls) -val defaultSize = Literal(col.dataType.defaultSize, LongType) +} -def fixedLenTypeStruct(castType: DataType) = { - // For fixed width types, avg size should be the same as max size. 
- struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize, -defaultSize) -} +/** + * There are a few types of histograms in state-of-the-art estimation methods. E.g. equi-width + * histogram, equi-height histogram, frequency histogram (value-frequency pairs) and hybrid + * histogram, etc. + * Currently in Spark, we support equi-height histogram since it is good at handling skew + * distribution, and also provides reasonable accuracy in other cases. + * We can add other histograms in the future, which will make estimation logic more complicated. + * Because we will have to deal with computation between different types of histograms in some + * cases, e.g. for join columns. + */ +trait Histogram -col.dataType match { - case dt: IntegralType => fixedLenTypeStruct(dt) - case _: DecimalType => fixedLenTypeStruct(col.dataType) - case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt) - case BooleanType => fixedLenTypeStruct(col.dataType) - case DateType => fixedLenTypeStruct(col.dataType) - case TimestampType => fixedLenTypeStruct(col.dataType) - case BinaryType | StringType => -// For string and binary type, we don't store min/max. -val nullLit = Literal(null, col.dataType) -struct( - ndv, nullLit, nullLit, numNulls, - // Set avg/max size to default size if all the values are null or there is no value. - Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)), - Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize))) - case _ => -throw new AnalysisException("Analyzing column statistics is not supported for column " + -s"${col.name} of data type: ${col.dataType}.") -} - } +/** + * Equi-height histogram represents column value distribution by a sequence of buckets. Each bucket + * has a value range and contains approximately the same number of rows. 
+ * @param height number of rows in each bucket + * @param ehBuckets equi-height histogram buckets + */ +case class EquiHeightHistogram(height: Double, ehBuckets: Seq[EquiHeightBucket]) extends Histogram { - /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */ - def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = { -ColumnStat( - distinctCount = BigInt(row.getLong(0)), - // for string/binary min/max, get should return null - min = Option(row.get(1, attr.dataType)), -
[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/19479#discussion_r145828713 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -216,65 +218,61 @@ object ColumnStat extends Logging { } } - /** - * Constructs an expression to compute column statistics for a given column. - * - * The expression should create a single struct column with the following schema: - * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long - * - * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and - * as a result should stay in sync with it. - */ - def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = { -def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => - expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } -}) -val one = Literal(1, LongType) + private def convertToHistogram(s: String): EquiHeightHistogram = { +val idx = s.indexOf(",") +if (idx <= 0) { + throw new AnalysisException("Failed to parse histogram.") +} +val height = s.substring(0, idx).toDouble +val pattern = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r +val buckets = pattern.findAllMatchIn(s).map { m => + EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, m.group(3).toLong) +}.toSeq +EquiHeightHistogram(height, buckets) + } -// the approximate ndv (num distinct value) should never be larger than the number of rows -val numNonNulls = if (col.nullable) Count(col) else Count(one) -val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls)) -val numNulls = Subtract(Count(one), numNonNulls) -val defaultSize = Literal(col.dataType.defaultSize, LongType) +} -def fixedLenTypeStruct(castType: DataType) = { - // For fixed width types, avg size should be the same as max size. 
- struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize, -defaultSize) -} +/** + * There are a few types of histograms in state-of-the-art estimation methods. E.g. equi-width + * histogram, equi-height histogram, frequency histogram (value-frequency pairs) and hybrid + * histogram, etc. + * Currently in Spark, we support equi-height histogram since it is good at handling skew + * distribution, and also provides reasonable accuracy in other cases. + * We can add other histograms in the future, which will make estimation logic more complicated. + * Because we will have to deal with computation between different types of histograms in some --- End diff -- This is because we will have to
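The `convertToHistogram` logic quoted in the diff above can be exercised as a self-contained sketch. The serialized format assumed here (`<height>, Bucket(lo, hi, ndv), Bucket(lo, hi, ndv), ...`) is inferred from the regex in the diff:

```scala
import scala.util.matching.Regex

case class EquiHeightBucket(lo: Double, hi: Double, ndv: Long)
case class EquiHeightHistogram(height: Double, buckets: Seq[EquiHeightBucket])

// Sketch of the parsing shown in the diff: the height comes before the first
// comma, and each bucket is captured by the "Bucket(lo, hi, ndv)" pattern.
def convertToHistogram(s: String): EquiHeightHistogram = {
  val idx = s.indexOf(",")
  require(idx > 0, s"Failed to parse histogram: $s")
  val height = s.substring(0, idx).toDouble
  val pattern: Regex = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r
  val buckets = pattern.findAllMatchIn(s).map { m =>
    EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, m.group(3).toLong)
  }.toSeq
  EquiHeightHistogram(height, buckets)
}
```

For example, the string `"100.0, Bucket(1.0, 5.0, 4), Bucket(5.0, 9.0, 3)"` parses into a histogram of height 100.0 with two buckets.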
[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19357 cc @wzhfy Please review the code first before I ask the community to review it. Thanks.
[GitHub] spark pull request #19357: support histogram in filter cardinality estimatio...
GitHub user ron8hu opened a pull request: https://github.com/apache/spark/pull/19357 support histogram in filter cardinality estimation ## What changes were proposed in this pull request? Histograms are effective in dealing with skewed distributions. After we generate histogram information for column statistics, we need to adjust filter estimation based on the histogram data structure. ## How was this patch tested? We revised all the unit test cases to include the histogram data structure. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ron8hu/spark createhistogram Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19357.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19357 commit 46af54d9c86fa1e5322fdd92ed47fe3d419dd966 Author: Ron Hu <ron...@huawei.com> Date: 2017-09-26T23:33:49Z support histogram in filter cardinality estimation
[GitHub] spark issue #17918: [SPARK-20678][SQL] Ndv for columns not in filter conditi...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/17918 I agree that we need to scale down the NDV for all referenced columns in a query if a filter condition reduces the number of qualified rows. Did you find this problem when running the TPC-DS benchmark? If so, which queries encounter this issue?
[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17546#discussion_r110480494 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -736,6 +736,12 @@ object SQLConf { .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].") .createWithDefault(0.7) + val JOIN_REORDER_DP_STAR_FILTER = +buildConf("spark.sql.cbo.joinReorder.dp.star.filter") + .doc("Applies star-join filter heuristics to cost based join enumeration.") + .booleanConf + .createWithDefault(false) --- End diff -- In Spark 2.2, we introduced a couple of new configuration parameters in the optimizer area. To be on the safe side, we set the default value to false. I suggest that we change the default value to true AFTER we are sure that the new optimizer feature does not cause any regressions. The system regression/integration test suites should help us make that decision.
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r109476536 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -550,6 +565,221 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator, including =, <=>, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of the given columns + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) + +// determine the overlapping degree between predicate range and column's range +val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0) +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + // Left < Right or Left <= Right + // - no overlap: + // minRight maxRight minLeft maxLeft + // +--++-+---> + // - complete overlap: (If null values exists, we set it to partial overlap.) + // minLeftmaxLeft minRight maxRight + // +--++-+---> + case _: LessThan => +(minLeft >= maxRight, (maxLeft < minRight) && allNotNull) + case _: LessThanOrEqual => +(minLeft > maxRight, (maxLeft <= minRight) && allNotNull) + + // Left > Right or Left >= Right + // - no overlap: + // minLeftmaxLeft minRight maxRight + // +--++-+---> + // - complete overlap: (If null values exists, we set it to partial overlap.) + // minRight maxRight minLeft maxLeft + // +--++-+---> + case _: GreaterThan => +(maxLeft <= minRight, (minLeft > maxRight) && allNotNull) + case _: GreaterThanOrEqual => +(maxLeft < minRight, (minLeft >= maxRight) && allNotNull) + + // Left = Right or Left <=> Right + // - no overlap: + // minLeftmaxLeft minRight maxRight + // +--++-+---> + // minRight maxRight minLeft maxLeft + // +
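The no-overlap / complete-overlap analysis quoted above for the `Left < Right` case can be condensed into a small runnable sketch. `NumericRange` below is a simplified stand-in for the column range statistics used in `FilterEstimation`:

```scala
// Simplified stand-in for a column's numeric range statistics (assumption).
case class NumericRange(min: BigDecimal, max: BigDecimal)

// Sketch of the overlap test for "left < right": no overlap means no row can
// satisfy the predicate (selectivity 0); complete overlap, given that neither
// column has nulls, means every row satisfies it (selectivity 1).
def lessThanOverlap(left: NumericRange, right: NumericRange,
                    allNotNull: Boolean): (Boolean, Boolean) = {
  val noOverlap = left.min >= right.max            // minLeft >= maxRight
  val completeOverlap = left.max < right.min && allNotNull
  (noOverlap, completeOverlap)
}
```

Anything in between (partial overlap) falls through to the heuristic percentage estimation described in the diff.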
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109476505
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109476460

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala ---
    @@ -491,7 +599,22 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
           sizeInBytes = getOutputSize(filter.output, expectedRowCount, expectedAttributeMap),
           rowCount = Some(expectedRowCount),
           attributeStats = expectedAttributeMap)
    -    assert(filter.stats(conf) == expectedStats)
    +
    +    val filterStats = filter.stats(conf)
    +    assert(filterStats.sizeInBytes == expectedStats.sizeInBytes)
    +    assert(filterStats.rowCount == expectedStats.rowCount)
    +    val rowCountValue = filterStats.rowCount.getOrElse(0)
    +    // check the output column stats if the row count is > 0.
    +    // When row count is 0, the output is set to empty.
    +    if (rowCountValue != 0) {
    +      // Need to check attributeStats one by one because we may have multiple output columns.
    +      // Due to update operation, the output columns may be in different order.
    +      expectedColStats.foreach { kv =>
    +        val filterColumnStat = filterStats.attributeStats.get(kv._1).get
    --- End diff --

    Good point. fixed.

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
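The test-suite change quoted above compares attribute statistics entry by entry because the `update` path may leave output columns in a different order. A minimal sketch of that comparison strategy (hypothetical names, plain dictionaries standing in for the Catalyst `AttributeMap`):

```python
def assert_same_attribute_stats(expected, actual):
    """Compare per-column statistics without depending on column order.

    Iterating over the expected entries and looking each one up in the
    actual map makes the check order-independent, unlike comparing two
    whole sequences positionally.
    """
    for attr, stat in expected.items():
        assert attr in actual, "missing stats for " + attr
        assert actual[attr] == stat, "stats mismatch for " + attr
```

With this approach a result map `{"cdate": s2, "cint": s1}` passes against an expected map `{"cint": s1, "cdate": s2}`, which a positional comparison would reject.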
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109471156
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109340165
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109326608
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109323394
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109323397

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---
    +      case _: LessThan =>
    +        (minLeft >= maxRight,
    +          maxLeft < minRight && colStatLeft.nullCount == 0 && colStatRight.nullCount == 0)
    --- End diff --

    fixed.
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109323376
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17415#discussion_r109293607

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---
    +      // Left > Right or Left >= Right
    +      // - no overlap:
    +      //      minLeft            maxLeft      minRight      maxRight
    +      // 0 ------+------------------+------------+-------------+------->
    +      // - complete overlap:
    +      //      minRight           maxRight     minLeft       maxLeft
    --- End diff --

    Good point. fixed.
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r109273331 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of the given columns + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: LessThan => +(minLeft >= maxRight, maxLeft < minRight) + case _: LessThanOrEqual => +(minLeft > maxRight, maxLeft <= minRight) + case _: GreaterThan => +(maxLeft <= minRight, minLeft > maxRight) + case _: GreaterThanOrEqual => +(maxLeft < minRight, minLeft >= maxRight) + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: EqualNullSafe => +// For null-safe equality, we use a very restrictive condition to evaluate its overlap. +// If null values exists, we set it to partial overlap. +(((maxLeft < minRight) || (maxRight < minLeft)) +&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0, + ((minLeft == minRight) && (maxLeft == maxRight)) +&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0 +) +} + +var percent = BigDecimal(1.0) +if (noOverlap) { + percent = 0.0 +} else if (completeOverlap) { + percent = 1.0 +} else { + // For partial overlap, we use an empirical value 1/3 as suggested by the book + // "Database Systems, the complete book". 
+ percent = 1.0/3.0 + + if (update) { +// Need to adjust new min/max after the filter condition is applied + +val ndvLeft = BigDecimal(colStatLeft.distinctCount) +var newNdvLeft = (ndvLeft * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() +if (newNdvLeft < 1) new
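The truncated snippet above applies the 1/3 partial-overlap heuristic and scales the column's distinct count by the same factor. A minimal sketch of that arithmetic (the function name is invented, and the clamp-to-1 is an assumption inferred from the visible `if (newNdvLeft < 1)` fragment; the real code also narrows the column's min/max):

```scala
import scala.math.BigDecimal.RoundingMode

// Partial-overlap estimate: selectivity falls back to the empirical 1/3, and
// the distinct count is scaled by that factor, rounded half-up, then clamped
// to at least 1 (assumed from the truncated fragment above).
def partialOverlapEstimate(distinctCount: BigInt): (BigDecimal, BigInt) = {
  val percent = BigDecimal(1) / BigDecimal(3)
  val scaledNdv = (BigDecimal(distinctCount) * percent)
    .setScale(0, RoundingMode.HALF_UP)
    .toBigInt
  (percent, scaledNdv.max(BigInt(1)))
}
```

The clamp matters for low-cardinality columns: a column with one distinct value would otherwise round down to an NDV of zero, which breaks later per-distinct-value estimates.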
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r109268076 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of the given columns + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: LessThan => --- End diff -- The graphical comments are helpful. Fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r109267675 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of the given columns + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) --- End diff -- Good catch. fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17415: [SPARK-19408][SQL] filter estimation on two columns of s...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/17415 cc @sameeragarwal @cloud-fan @gatorsmile @wzhfy After a few rounds of changes and commits, this PR should be in good shape. If we can include it in Spark 2.2, it can help TPC-H queries.
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108754614 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => +(minLeft >= maxRight, maxLeft <= minRight) + case _: LessThanOrEqual => +(minLeft >= maxRight, maxLeft <= minRight) + case _: GreaterThan => +(maxLeft <= minRight, minLeft >= maxRight) --- End diff -- I think it should be (maxLeft < minRight, minLeft > maxRight). For the no-overlap case, the condition should be "maxLeft < minRight" because we do not want any intersection point. It is a little bit complex. The best way is to draw a diagram to show the relationship.
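Boundary debates like the one above can be settled mechanically: enumerate small integer ranges and confirm that whenever a candidate no-overlap condition fires, no value pair can actually satisfy the predicate. The checker below is illustrative only (it is not part of the PR) and compares the two candidate conditions for `left > right`:

```scala
// Exhaustively checks a candidate "no overlap" condition for "left > right":
// whenever the condition claims no overlap, no pair (l, r) drawn from the two
// ranges may satisfy l > r. Small bounds keep the search instant.
def soundFor(noOverlap: (Int, Int, Int, Int) => Boolean): Boolean =
  (for {
    minL <- 0 to 4; maxL <- minL to 4
    minR <- 0 to 4; maxR <- minR to 4
    if noOverlap(minL, maxL, minR, maxR)
  } yield !(minL to maxL).exists(l => (minR to maxR).exists(r => l > r))
  ).forall(identity)

// Both candidates are sound. The inclusive "<=" form additionally classifies
// ranges that merely touch (maxLeft == minRight) as no-overlap, which is
// correct here because equality alone can never satisfy a strict ">".
val strictSound    = soundFor((minL, maxL, minR, maxR) => maxL < minR)
val inclusiveSound = soundFor((minL, maxL, minR, maxR) => maxL <= minR)
```

A brute-force oracle like this is a cheap way to cover all four comparison operators with a handful of lines before trusting hand-drawn diagrams.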
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108753830 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => +(minLeft >= maxRight, maxLeft <= minRight) --- End diff -- I think it should be (minLeft > maxRight, maxLeft < minRight) For no-overlap case, the condition should be "minLeft > maxRight" because we do not want any intersection point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108752975 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => +(minLeft >= maxRight, maxLeft <= minRight) + case _: LessThanOrEqual => +(minLeft >= maxRight, maxLeft <= minRight) + case _: GreaterThan => +(maxLeft <= minRight, minLeft >= maxRight) + case _: GreaterThanOrEqual => +(maxLeft < minRight, minLeft > maxRight) --- End diff -- Good catch. Fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108751882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => +(minLeft >= maxRight, maxLeft <= minRight) + case _: LessThanOrEqual => +(minLeft >= maxRight, maxLeft <= minRight) --- End diff -- Good catch. Fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108583109 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,135 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => --- End diff -- The current code is written in such a way that we do not have too deep indentation. Some engineers do not like deep indentation as they often put screen monitor vertically. Let's handle it when the need occurs. I think, with good test case coverage, we will be able to catch anything we miss. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108582594 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -515,8 +530,135 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(percent.toDouble) } + /** + * Returns a percentage of rows meeting a binary comparison expression containing two columns. + * In SQL queries, we also see predicate expressions involving two columns + * such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. + * Note that, if column-1 and column-2 belong to different tables, then it is a join + * operator's work, NOT a filter operator's work. + * + * @param op a binary comparison operator such as =, <, <=, >, >= + * @param attrLeft the left Attribute (or a column) + * @param attrRight the right Attribute (or a column) + * @param update a boolean flag to specify if we need to update ColumnStat of a given column + * for subsequent conditions + * @return an optional double value to show the percentage of rows meeting a given condition + */ + def evaluateBinaryForTwoColumns( + op: BinaryComparison, + attrLeft: Attribute, + attrRight: Attribute, + update: Boolean): Option[Double] = { + +if (!colStatsMap.contains(attrLeft)) { + logDebug("[CBO] No statistics for " + attrLeft) + return None +} +if (!colStatsMap.contains(attrRight)) { + logDebug("[CBO] No statistics for " + attrRight) + return None +} + +attrLeft.dataType match { + case StringType | BinaryType => +// TODO: It is difficult to support other binary comparisons for String/Binary +// type without min/max and advanced statistics like histogram. 
+logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft) +return None + case _ => +} + +val colStatLeft = colStatsMap(attrLeft) +val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType) + .asInstanceOf[NumericRange] +val maxLeft = BigDecimal(statsRangeLeft.max) +val minLeft = BigDecimal(statsRangeLeft.min) +val ndvLeft = BigDecimal(colStatLeft.distinctCount) + +val colStatRight = colStatsMap(attrRight) +val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType) + .asInstanceOf[NumericRange] +val maxRight = BigDecimal(statsRangeRight.max) +val minRight = BigDecimal(statsRangeRight.min) +val ndvRight = BigDecimal(colStatRight.distinctCount) + +// determine the overlapping degree between predicate range and column's range +val (noOverlap: Boolean, completeOverlap: Boolean) = op match { + case _: EqualTo => +((maxLeft < minRight) || (maxRight < minLeft), + (minLeft == minRight) && (maxLeft == maxRight)) + case _: LessThan => +(minLeft >= maxRight, maxLeft <= minRight) + case _: LessThanOrEqual => +(minLeft >= maxRight, maxLeft <= minRight) + case _: GreaterThan => +(maxLeft <= minRight, minLeft >= maxRight) + case _: GreaterThanOrEqual => +(maxLeft < minRight, minLeft > maxRight) +} --- End diff -- Good catch. Fixed.
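The `(noOverlap, completeOverlap)` match in the hunk above can be sanity-checked in isolation. Below is a hedged, standalone Scala sketch that mirrors the quoted range tests; the `Op` encoding and the function name are invented for illustration and are not Spark's API (and note the quoted version predates the boundary-case fix discussed in this thread):

```scala
// Standalone sketch of the range-overlap classification quoted above.
// Not Spark code: Op and overlapDegree are illustrative names.
sealed trait Op
case object EqualTo extends Op
case object LessThan extends Op
case object LessThanOrEqual extends Op
case object GreaterThan extends Op
case object GreaterThanOrEqual extends Op

def overlapDegree(op: Op,
                  minLeft: BigDecimal, maxLeft: BigDecimal,
                  minRight: BigDecimal, maxRight: BigDecimal): (Boolean, Boolean) = op match {
  case EqualTo =>
    // disjoint ranges can never satisfy l = r; only identical ranges count as complete overlap
    ((maxLeft < minRight) || (maxRight < minLeft),
     (minLeft == minRight) && (maxLeft == maxRight))
  case LessThan | LessThanOrEqual =>
    // as quoted, both operators share one test; later revisions adjust the boundary cases
    (minLeft >= maxRight, maxLeft <= minRight)
  case GreaterThan =>
    (maxLeft <= minRight, minLeft >= maxRight)
  case GreaterThanOrEqual =>
    (maxLeft < minRight, minLeft > maxRight)
}
```

For example, with a left range [1, 5] and a right range [10, 20], `EqualTo` classifies as no overlap while `LessThan` classifies as complete overlap, which is what drives the 0.0 / 1.0 / partial selectivity choice later in the method.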
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108582540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -509,8 +524,131 @@ (same evaluateBinaryForTwoColumns hunk as quoted above, commented at the "case _: EqualTo =>" line) --- End diff -- I just revised the code to handle EqualNullSafe separately from EqualTo.
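The reason EqualNullSafe needs its own case is that `null <=> null` evaluates to true, so a disjoint value range alone does not imply zero selectivity. A hedged sketch of the distinction (the helper names and the `nullCount` inputs are assumptions for illustration, not the actual Spark code):

```scala
// Hypothetical sketch: why EqualNullSafe (<=>) cannot reuse EqualTo's range test.
// For EqualTo, disjoint ranges mean no row can match. For EqualNullSafe, rows
// where both sides are null still match, so "no overlap" additionally requires
// that the two columns cannot both contain nulls.
def equalToNoOverlap(minLeft: BigDecimal, maxLeft: BigDecimal,
                     minRight: BigDecimal, maxRight: BigDecimal): Boolean =
  (maxLeft < minRight) || (maxRight < minLeft)

def equalNullSafeNoOverlap(minLeft: BigDecimal, maxLeft: BigDecimal,
                           minRight: BigDecimal, maxRight: BigDecimal,
                           nullCountLeft: BigInt, nullCountRight: BigInt): Boolean =
  equalToNoOverlap(minLeft, maxLeft, minRight, maxRight) &&
    !(nullCountLeft > 0 && nullCountRight > 0)
```

With ranges [1, 5] and [10, 20], the null-safe variant still reports possible matches as soon as both columns have at least one null.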
[GitHub] spark issue #17446: [SPARK-17075][SQL][followup] Add Estimation of Constant ...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/17446 The logic is straightforward. LGTM.
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108308949 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -509,8 +524,131 @@ (same evaluateBinaryForTwoColumns hunk as quoted above, continuing with:) +var percent = BigDecimal(1.0) +if (noOverlap) { + percent = 0.0 +} else if (completeOverlap) { + percent = 1.0 +} else { + // For partial overlap, we use an empirical value 1/3 as suggested by the book + // "Database Systems, the complete book". + percent = 1.0/3.0 + + if (update) { +// Need to adjust new min/max after the filter condition is applied + +val ndv = BigDecimal(colStatLeft.distinctCount) +var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt() --- End diff -- Good point. Fixed.
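The partial-overlap branch above is just scalar arithmetic and can be exercised on its own. A minimal sketch mirroring the quoted computation (standalone; the sample ndv of 90 is an invented input, not from the PR):

```scala
import scala.math.BigDecimal.RoundingMode

// Partial overlap: selectivity falls back to the empirical 1/3 from
// "Database Systems: The Complete Book", and the surviving distinct count
// is scaled by the same factor, rounded half-up.
val percent: BigDecimal = BigDecimal(1.0) / 3
val ndv = BigDecimal(90)
val newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt
```

The half-up rounding matters for small ndv values, where truncation could otherwise drive the estimated distinct count to zero even though rows survive the filter.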
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108307672 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -381,7 +461,22 @@ class FilterEstimationSuite extends StatsEstimationTestBase { sizeInBytes = getOutputSize(filter.output, expectedRowCount, expectedAttributeMap), rowCount = Some(expectedRowCount), attributeStats = expectedAttributeMap) - assert(filter.stats(conf) == expectedStats) + + val filterStats = filter.stats(conf) + assert(filterStats.sizeInBytes == expectedStats.sizeInBytes) + assert(filterStats.rowCount == expectedStats.rowCount) + val rowCountValue = filterStats.rowCount.getOrElse(0) + // check the output column stats if the row count is > 0. + // When row count is 0, the output is set to empty. + if (rowCountValue != 0) { +// Need to check attributeStats one by one because we may have multiple output columns. +// Due to update operation, the output columns may be in different order. +expectedColStats.foreach { kv => + val filterColumnStat = filterStats.attributeStats.get(kv._1).get + assert(filterColumnStat == kv._2) +} + } + // assert(filter.stats(conf) == expectedStats) --- End diff -- My bad. I should remove the line that has been commented out. This line is replaced by the following code: if (rowCountValue != 0) { // Need to check attributeStats one by one because we may have multiple output columns. // Due to update operation, the output columns may be in different order. expectedColStats.foreach { kv => val filterColumnStat = filterStats.attributeStats.get(kv._1).get assert(filterColumnStat == kv._2) }
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17415#discussion_r108246637 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -509,8 +524,131 @@ (same evaluateBinaryForTwoColumns hunk as quoted above, commented at the "case _: EqualTo =>" line) --- End diff -- The left side has a range [minLeft, maxLeft]. The right side has a range [minRight, maxRight]. If the ranges overlap, then we assume that they overlap completely. Without detailed/advanced statistics, this is the best estimate we can get.
[GitHub] spark issue #17415: [SPARK-19408][SQL] filter estimation on two columns of s...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/17415 cc @sameeragarwal @cloud-fan @gatorsmile This Jira is not on Spark 2.2 blocker list. If time permits, we can include it in Spark 2.2. If not, we can wait for a maintenance release. Thanks.
[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...
GitHub user ron8hu opened a pull request: https://github.com/apache/spark/pull/17415 [SPARK-19408][SQL] filter estimation on two columns of same table ## What changes were proposed in this pull request? In SQL queries, we also see predicate expressions involving two columns such as "column-1 (op) column-2" where column-1 and column-2 belong to same table. Note that, if column-1 and column-2 belong to different tables, then it is a join operator's work, NOT a filter operator's work. This PR estimates filter selectivity on two columns of same table. ## How was this patch tested? We added 6 new test cases to test various logical predicates involving two columns of same table. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ron8hu/spark filterTwoColumns Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17415 commit 893066905b690c78a127eae58b908dff1dabf7cf Author: Ron Hu <ron...@huawei.com> Date: 2017-03-24T20:31:35Z filter estimation on two columns of same table
[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r106766281 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -20,19 +20,347 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins +import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, PhysicalOperation} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.CatalystConf + +/** + * Encapsulates star-schema join detection. + */ +case class StarSchemaDetection(conf: CatalystConf) extends PredicateHelper { + + /** + * Star schema consists of one or more fact tables referencing a number of dimension + * tables. In general, star-schema joins are detected using the following conditions: + * 1. Informational RI constraints (reliable detection) + *+ Dimension contains a primary key that is being joined to the fact table. + *+ Fact table contains foreign keys referencing multiple dimension tables. + * 2. Cardinality based heuristics + *+ Usually, the table with the highest cardinality is the fact table. + *+ Table being joined with the most number of tables is the fact table. + * + * To detect star joins, the algorithm uses a combination of the above two conditions. + * The fact table is chosen based on the cardinality heuristics, and the dimension + * tables are chosen based on the RI constraints. A star join will consist of the largest + * fact table joined with the dimension tables on their primary keys. To detect that a + * column is a primary key, the algorithm uses table and column statistics. 
+ * + * Since Catalyst only supports left-deep tree plans, the algorithm currently returns only + * the star join with the largest fact table. Choosing the largest fact table on the + * driving arm to avoid large inners is in general a good heuristic. This restriction can + * be lifted with support for bushy tree plans. + * + * The highlights of the algorithm are the following: + * + * Given a set of joined tables/plans, the algorithm first verifies if they are eligible + * for star join detection. An eligible plan is a base table access with valid statistics. + * A base table access represents Project or Filter operators above a LeafNode. Conservatively, + * the algorithm only considers base table access as part of a star join since they provide + * reliable statistics. + * + * If some of the plans are not base table access, or statistics are not available, the algorithm + * returns an empty star join plan since, in the absence of statistics, it cannot make + * good planning decisions. Otherwise, the algorithm finds the table with the largest cardinality + * (number of rows), which is assumed to be a fact table. + * + * Next, it computes the set of dimension tables for the current fact table. A dimension table + * is assumed to be in a RI relationship with a fact table. To infer column uniqueness, + * the algorithm compares the number of distinct values with the total number of rows in the + * table. If their relative difference is within certain limits (i.e. ndvMaxError * 2, adjusted + * based on 1TB TPC-DS data), the column is assumed to be unique. 
+ */ + def findStarJoins( + input: Seq[LogicalPlan], + conditions: Seq[Expression]): Seq[Seq[LogicalPlan]] = { + +val emptyStarJoinPlan = Seq.empty[Seq[LogicalPlan]] + +if (!conf.starSchemaDetection || input.size < 2) { + emptyStarJoinPlan +} else { --- End diff -- } else if (conf.joinReorderEnabled) { emptyStarJoinPlan } else { When both configuration parameters joinReorderEnabled and starSchemaDetection are true, we want to avoid performing join reorder twice. There is no added value to perform join reorder twice.
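The column-uniqueness test described in the doc comment above (compare a column's distinct count against the table's row count, within roughly `ndvMaxError * 2`) can be sketched as follows. This is a hedged illustration: the helper name and the 0.05 default are assumptions standing in for the HyperLogLog++ error setting mentioned in the text, not the exact code in StarSchemaDetection:

```scala
// Treat a column as a likely primary key when its distinct count is within a
// small relative error of the table's row count. The 0.05 default stands in
// for the ndv estimation error bound; Spark's actual check may differ.
def isLikelyUnique(distinctCount: BigInt, rowCount: BigInt,
                   ndvMaxError: Double = 0.05): Boolean =
  rowCount > 0 && {
    val relDiff = (BigDecimal(rowCount - distinctCount).abs / BigDecimal(rowCount)).toDouble
    relDiff <= ndvMaxError * 2
  }
```

The doubled error bound is the tolerance the comment says was adjusted empirically against 1TB TPC-DS data: approximate distinct counts routinely undershoot the true cardinality, so an exact `distinctCount == rowCount` test would reject genuine keys.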
[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r106285527 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) +// Find the star schema joins. Currently, it returns the star join with the largest +// fact table. In the future, it can return more than one star join (e.g. F1-D1-D2 +// and F2-D3-D4). +val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) --- End diff -- @ioana-delaney Thanks for the pointer. We had a good exchange to clarify our points. We definitely need a joint community team effort to improve Spark's cost based optimizer in the future.
[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r106271250 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -51,6 +51,11 @@ (same reorder hunk as quoted above, at the StarSchemaDetection.findStarJoins line) --- End diff -- @ioana-delaney Thanks for sharing your thought. This is helpful. As of today, Spark's join reorder computation is still at its early stage. Your comments above can serve as a guideline for future enhancement. Can you point to a paper/article for your optimization idea? Thanks.
[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r106237955 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -51,6 +51,11 @@ (same reorder hunk as quoted above, at the StarSchemaDetection.findStarJoins line) --- End diff -- @gatorsmile @ioana-delaney Thank you for your replies. My main point is to identify the deficiencies of the DP algorithm so that we can improve it. Since you are familiar with the DP algorithm, can you help us identify its deficiencies/limitations so that we can improve it? One deficiency the DP algorithm has is the explosion of the search space when there is a large number of join relations, such as >30. In CostBasedJoinReorder, we do not optimize the join order if the number of join relations is greater than the threshold value joinReorderDPThreshold. I think this is a place where the star-join reorder algorithm can help, because it defaults to left-deep trees, which form a smaller search space. What do you think?
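The search-space explosion mentioned above is easy to quantify: a Selinger-style DP memoizes plans for subsets of the joined relations, so its table alone can hold up to 2^n - 1 non-empty entries. A back-of-envelope sketch (standard counting, not Spark code; `joinReorderDPThreshold` is the config name from the discussion):

```scala
// Upper bound on the number of DP table entries for n join relations:
// one entry per non-empty subset, i.e. 2^n - 1.
def dpSubsets(n: Int): BigInt = (BigInt(1) << n) - 1
```

`dpSubsets(12)` is 4095, while `dpSubsets(30)` already exceeds a billion, which is why capping the DP at a threshold and falling back to a cheaper heuristic (such as a left-deep star-join ordering) is attractive.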
[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/15363#discussion_r106084556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] wi def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = { val (items, conditions) = extractInnerJoins(plan) +// Find the star schema joins. Currently, it returns the star join with the largest +// fact table. In the future, it can return more than one star join (e.g. F1-D1-D2 +// and F2-D3-D4). +val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) --- End diff -- As discussed earlier, we only need to perform the join reorder algorithm once. CostBasedJoinReorder implements the dynamic programming algorithm published in the classic paper "Access Path Selection in a Relational Database Management System" by Patricia Selinger et al. The same algorithm is used in PostgreSQL. To my understanding, it is a generic algorithm that works on both star and non-star schemas. For example, it is capable of generating a bushy tree if that is optimal; that is, it is not limited to left-deep trees. I suggest that we identify the strengths of the star join reorder algorithm so that it can help address the deficiencies of the dynamic programming algorithm, and then add the necessary code to address those deficiencies. There is no need to add code that does the same job twice without added value. Perhaps running the TPC-DS benchmark queries and inspecting the generated query plans can help us identify the strengths and weaknesses of both algorithms.
[GitHub] spark pull request #17138: [SPARK-17080] [SQL] join reorder
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17138#discussion_r104760579 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala --- @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, PredicateHelper} +import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule + + +/** + * Cost-based join reorder. + * We may have several join reorder algorithms in the future. This class is the entry of these + * algorithms, and chooses which one to use. 
+ */ +case class CostBasedJoinReorder(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = { +if (!conf.cboEnabled || !conf.joinReorderEnabled) { + plan +} else { + plan transform { +case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) if !j.ordered => + reorder(j, p.outputSet) +case j @ Join(_, _, _: InnerLike, _) if !j.ordered => + reorder(j, j.outputSet) + } +} + } + + def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = { +val (items, conditions) = extractInnerJoins(plan) +val result = + if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && conditions.nonEmpty) { +JoinReorderDP(conf, items, conditions, output).search().getOrElse(plan) + } else { +plan + } +// Set all inside joins ordered. +setOrdered(result) +result + } + + /** + * Extract inner joinable items and join conditions. + * This method works for bushy trees and left/right deep trees. + */ + def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = plan match { +case j @ Join(left, right, _: InnerLike, cond) => + val (leftPlans, leftConditions) = extractInnerJoins(left) + val (rightPlans, rightConditions) = extractInnerJoins(right) + (leftPlans ++ rightPlans, cond.map(splitConjunctivePredicates).getOrElse(Nil).toSet ++ +leftConditions ++ rightConditions) +case Project(_, j @ Join(left, right, _: InnerLike, cond)) => + val (leftPlans, leftConditions) = extractInnerJoins(left) + val (rightPlans, rightConditions) = extractInnerJoins(right) + (leftPlans ++ rightPlans, cond.map(splitConjunctivePredicates).getOrElse(Nil).toSet ++ +leftConditions ++ rightConditions) +case _ => + (Seq(plan), Set()) + } + + def setOrdered(plan: LogicalPlan): Unit = plan match { +case j @ Join(left, right, _: InnerLike, cond) => + j.ordered = true + setOrdered(left) + setOrdered(right) +case Project(_, j @ Join(left, right, _: InnerLike, cond)) => + j.ordered = true + setOrdered(left) + setOrdered(right) 
+case _ => + } +} + +/** + * Reorder the joins using a dynamic programming algorithm: --- End diff -- @hvanhovell We had a meeting with Sameer and Wenchen on 2/21/2017. We did not meet you, as you were not in the San Francisco office that day. In the meeting, we agreed to have a good join reorder algorithm implemented in CBO's first release as long as the algorithm has a good reference base. We can improve the join reorder algorithm later in CBO's second release. After all, we are running short of time for Spark 2.2. We decided to use the algorithm in Selinger's paper. For CBO's first release, w
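The extractInnerJoins method quoted in this diff can be modeled with a toy plan ADT. This is a simplified sketch: Plan, Relation, and InnerJoin are hypothetical stand-ins for Spark's LogicalPlan classes, and join conditions are modeled as plain strings rather than Expression trees.

```scala
// Toy model of flattening a tree of inner joins into its joined items plus
// the union of all join conditions, as extractInnerJoins does for
// LogicalPlan trees (Project nodes omitted for brevity).
sealed trait Plan
case class Relation(name: String) extends Plan
case class InnerJoin(left: Plan, right: Plan, conds: Set[String]) extends Plan

def extractInnerJoins(plan: Plan): (Seq[Plan], Set[String]) = plan match {
  case InnerJoin(l, r, conds) =>
    // Recurse into both sides so bushy and left/right-deep trees flatten alike.
    val (leftItems, leftConds) = extractInnerJoins(l)
    val (rightItems, rightConds) = extractInnerJoins(r)
    (leftItems ++ rightItems, conds ++ leftConds ++ rightConds)
  case other =>
    // A non-join node is itself a reorderable item with no conditions.
    (Seq(other), Set.empty)
}
```

Flattening ((A ⋈ B) ⋈ C) this way yields the three base relations and both join predicates, which is exactly the input shape the DP search needs.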
[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17148#discussion_r104557770 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -90,32 +95,43 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = { condition match { case And(cond1, cond2) => -// For ease of debugging, we compute percent1 and percent2 in 2 statements. -val percent1 = calculateFilterSelectivity(cond1, update) -val percent2 = calculateFilterSelectivity(cond2, update) -(percent1, percent2) match { - case (Some(p1), Some(p2)) => Some(p1 * p2) - case (Some(p1), None) => Some(p1) - case (None, Some(p2)) => Some(p2) - case (None, None) => None -} +val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(1.0) +val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(1.0) +Some(percent1 * percent2) case Or(cond1, cond2) => -// For ease of debugging, we compute percent1 and percent2 in 2 statements. -val percent1 = calculateFilterSelectivity(cond1, update = false) -val percent2 = calculateFilterSelectivity(cond2, update = false) -(percent1, percent2) match { - case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2))) - case (Some(p1), None) => Some(1.0) - case (None, Some(p2)) => Some(1.0) - case (None, None) => None +val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(1.0) +val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0) +Some(percent1 + percent2 - (percent1 * percent2)) + + // For AND and OR conditions, we will estimate conservatively if one of two + // components is not supported, e.g. suppose c1 is not supported, + // then p(And(c1, c2)) = p(c2), and p(Or(c1, c2)) = 1.0. + // But once they are wrapped in NOT condition, then after 1 - p, it becomes + // under-estimation. 
So in these cases, we consider them as unsupported. + case Not(And(cond1, cond2)) => --- End diff -- The current code is fine. If we just called calculateSingleCondition for "case Not(And(cond1, cond2))", it would be too restrictive. The current code computes the selectivity only when we can get the selectivity of both conditions; if we cannot get it for either one or both, we just return None. I think it is a clean solution.
[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17148#discussion_r104557294 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -90,32 +95,43 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = { condition match { case And(cond1, cond2) => -// For ease of debugging, we compute percent1 and percent2 in 2 statements. -val percent1 = calculateFilterSelectivity(cond1, update) -val percent2 = calculateFilterSelectivity(cond2, update) -(percent1, percent2) match { - case (Some(p1), Some(p2)) => Some(p1 * p2) - case (Some(p1), None) => Some(p1) - case (None, Some(p2)) => Some(p2) - case (None, None) => None -} +val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(1.0) +val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(1.0) +Some(percent1 * percent2) case Or(cond1, cond2) => -// For ease of debugging, we compute percent1 and percent2 in 2 statements. -val percent1 = calculateFilterSelectivity(cond1, update = false) -val percent2 = calculateFilterSelectivity(cond2, update = false) -(percent1, percent2) match { - case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2))) - case (Some(p1), None) => Some(1.0) - case (None, Some(p2)) => Some(1.0) - case (None, None) => None +val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(1.0) +val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(1.0) +Some(percent1 + percent2 - (percent1 * percent2)) + + // For AND and OR conditions, we will estimate conservatively if one of two + // components is not supported, e.g. suppose c1 is not supported, + // then p(And(c1, c2)) = p(c2), and p(Or(c1, c2)) = 1.0. + // But once they are wrapped in NOT condition, then after 1 - p, it becomes + // under-estimation. 
So in these cases, we consider them as unsupported. + case Not(And(cond1, cond2)) => +val p1 = calculateFilterSelectivity(cond1, update = false) +val p2 = calculateFilterSelectivity(cond2, update = false) +if (p1.isDefined && p2.isDefined) { + Some(1 - p1.get * p2.get) +} else { + None } - case Not(cond) => calculateFilterSelectivity(cond, update = false) match { -case Some(percent) => Some(1.0 - percent) -// for not-supported condition, set filter selectivity to a conservative estimate 100% -case None => None - } + case Not(Or(cond1, cond2)) => +val p1 = calculateFilterSelectivity(cond1, update = false) +val p2 = calculateFilterSelectivity(cond2, update = false) +if (p1.isDefined && p2.isDefined) { + Some(1 - (p1.get + p2.get - (p1.get * p2.get))) +} else { + None --- End diff -- This is good. We compute selectivity for "Not(Or(cond1, cond2))" only when we can get selectivity for both conditions. If we cannot get selectivity for either one or both, then we just return None. It is a clean solution.
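The combination rules discussed in this thread can be sketched on a toy condition tree (the Cond/Leaf types below are hypothetical, not Spark classes): AND/OR treat an unsupported child conservatively as selectivity 1.0, while a negated AND/OR yields an estimate only when both children are supported, because 1 - p would turn an over-estimate into an under-estimate.

```scala
// Toy model of the selectivity combination rules in calculateFilterSelectivity.
sealed trait Cond
case class Leaf(sel: Option[Double]) extends Cond  // None = unsupported predicate
case class And(l: Cond, r: Cond) extends Cond
case class Or(l: Cond, r: Cond) extends Cond
case class Not(c: Cond) extends Cond

def selectivity(c: Cond): Option[Double] = c match {
  // AND/OR estimate conservatively: an unsupported child contributes 1.0.
  case And(l, r) =>
    Some(selectivity(l).getOrElse(1.0) * selectivity(r).getOrElse(1.0))
  case Or(l, r) =>
    val p1 = selectivity(l).getOrElse(1.0)
    val p2 = selectivity(r).getOrElse(1.0)
    Some(p1 + p2 - p1 * p2)
  // A negated AND/OR is supported only when BOTH children are supported,
  // since 1 - p flips a conservative over-estimate into an under-estimate.
  case Not(And(l, r)) =>
    for (p1 <- selectivity(l); p2 <- selectivity(r)) yield 1 - p1 * p2
  case Not(Or(l, r)) =>
    for (p1 <- selectivity(l); p2 <- selectivity(r)) yield 1 - (p1 + p2 - p1 * p2)
  case Not(inner) => selectivity(inner).map(1.0 - _)
  case Leaf(s) => s
}
```

For example, AND with one unsupported child still produces an estimate, but wrapping the same AND in NOT produces None, matching the "consider them as unsupported" rule above.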
[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17148#discussion_r104527462 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -254,133 +270,118 @@ class FilterEstimationSuite extends StatsEstimationTestBase { val d20170104 = Date.valueOf("2017-01-04") val d20170105 = Date.valueOf("2017-01-05") validateEstimatedStats( - arDate, - Filter(In(arDate, Seq(Literal(d20170103), Literal(d20170104), Literal(d20170105))), -childStatsTestPlan(Seq(arDate), 10L)), - ColumnStat(distinctCount = 3, min = Some(d20170103), max = Some(d20170105), -nullCount = 0, avgLen = 4, maxLen = 4), - 3) + Filter(In(attrDate, Seq(Literal(d20170103), Literal(d20170104), Literal(d20170105))), +childStatsTestPlan(Seq(attrDate), 10L)), + Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(d20170103), max = Some(d20170105), +nullCount = 0, avgLen = 4, maxLen = 4)), + expectedRowCount = 3) } test("cdecimal = 0.40") { val dec_0_40 = new java.math.BigDecimal("0.40") validateEstimatedStats( - arDecimal, - Filter(EqualTo(arDecimal, Literal(dec_0_40)), -childStatsTestPlan(Seq(arDecimal), 4L)), - ColumnStat(distinctCount = 1, min = Some(dec_0_40), max = Some(dec_0_40), -nullCount = 0, avgLen = 8, maxLen = 8), - 1) + Filter(EqualTo(attrDecimal, Literal(dec_0_40)), +childStatsTestPlan(Seq(attrDecimal), 4L)), + Seq(attrDecimal -> ColumnStat(distinctCount = 1, min = Some(dec_0_40), max = Some(dec_0_40), +nullCount = 0, avgLen = 8, maxLen = 8)), + expectedRowCount = 1) } test("cdecimal < 0.60 ") { val dec_0_60 = new java.math.BigDecimal("0.60") validateEstimatedStats( - arDecimal, - Filter(LessThan(arDecimal, Literal(dec_0_60)), -childStatsTestPlan(Seq(arDecimal), 4L)), - ColumnStat(distinctCount = 3, min = Some(decMin), max = Some(dec_0_60), -nullCount = 0, avgLen = 8, maxLen = 8), - 3) + Filter(LessThan(attrDecimal, Literal(dec_0_60)), +childStatsTestPlan(Seq(attrDecimal), 4L)), + 
Seq(attrDecimal -> ColumnStat(distinctCount = 3, min = Some(decMin), max = Some(dec_0_60), +nullCount = 0, avgLen = 8, maxLen = 8)), + expectedRowCount = 3) } test("cdouble < 3.0") { validateEstimatedStats( - arDouble, - Filter(LessThan(arDouble, Literal(3.0)), childStatsTestPlan(Seq(arDouble), 10L)), - ColumnStat(distinctCount = 2, min = Some(1.0), max = Some(3.0), -nullCount = 0, avgLen = 8, maxLen = 8), - 3) + Filter(LessThan(attrDouble, Literal(3.0)), childStatsTestPlan(Seq(attrDouble), 10L)), + Seq(attrDouble -> ColumnStat(distinctCount = 2, min = Some(1.0), max = Some(3.0), +nullCount = 0, avgLen = 8, maxLen = 8)), + expectedRowCount = 3) } test("cstring = 'A2'") { validateEstimatedStats( - arString, - Filter(EqualTo(arString, Literal("A2")), childStatsTestPlan(Seq(arString), 10L)), - ColumnStat(distinctCount = 1, min = None, max = None, -nullCount = 0, avgLen = 2, maxLen = 2), - 1) + Filter(EqualTo(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)), + Seq(attrString -> ColumnStat(distinctCount = 1, min = None, max = None, +nullCount = 0, avgLen = 2, maxLen = 2)), + expectedRowCount = 1) } - // There is no min/max statistics for String type. We estimate 10 rows returned. - test("cstring < 'A2'") { + test("cstring < 'A2' - unsupported condition") { validateEstimatedStats( - arString, - Filter(LessThan(arString, Literal("A2")), childStatsTestPlan(Seq(arString), 10L)), - ColumnStat(distinctCount = 10, min = None, max = None, -nullCount = 0, avgLen = 2, maxLen = 2), - 10) + Filter(LessThan(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)), + Seq(attrString -> ColumnStat(distinctCount = 10, min = None, max = None, +nullCount = 0, avgLen = 2, maxLen = 2)), + expectedRowCount = 10) } - // This is a corner test case. We want to test if we can handle the case when the number of - // valid values in IN clause is greater than the number of distinct values for a
[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17148#discussion_r104267627 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -157,7 +157,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase { Filter(IsNull(arInt), childStatsTestPlan(Seq(arInt), 10L)), ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 0, avgLen = 4, maxLen = 4), - 0) + rowCount = 0) } test("cint IS NOT NULL") { --- End diff -- We may want to add a nested NOT test case.
[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17148#discussion_r104267295 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -101,21 +101,23 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo } case Or(cond1, cond2) => -// For ease of debugging, we compute percent1 and percent2 in 2 statements. val percent1 = calculateFilterSelectivity(cond1, update = false) val percent2 = calculateFilterSelectivity(cond2, update = false) (percent1, percent2) match { case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * p2))) - case (Some(p1), None) => Some(1.0) - case (None, Some(p2)) => Some(1.0) - case (None, None) => None + case _ => None } - case Not(cond) => calculateFilterSelectivity(cond, update = false) match { -case Some(percent) => Some(1.0 - percent) -// for not-supported condition, set filter selectivity to a conservative estimate 100% -case None => None - } + case Not(cond) => +if (cond.isInstanceOf[And] || cond.isInstanceOf[Or]) { + // Don't support compound Not expression. --- End diff -- I thought we agreed not to support the nested NOT condition. First, we need to clarify what a nested NOT is. Here you allow ((NOT cond1) && (NOT cond2)), but you disallow NOT(cond1 && cond2). Is this right? How about the case NOT(cond1 && (NOT cond2))? The third case is a truly nested NOT.
[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17065#discussion_r103090517 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -361,57 +343,52 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo */ def evaluateInSet( - attrRef: AttributeReference, + attr: Attribute, hSet: Set[Any], - update: Boolean) -: Option[Double] = { -if (!mutableColStats.contains(attrRef.exprId)) { - logDebug("[CBO] No statistics for " + attrRef) + update: Boolean): Option[Double] = { +if (!colStatsMap.contains(attr)) { + logDebug("[CBO] No statistics for " + attr) return None } -val aColStat = mutableColStats(attrRef.exprId) -val ndv = aColStat.distinctCount -val aType = attrRef.dataType -var newNdv: Long = 0 +val colStat = colStatsMap(attr) +val ndv = colStat.distinctCount +val dataType = attr.dataType +var newNdv = ndv // use [min, max] to filter the original hSet -aType match { - case _: NumericType | DateType | TimestampType => -val statsRange = - Range(aColStat.min, aColStat.max, aType).asInstanceOf[NumericRange] - -// To facilitate finding the min and max values in hSet, we map hSet values to BigDecimal. -// Using hSetBigdec, we can find the min and max values quickly in the ordered hSetBigdec. -val hSetBigdec = hSet.map(e => BigDecimal(e.toString)) -val validQuerySet = hSetBigdec.filter(e => e >= statsRange.min && e <= statsRange.max) -// We use hSetBigdecToAnyMap to help us find the original hSet value. 
-val hSetBigdecToAnyMap: Map[BigDecimal, Any] = - hSet.map(e => BigDecimal(e.toString) -> e).toMap +dataType match { + case _: NumericType | BooleanType | DateType | TimestampType => +val statsRange = Range(colStat.min, colStat.max, dataType).asInstanceOf[NumericRange] +val validQuerySet = hSet.filter { v => + v != null && statsRange.contains(Literal(v, dataType)) --- End diff -- nit: for better readability: (v != null) && statsRange.contains(Literal(v, dataType))
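The IN-list estimation idea in evaluateInSet can be sketched as follows, simplified to a numeric column (inSetSelectivity is a hypothetical helper, not Spark's API): drop IN-list values that fall outside the column's [min, max] range, then cap the matched distinct count by the column's NDV and take the ratio as the selectivity.

```scala
// Hedged sketch of IN-list selectivity estimation against column statistics.
// Assumes a numeric column whose stats are (min, max, ndv).
def inSetSelectivity(hSet: Set[Double], min: Double, max: Double, ndv: Long): Option[Double] = {
  require(ndv > 0, "NDV must be positive")
  // Use [min, max] to filter the original IN-list, as evaluateInSet does.
  val validQuerySet = hSet.filter(v => v >= min && v <= max)
  if (validQuerySet.isEmpty) {
    Some(0.0)  // no IN-list value can match any row
  } else {
    // The matched distinct count can be at most the column's NDV, which
    // handles IN-lists with more valid values than distinct column values.
    val newNdv = math.min(validQuerySet.size.toLong, ndv)
    Some(newNdv.toDouble / ndv.toDouble)
  }
}
```

So `c IN (1, 5, 99)` against a column with range [1, 10] and NDV 10 keeps only {1, 5} and estimates 2/10 of the rows.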
[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17065#discussion_r103087483 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -95,15 +84,16 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo * @param condition the compound logical expression * @param update a boolean flag to specify if we need to update ColumnStat of a column * for subsequent conditions - * @return a double value to show the percentage of rows meeting a given condition. + * @return an optional double value to show the percentage of rows meeting a given condition. * It returns None if the condition is not supported. */ def calculateFilterSelectivity(condition: Expression, update: Boolean = true): Option[Double] = { - condition match { case And(cond1, cond2) => -(calculateFilterSelectivity(cond1, update), calculateFilterSelectivity(cond2, update)) -match { +// For ease of debugging, we compute percent1 and percent2 in 2 statements. +val percent1 = calculateFilterSelectivity(cond1, update) +val percent2 = calculateFilterSelectivity(cond2, update) +(percent1, percent2) match { case (Some(p1), Some(p2)) => Some(p1 * p2) case (Some(p1), None) => Some(p1) --- End diff -- This shows that it is difficult to always over-estimate. How about we do not handle the nested NOT?
[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/17065#discussion_r103087345 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -297,6 +278,8 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo Some(DateTimeUtils.toJavaDate(litValue.toString.toInt)) case TimestampType => Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong)) + case _: DecimalType => +Some(litValue.asInstanceOf[Decimal].toJavaBigDecimal) --- End diff -- Agreed. Thanks for fixing it.
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r102133390 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.statsEstimation + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ + +/** + * In this test suite, we test predicates containing the following operators: + * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN + */ +class FilterEstimationSuite extends StatsEstimationTestBase { + + // Suppose our test table has 10 rows and 6 columns. 
+ // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + val arInt = AttributeReference("cint", IntegerType)() + val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Second column cdate has 10 values from 2017-01-01 through 2017-01-10. + val dMin = Date.valueOf("2017-01-01") + val dMax = Date.valueOf("2017-01-10") + val arDate = AttributeReference("cdate", DateType)() + val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Third column ctimestamp has 10 values from "2017-01-01 01:00:00" through + // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours). + val tsMin = Timestamp.valueOf("2017-01-01 01:00:00") + val tsMax = Timestamp.valueOf("2017-01-01 10:00:00") + val arTimestamp = AttributeReference("ctimestamp", TimestampType)() + val childColStatTimestamp = ColumnStat(distinctCount = 10, min = Some(tsMin), max = Some(tsMax), +nullCount = 0, avgLen = 8, maxLen = 8) + + // Fourth column cdecimal has 10 values from 0.20 through 2.00 at increment of 0.2. 
+ val decMin = new java.math.BigDecimal("0.20") + val decMax = new java.math.BigDecimal("2.00") + val arDecimal = AttributeReference("cdecimal", DecimalType(12, 2))() + val childColStatDecimal = ColumnStat(distinctCount = 10, min = Some(decMin), max = Some(decMax), +nullCount = 0, avgLen = 8, maxLen = 8) + + // Fifth column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0 + val arDouble = AttributeReference("cdouble", DoubleType)() + val childColStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = Some(10.0), +nullCount = 0, avgLen = 8, maxLen = 8) + + // Sixth column cstring has 10 String values: + // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9" + val arString = AttributeReference("cstring", StringType)() + val childColStatString = ColumnStat(distinctCount = 10, min = None, max = None, +nullCount = 0, avgLen = 2, maxLen = 2) + + test("cint = 2") { +validateEstimatedStats( + arInt, + Filter(EqualTo(arInt, Literal(2)), childStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 1, min = Some(2), max = Some(2), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(1L) +) + } + + test("cint = 0") { +// This is an out-of-range case since 0 is outside the
[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/16395 @cloud-fan I have updated the code based on your feedback. Please review it again. Thanks.
[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/16395 Hi @cloud-fan, I revised the code using the latest Range class. Thanks for reviewing the code.
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100955392 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import java.sql.{Date, Timestamp} + +import scala.collection.immutable.{HashSet, Map} +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * @param plan a LogicalPlan node that must be an instance of Filter + * @param catalystConf a configuration showing if CBO is enabled + */ +case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging { + + /** + * We use a mutable colStats because we need to update the corresponding ColumnStat + * for a column after we apply a predicate condition. For example, column c has + * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50), + * we need to set the column's [min, max] value to [40, 100] after we evaluate the + * first condition c > 40. We need to set the column's [min, max] value to [40, 50] + * after we evaluate the second condition c <= 50. + */ + private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty + + /** + * Returns an option of Statistics for a Filter logical plan node. + * For a given compound expression condition, this method computes filter selectivity + * (or the percentage of rows meeting the filter condition), which + * is used to compute row count, size in bytes, and the updated statistics after a given + * predicated is applied. + * + * @return Option[Statistics] When there is no statistics collected, it returns None. 
+ */ + def estimate: Option[Statistics] = { +val stats: Statistics = plan.child.stats(catalystConf) +if (stats.rowCount.isEmpty) return None + +// save a mutable copy of colStats so that we can later change it recursively +mutableColStats = mutable.Map(stats.attributeStats.map(kv => (kv._1.exprId, kv._2)).toSeq: _*) + +// estimate selectivity of this filter predicate +val filterSelectivity: Double = calculateConditions(plan.condition) + +// attributeStats has mapping Attribute-to-ColumnStat. +// mutableColStats has mapping ExprId-to-ColumnStat. +// We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat +val expridToAttrMap: Map[ExprId, Attribute] = + stats.attributeStats.map(kv => (kv._1.exprId, kv._1)) +// copy mutableColStats contents to an immutable AttributeMap. +val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] = + mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2) +val newColStats = AttributeMap(mutableAttributeStats.toSeq) + +val filteredRowCount: BigInt = + EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * filterSelectivity) +val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal( +EstimationUtils.getOutputSize(plan.output, filteredRowCount, newColStats) +)) + +Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCount), + attributeStats = newColStats)) + } + + /** + * Returns a percentage of rows meeting a compound condition in Filter node. + * A compound condition is decomposed into multiple single conditions linked with AND, O
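The tail of `estimate` turns the selectivity fraction into a row count by rounding up. A minimal sketch of that arithmetic — `ceil` here is a simplified stand-in for `EstimationUtils.ceil`, not the actual Spark helper:

```scala
// Simplified stand-in for EstimationUtils.ceil: round a BigDecimal up to a BigInt.
def ceil(d: BigDecimal): BigInt =
  d.setScale(0, BigDecimal.RoundingMode.CEILING).toBigInt

// rowCount * selectivity, rounded up, mirroring FilterEstimation.estimate.
def filteredRowCount(childRowCount: BigInt, selectivity: Double): BigInt =
  ceil(BigDecimal(childRowCount) * BigDecimal(selectivity))

println(filteredRowCount(BigInt(10), 0.25)) // 2.5 rows round up to 3
```

Rounding up rather than truncating keeps a non-empty estimate for any predicate with non-zero selectivity.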
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100954234 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- (same diff context as in the comment above)
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100952454 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- (same diff context as in the comment above)
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100941831 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- (same diff context as in the comment above) + def estimate: Option[Statistics] = { +val stats: Statistics = plan.child.stats(catalystConf) --- End diff -- Here we mean the Statistics of the current node. We first copy the child node's statistics and then modify the copy to become the output statistics of the current Filter node. I can add comments to make this clear.
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r100940942 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- (same diff context as in the comment above) + private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty --- End diff -- mutableColStats is a class member variable. When we instantiate a FilterEstimation object, we set mutableColStats to mutable.Map.empty; afterwards, the estimate method re-assigns it a new value. Since the reference itself changes, it must be declared as a var.
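The bookkeeping this comment describes — start with an empty mutable map, then narrow a column's [min, max] as each conjunct of a predicate such as (c > 40 AND c <= 50) is applied — can be sketched with a plain mutable map. `SimpleColStat` is a hypothetical, trimmed-down stand-in for Spark's ColumnStat:

```scala
import scala.collection.mutable

// Hypothetical stand-in for ColumnStat; only the [min, max] range matters here.
case class SimpleColStat(min: Int, max: Int)

// Column c starts with range [0, 100], as in the example above.
val colStats = mutable.Map("c" -> SimpleColStat(0, 100))

// Evaluate c > 40: raise the lower bound to 40.
colStats("c") = colStats("c").copy(min = 40)
// Evaluate c <= 50: lower the upper bound to 50.
colStats("c") = colStats("c").copy(max = 50)

println(colStats("c")) // range narrowed to [40, 50]
```

Each conjunct only ever tightens the range, which is why the map can be updated in place as the condition tree is walked.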
[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16696#discussion_r99474494 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala --- @@ -18,12 +18,41 @@ package org.apache.spark.sql.catalyst.statsEstimation import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Literal} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.IntegerType -class StatsConfSuite extends StatsEstimationTestBase { +class StatsEstimationSuite extends StatsEstimationTestBase { + val (ar, colStat) = (attr("key"), ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4)) + + val plan = StatsTestPlan( +outputList = Seq(ar), +attributeStats = AttributeMap(Seq(ar -> colStat)), +rowCount = 10, +size = Some(10 * (8 + 4))) + + test("limit estimation") { +val localLimit = LocalLimit(Literal(2), plan) +val globalLimit = GlobalLimit(Literal(2), plan) +// LocalLimit and GlobalLimit share the same stats estimation logic. +val expected = Statistics(sizeInBytes = 24, rowCount = Some(2)) +checkStats(localLimit, expected) +checkStats(globalLimit, expected) + } + + test("sample estimation") { +val sample = Sample(0.0, 0.5, withReplacement = false, (math.random * 1000).toLong, plan)() +checkStats(sample, expected = Statistics(sizeInBytes = 60, rowCount = Some(5))) + +// Test if Sample's child doesn't have rowCount in stats +val stats2 = Statistics(sizeInBytes = 120) --- End diff -- For limit estimation test cases, we may add a test with limit number greater than a child node's row count. 
This test can show whether we properly select the smaller value between the limit number and the child node's row count.
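The rule the suggested test would exercise is simply taking the smaller of the limit value and the child's row count. A sketch of that rule, not the actual computeStats code:

```scala
// Sketch: a Limit node can never return more rows than its child produces,
// so the estimated row count is min(limit, child row count).
def limitRowCount(limit: BigInt, childRowCount: BigInt): BigInt =
  limit.min(childRowCount)

println(limitRowCount(BigInt(2), BigInt(10)))  // limit below the row count: keep 2
println(limitRowCount(BigInt(20), BigInt(10))) // limit above the row count: clamp to 10
```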
[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16696#discussion_r98776491 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -727,37 +728,18 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN } override def computeStats(conf: CatalystConf): Statistics = { val limit = limitExpr.eval().asInstanceOf[Int] -val sizeInBytes = if (limit == 0) { - // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero - // (product of children). - 1 -} else { - (limit: Long) * output.map(a => a.dataType.defaultSize).sum -} -child.stats(conf).copy(sizeInBytes = sizeInBytes) +val childStats = child.stats(conf) +// Don't propagate column stats, because we don't know the distribution after a limit operation +Statistics( + sizeInBytes = EstimationUtils.getOutputSize(output, limit, childStats.attributeStats), --- End diff -- Agreed. We can pick the smaller value between the child node's row count and the limit number.
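EstimationUtils.getOutputSize builds the size estimate from the row count and per-column statistics. A simplified sketch, assuming each row costs a fixed 8-byte overhead plus the sum of the columns' average lengths — the same arithmetic as the `10 * (8 + 4)` sizing in the test plan quoted earlier in this thread:

```scala
// Simplified stand-in for EstimationUtils.getOutputSize:
// sizeInBytes ~ rowCount * (8-byte row overhead + sum of avg column lengths).
def outputSize(rowCount: BigInt, avgLens: Seq[Int]): BigInt =
  rowCount * (8 + avgLens.sum)

println(outputSize(BigInt(2), Seq(4))) // LIMIT 2 over one int column (avgLen 4)
```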
[GitHub] spark issue #16594: [SPARK-17078] [SQL] Show stats when explain
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/16594 To show a very large Long number, there is no need to print out every digit. We can use exponential notation. For example, the number 120,000,000,005,123 can be printed as 1.2*10**14, where 10**14 means 10 to the power of 14.
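The suggestion can be sketched with plain string manipulation. This version truncates rather than rounds the mantissa and assumes at least two digits; it illustrates the idea only and is not the formatting Spark ultimately adopted:

```scala
// Sketch: render a large count as d.d*10**e instead of printing every digit.
// Truncates (does not round) the mantissa; assumes n has at least two digits.
def toExponent(n: BigInt): String = {
  val digits = n.toString
  val exponent = digits.length - 1
  s"${digits.head}.${digits.charAt(1)}*10**$exponent"
}

println(toExponent(BigInt("120000000005123"))) // 1.2*10**14
```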
[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/16395 @wzhfy For the predicate condition d_date >= '2000-01-27', we do not support it because Spark SQL casts the d_date column to String before comparison. For the predicate condition d_date >= cast('2000-01-27' AS DATE), CBO supports it by comparing values in date type.
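Once the literal is a real DATE value, the comparison reduces to comparing days-since-epoch integers, which is how Spark represents DateType internally. A self-contained sketch using java.time (Spark itself goes through DateTimeUtils):

```scala
import java.time.LocalDate

// Sketch: a DATE literal becomes a days-since-epoch integer, so a range
// predicate on a date column can be estimated the same way as a numeric one.
def daysSinceEpoch(d: LocalDate): Long = d.toEpochDay

val literal = LocalDate.parse("2000-01-27")
// d_date >= cast('2000-01-27' AS DATE) compares this number against the
// column's min/max, also expressed in days since epoch.
println(daysSinceEpoch(literal))
```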
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96718076 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.statsEstimation + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{DateType, IntegerType, LongType, TimestampType} + +/** + * In this test suite, we test predicates containing the following operators: + * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN + */ + +class FilterEstimationSuite extends StatsEstimationTestBase { + + // Suppose our test table has 10 rows and 3 columns. 
+ // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + val arInt = AttributeReference("cint", IntegerType)() + val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Second column cdate has values, from 2017-01-01 through 2017-01-10 for 10 values. + val dMin = Date.valueOf("2017-01-01") + val dMax = Date.valueOf("2017-01-10") + val arDate = AttributeReference("cdate", DateType)() + val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Third column ctimestamp has values from "2017-01-01 01:00:00" through + // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours). + val tsMin = Timestamp.valueOf("2017-01-01 01:00:00") + val tsMax = Timestamp.valueOf("2017-01-01 10:00:00") + val arTimestamp = AttributeReference("ctimestamp", TimestampType)() + val childColStatTimestamp = ColumnStat(distinctCount = 10, min = Some(tsMin), max = Some(tsMax), +nullCount = 0, avgLen = 8, maxLen = 8) + + test("cint = 2") { +validateEstimatedStats( + arInt, + Filter(EqualTo(arInt, Literal(2)), childStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 1, min = Some(2), max = Some(2), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(1L) +) + } + + test("cint = 0") { +// This is an out-of-range case since 0 is outside the range [min, max] +validateEstimatedStats( + arInt, + Filter(EqualTo(arInt, Literal(0)), childStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(0L) +) + } + + test("cint < 3") { +validateEstimatedStats( + arInt, + Filter(LessThan(arInt, Literal(3)), childStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 2, min = Some(1), max = Some(3), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(3L) +) + } + + test("cint < 0") { +// This is a 
corner case since literal 0 is smaller than min. +validateEstimatedStats( + arInt, + Filter(LessThan(arInt, Literal(0)), childStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(0L) +) + } + + test("cint <= 3") { +validateEstimatedStats( + arInt, + Filter(LessThanOrEqual(arInt, Literal(
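The expected counts in these tests follow from simple selectivity formulas: equality uses 1/ndv when the literal falls inside [min, max], and a less-than predicate uses the fraction of [min, max] below the literal, with out-of-range literals collapsing to 0. A sketch of that arithmetic under those assumptions, not the exact FilterEstimation code:

```scala
// Equality selectivity: 1/ndv when the literal is inside [min, max], else 0.
def eqSelectivity(lit: Double, min: Double, max: Double, ndv: Long): Double =
  if (lit < min || lit > max) 0.0 else 1.0 / ndv

// Less-than selectivity: fraction of the [min, max] interval below the literal.
def ltSelectivity(lit: Double, min: Double, max: Double): Double =
  if (lit <= min) 0.0
  else if (lit > max) 1.0
  else (lit - min) / (max - min)

// Selectivity times row count, rounded up.
def rows(rowCount: Long, sel: Double): Long = math.ceil(rowCount * sel).toLong

println(rows(10, eqSelectivity(2, 1, 10, 10))) // "cint = 2" keeps 1 of 10 rows
println(rows(10, ltSelectivity(3, 1, 10)))     // "cint < 3" keeps 3 rows
println(rows(10, eqSelectivity(0, 1, 10, 10))) // "cint = 0" is out of range: 0 rows
```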
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96325991 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import java.sql.{Date, Timestamp} + +import scala.collection.immutable.{HashSet, Map} +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * @param plan a LogicalPlan node that must be an instance of Filter + * @param catalystConf a configuration showing if CBO is enabled + */ +case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Logging { + + /** + * We use a mutable colStats because we need to update the corresponding ColumnStat + * for a column after we apply a predicate condition. 
For example, A column c has + * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50), + * we need to set the column's [min, max] value to [40, 100] after we evaluate the + * first condition c > 40. We need to set the column's [min, max] value to [40, 50] + * after we evaluate the second condition c <= 50. + */ + private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty + + /** + * Returns an option of Statistics for a Filter logical plan node. + * For a given compound expression condition, this method computes filter selectivity + * (or the percentage of rows meeting the filter condition), which + * is used to compute row count, size in bytes, and the updated statistics after a given + * predicated is applied. + * + * @return Option[Statistics] When there is no statistics collected, it returns None. + */ + def estimate: Option[Statistics] = { +val stats: Statistics = plan.child.stats(catalystConf) +if (stats.rowCount.isEmpty) return None + +// save a mutable copy of colStats so that we can later change it recursively +val statsExprIdMap: Map[ExprId, ColumnStat] = + stats.attributeStats.map(kv => (kv._1.exprId, kv._2)) +mutableColStats = mutable.Map.empty ++= statsExprIdMap + +// estimate selectivity of this filter predicate +val filterSelectivity: Double = calculateConditions(plan.condition) + +// attributeStats has mapping Attribute-to-ColumnStat. +// mutableColStats has mapping ExprId-to-ColumnStat. +// We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat +val expridToAttrMap: Map[ExprId, Attribute] = + stats.attributeStats.map(kv => (kv._1.exprId, kv._1)) +// copy mutableColStats contents to an immutable AttributeMap. 
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] = + mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2) +val newColStats = AttributeMap(mutableAttributeStats.toSeq) + +val filteredRowCountValue: BigInt = + EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * filterSelectivity) +val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal( +EstimationUtils.getOutputSize(plan.output, newColStats, filteredRowCountValue) +)) + +Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCountValue), + attributeStats = newColStats)) + } + + /** + * Returns a percentage of rows meeting a compound condition in Filter node.
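The [min, max] narrowing described in the comment above can be traced numerically. A small Python sketch, under the uniform-distribution assumption the estimator relies on (the helper names are illustrative, not Spark's API), of how (c > 40 AND c <= 50) narrows the range and yields a combined selectivity that scales the row count:

```python
from math import ceil

# Uniform-distribution selectivity estimates; names are illustrative.
def greater_than_frac(lit, lo, hi):
    # Fraction of [lo, hi] strictly above lit.
    return (hi - lit) / (hi - lo)

def less_equal_frac(lit, lo, hi):
    # Fraction of [lo, hi] at or below lit.
    return (lit - lo) / (hi - lo)

lo, hi = 0.0, 100.0                       # column c: [min, max] = [0, 100]
s1 = greater_than_frac(40.0, lo, hi)      # 0.6; stats updated: [min, max] -> [40, 100]
lo = 40.0                                 # second condition is evaluated on the narrowed range
s2 = less_equal_frac(50.0, lo, hi)        # 10/60; stats updated: [min, max] -> [40, 50]
selectivity = s1 * s2                     # combined AND selectivity, ~0.1
filtered_rows = ceil(1000 * selectivity)  # row count is scaled and rounded up, like ceil(rowCount * percent)
```

Updating the stats between the two conditions is what makes the second fraction use 40, not 0, as the lower bound.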
[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96324552 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96308583 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.statsEstimation + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{DateType, IntegerType, LongType, TimestampType} + +/** + * In this test suite, we test predicates containing the following operators: + * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN + */ + +class FilterEstimationSuite extends StatsEstimationTestBase { + + // Suppose our test table has 10 rows and 3 columns. 
+ // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 + val arInt = AttributeReference("cint", IntegerType)() + val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Second column cdate has values, from 2017-01-01 through 2017-01-10 for 10 values. + val dMin = Date.valueOf("2017-01-01") + val dMax = Date.valueOf("2017-01-10") + val arDate = AttributeReference("cdate", DateType)() + val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax), +nullCount = 0, avgLen = 4, maxLen = 4) + + // Third column ctimestamp has values from "2017-01-01 01:00:00" through + // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours). + val tsMin = Timestamp.valueOf("2017-01-01 01:00:00") + val tsMax = Timestamp.valueOf("2017-01-01 10:00:00") + val arTimestamp = AttributeReference("ctimestamp", TimestampType)() + val childColStatTimestamp = ColumnStat(distinctCount = 10, min = Some(tsMin), max = Some(tsMax), +nullCount = 0, avgLen = 8, maxLen = 8) + + test("cint = 2") { +// the predicate is "WHERE cint = 2" +validateEstimatedStats( + arInt, + Filter(EqualTo(arInt, Literal(2)), ChildStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 1, min = Some(2), max = Some(2), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(1L) +) + } + + test("cint = 0") { +// the predicate is "WHERE cint = 0" +// This is an out-of-range case since 0 is outside the range [min, max] +validateEstimatedStats( + arInt, + Filter(EqualTo(arInt, Literal(0)), ChildStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(0L) +) + } + + test("cint < 3") { +// the predicate is "WHERE cint < 3" +validateEstimatedStats( + arInt, + Filter(LessThan(arInt, Literal(3)), ChildStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 2, min = 
Some(1), max = Some(3), +nullCount = 0, avgLen = 4, maxLen = 4), + Some(3L) +) + } + + test("cint < 0") { +// the predicate is "WHERE cint < 0" +// This is a corner case since literal 0 is smaller than min. +validateEstimatedStats( + arInt, + Filter(LessThan(arInt, Literal(0)), ChildStatsTestPlan(Seq(arInt))), + ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), +nullCount = 0,
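The expected row counts in these tests follow from simple selectivity formulas over the column's [min, max] range and distinct count. A hedged Python sketch (illustrative, not the suite's code) reproducing the counts for the equality and less-than cases shown above:

```python
from math import ceil

# Test table from the suite: 10 rows; cint has distinctCount=10, min=1, max=10.
ROWS, NDV, MIN, MAX = 10, 10, 1, 10

def equal_selectivity(lit):
    if lit < MIN or lit > MAX:
        return 0.0           # out-of-range literal ("cint = 0") matches nothing
    return 1.0 / NDV         # in-range equality ("cint = 2") -> 1/distinctCount

def less_than_selectivity(lit):
    if lit <= MIN:
        return 0.0           # literal at or below min ("cint < 0") matches nothing
    if lit > MAX:
        return 1.0
    return (lit - MIN) / (MAX - MIN)   # "cint < 3" -> 2/9 of the range

rows_eq_2 = ceil(ROWS * equal_selectivity(2))      # matches Some(1L)
rows_eq_0 = ceil(ROWS * equal_selectivity(0))      # matches Some(0L)
rows_lt_3 = ceil(ROWS * less_than_selectivity(3))  # matches Some(3L)
```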
[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/16395 cc @rxin @wzhfy Have updated code. Please review again. Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96128057 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import java.math.{BigDecimal => JDecimal} +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _} + + +/** Value range of a column. */ +trait Range + +/** For simplicity we use decimal to unify operations of numeric ranges. */ +case class NumericRange(min: JDecimal, max: JDecimal) extends Range + +/** + * This version of Spark does not have min/max for binary/string types, we define their default + * behaviors by this class. + */ +class DefaultRange extends Range + +/** This is for columns with only null values. 
*/ +class NullRange extends Range + +object Range { + def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match { +case StringType | BinaryType => new DefaultRange() +case _ if min.isEmpty || max.isEmpty => new NullRange() +case _ => toNumericRange(min.get, max.get, dataType) + } + + /** + * For simplicity we use decimal to unify operations of numeric types, the two methods below + * are the contract of conversion. + */ + private def toNumericRange(min: Any, max: Any, dataType: DataType): NumericRange = { +dataType match { + case _: NumericType => +NumericRange(new JDecimal(min.toString), new JDecimal(max.toString)) + case BooleanType => +val min1 = if (min.asInstanceOf[Boolean]) 1 else 0 +val max1 = if (max.asInstanceOf[Boolean]) 1 else 0 +NumericRange(new JDecimal(min1), new JDecimal(max1)) + case DateType => +val min1 = DateTimeUtils.fromJavaDate(min.asInstanceOf[Date]) +val max1 = DateTimeUtils.fromJavaDate(max.asInstanceOf[Date]) +NumericRange(new JDecimal(min1), new JDecimal(max1)) + case TimestampType => +val min1 = DateTimeUtils.fromJavaTimestamp(min.asInstanceOf[Timestamp]) +val max1 = DateTimeUtils.fromJavaTimestamp(max.asInstanceOf[Timestamp]) +NumericRange(new JDecimal(min1), new JDecimal(max1)) + case _ => +throw new AnalysisException(s"Type $dataType is not castable to numeric in estimation.") --- End diff -- OK. removed this case.
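The conversion contract in this diff, booleans, dates, and timestamps all mapped onto one numeric axis so range arithmetic works uniformly, can be mimicked in Python; the helper below is an illustrative sketch of the same idea, not the Scala implementation:

```python
from datetime import date, datetime
from decimal import Decimal

EPOCH = date(1970, 1, 1)

def to_numeric(value):
    # Map supported values onto one numeric axis (cf. toNumericRange in the diff).
    if isinstance(value, bool):
        return Decimal(1 if value else 0)            # booleans become 0/1
    if isinstance(value, datetime):                  # check before date: datetime IS a date
        return Decimal(int(value.timestamp() * 1_000_000))  # microseconds since epoch
    if isinstance(value, date):
        return Decimal((value - EPOCH).days)         # days since epoch
    return Decimal(str(value))                       # ints, floats, decimals

lo = to_numeric(date(2017, 1, 1))
hi = to_numeric(date(2017, 1, 10))
span = hi - lo   # 9 days: subtraction now works the same for every supported type
```

Converting through `str` for plain numerics mirrors the `new JDecimal(min.toString)` trick: it sidesteps float-representation surprises when building the decimal.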
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r96128021 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import java.math.{BigDecimal => JDecimal} +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _} + + +/** Value range of a column. */ +trait Range + +/** For simplicity we use decimal to unify operations of numeric ranges. */ +case class NumericRange(min: JDecimal, max: JDecimal) extends Range + +/** + * This version of Spark does not have min/max for binary/string types, we define their default + * behaviors by this class. + */ +class DefaultRange extends Range + +/** This is for columns with only null values. 
*/ +class NullRange extends Range + +object Range { + def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match { +case StringType | BinaryType => new DefaultRange() +case _ if min.isEmpty || max.isEmpty => new NullRange() +case _ => toNumericRange(min.get, max.get, dataType) + } + + /** + * For simplicity we use decimal to unify operations of numeric types, the two methods below + * are the contract of conversion. + */ + private def toNumericRange(min: Any, max: Any, dataType: DataType): NumericRange = { +dataType match { + case _: NumericType => +NumericRange(new JDecimal(min.toString), new JDecimal(max.toString)) + case BooleanType => +val min1 = if (min.asInstanceOf[Boolean]) 1 else 0 +val max1 = if (max.asInstanceOf[Boolean]) 1 else 0 +NumericRange(new JDecimal(min1), new JDecimal(max1)) + case DateType => +val min1 = DateTimeUtils.fromJavaDate(min.asInstanceOf[Date]) +val max1 = DateTimeUtils.fromJavaDate(max.asInstanceOf[Date]) +NumericRange(new JDecimal(min1), new JDecimal(max1)) + case TimestampType => +val min1 = DateTimeUtils.fromJavaTimestamp(min.asInstanceOf[Timestamp]) --- End diff -- Yes. Added date and timestamp tests.
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r95935452 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -116,6 +116,12 @@ case class Filter(condition: Expression, child: LogicalPlan) .filterNot(SubqueryExpression.hasCorrelatedSubquery) child.constraints.union(predicates.toSet) } + + override lazy val statistics: Statistics = { --- End diff -- OK. fixed.
Github user ron8hu commented on a diff in the pull request: https://github.com/apache/spark/pull/16395#discussion_r95916520 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.statsEstimation + +import scala.collection.immutable.{HashSet, Map} +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + + +class FilterEstimation extends Logging { + + /** + * We use a mutable colStats because we need to update the corresponding ColumnStat + * for a column after we apply a predicate condition. For example, A column c has + * [min, max] value as [0, 100]. In a range condition such as (c > 40 AND c <= 50), + * we need to set the column's [min, max] value to [40, 100] after we evaluate the + * first condition c > 40. 
We need to set the column's [min, max] value to [40, 50] + * after we evaluate the second condition c <= 50. + */ + private var mutableColStats: mutable.Map[ExprId, ColumnStat] = mutable.Map.empty + + /** + * Returns an option of Statistics for a Filter logical plan node. + * For a given compound expression condition, this method computes filter selectivity + * (or the percentage of rows meeting the filter condition), which + * is used to compute row count, size in bytes, and the updated statistics after a given + * predicate is applied. + * + * @param plan a LogicalPlan node that must be an instance of Filter. + * @return Option[Statistics] When no statistics have been collected, it returns None. + */ + def estimate(plan: Filter): Option[Statistics] = { +val stats: Statistics = plan.child.statistics +if (stats.rowCount.isEmpty) return None + +// save a mutable copy of colStats so that we can later change it recursively +val statsExprIdMap: Map[ExprId, ColumnStat] = + stats.attributeStats.map(kv => (kv._1.exprId, kv._2)) +mutableColStats = mutable.Map.empty ++= statsExprIdMap + +// estimate selectivity of this filter predicate +val percent: Double = calculateConditions(plan, plan.condition) + +// attributeStats has mapping Attribute-to-ColumnStat. +// mutableColStats has mapping ExprId-to-ColumnStat. +// We use an ExprId-to-Attribute map to facilitate the mapping Attribute-to-ColumnStat +val expridToAttrMap: Map[ExprId, Attribute] = + stats.attributeStats.map(kv => (kv._1.exprId, kv._1)) +// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] = + mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2) +val newColStats = AttributeMap(mutableAttributeStats.toSeq) + +val filteredRowCountValue: BigInt = + EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * percent) +val avgRowSize = BigDecimal(EstimationUtils.getRowSize(plan.output, newColStats)) +val filteredSizeInBytes: BigInt = + EstimationUtils.ceil(BigDecimal(filteredRowCountValue) * avgRowSize) + +Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = Some(filteredRowCountValue), + attributeStats = newColStats)) + } + + /** + * Returns a percentage of rows meeting a compound condition in Filter node. + * A compound condition is decomposed into multiple single conditions linked with AND, OR, NOT. + * For logical AND conditions, we need to update stats after a condition estimation +
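The AND/OR/NOT decomposition described in this comment combines per-condition selectivities under an independence assumption. A Python sketch of the combination formulas (illustrative, not the Scala code):

```python
def and_selectivity(p1, p2):
    # AND: product of the two selectivities, assuming the conditions are
    # independent; column stats are also narrowed between the two evaluations,
    # as the doc comment above describes.
    return p1 * p2

def or_selectivity(p1, p2):
    # OR: inclusion-exclusion under the same independence assumption.
    return p1 + p2 - p1 * p2

def not_selectivity(p):
    # NOT: complement of the inner condition's selectivity.
    return 1.0 - p

# The (c > 40 AND c <= 50) example: 0.6 for the first condition,
# 10/60 for the second on the narrowed range.
combined = and_selectivity(0.6, 10.0 / 60.0)
```

The independence assumption is the usual simplification here; correlated predicates can make these estimates arbitrarily far off, which is inherent to this style of estimator.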