[GitHub] spark pull request #19952: [SPARK-21322][SQL][followup] support histogram in...

2017-12-12 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19952#discussion_r156555044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -147,65 +139,76 @@ object EstimationUtils {
   }
 
   /**
-   * Returns a percentage of a bin holding values for column value in the 
range of
-   * [lowerValue, higherValue]
-   *
-   * @param higherValue a given upper bound value of a specified column 
value range
-   * @param lowerValue a given lower bound value of a specified column 
value range
-   * @param bin a single histogram bin
-   * @return the percentage of a single bin holding values in [lowerValue, 
higherValue].
+   * Returns the possibility of the given histogram bin holding values 
within the given range
+   * [lowerBound, upperBound].
*/
-  private def getOccupation(
-  higherValue: Double,
-  lowerValue: Double,
+  private def binHoldingRangePossibility(
+  upperBound: Double,
+  lowerBound: Double,
   bin: HistogramBin): Double = {
-assert(bin.lo <= lowerValue && lowerValue <= higherValue && 
higherValue <= bin.hi)
+assert(bin.lo <= lowerBound && lowerBound <= upperBound && upperBound 
<= bin.hi)
 if (bin.hi == bin.lo) {
   // the entire bin is covered in the range
   1.0
-} else if (higherValue == lowerValue) {
+} else if (upperBound == lowerBound) {
   // set percentage to 1/NDV
   1.0 / bin.ndv.toDouble
 } else {
   // Use proration since the range falls inside this bin.
-  math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)
+  math.min((upperBound - lowerBound) / (bin.hi - bin.lo), 1.0)
 }
   }
 
   /**
-   * Returns the number of bins for column values in [lowerValue, 
higherValue].
-   * The column value distribution is saved in an equi-height histogram.  
The return values is a
-   * double value is because we may return a portion of a bin. For 
example, a predicate
-   * "column = 8" may return the number of bins 0.2 if the holding bin has 
5 distinct values.
+   * Returns the number of histogram bins holding values within the given 
range
+   * [lowerBound, upperBound].
+   *
+   * Note that the returned value is double type, because the range 
boundaries usually occupy a
+   * portion of a bin. An extrema case is [value, value] which is 
generated by equal predicate
--- End diff --

typo: extreme


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-12-11 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19783
  
retest this please.


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-12-11 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19783
  
For the past two test builds, #84725 and #84732, I checked the test results on 
the web and there were actually no failures.  See 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84725/testReport/.
  It appears there is a bug in the Jenkins test system.


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-12-11 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19783
  
retest this please.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-10 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155964396
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,171 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int 
= {
+var i = 0
+while ((i < bins.length) && (value > bins(i).hi)) {
+  i +=1
+}
+i
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+  def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = 
{
+var i = bins.length - 1
+while ((i >= 0) && (value < bins(i).lo)) {
+  i -=1
+}
+i
+  }
+
+  /**
+   * Returns a percentage of a bin holding values for column value in the 
range of
+   * [lowerValue, higherValue]
+   *
+   * @param binId a given bin id in a specified histogram
+   * @param higherValue a given upper bound value of a specified column 
value range
+   * @param lowerValue a given lower bound value of a specified column 
value range
+   * @param histogram a numeric equi-height histogram
+   * @return the percentage of a single bin holding values in [lowerValue, 
higherValue].
+   */
+  private def getOccupation(
+  binId: Int,
+  higherValue: Double,
+  lowerValue: Double,
+  histogram: Histogram): Double = {
+val curBin = histogram.bins(binId)
+if (curBin.hi == curBin.lo) {
+  // the entire bin is covered in the range
+  1.0
+} else if (higherValue == lowerValue) {
+  // set percentage to 1/NDV
+  1.0 / curBin.ndv.toDouble
+} else {
+  // Use proration since the range falls inside this bin.
+  math.min((higherValue - lowerValue) / (curBin.hi - curBin.lo), 1.0)
+}
+  }
+
+  /**
+   * Returns the number of bins for column values in [lowerValue, 
higherValue].
--- End diff --

OK.  Done.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-10 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155963930
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -359,7 +371,7 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   test("cbool > false") {
 validateEstimatedStats(
   Filter(GreaterThan(attrBool, Literal(false)), 
childStatsTestPlan(Seq(attrBool), 10L)),
-  Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max 
= Some(true),
+  Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max 
= Some(true),
--- End diff --

Agreed with wzhfy.  Today's logic is: for these two conditions, (column > x) 
and (column >= x), we set the min value to x.  We do not distinguish the two 
cases, because we do not know the exact next value larger than x when x is a 
continuous data type such as double.  We could add special handling for 
discrete data types such as Boolean or integer, but, as wzhfy said, it is not 
worth the complexity.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-10 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155963740
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -471,37 +508,47 @@ case class FilterEstimation(plan: Filter) extends 
Logging {
   percent = 1.0
 } else {
   // This is the partial overlap case:
-  // Without advanced statistics like histogram, we assume uniform 
data distribution.
-  // We just prorate the adjusted range over the initial range to 
compute filter selectivity.
-  assert(max > min)
-  percent = op match {
-case _: LessThan =>
-  if (numericLiteral == max) {
-// If the literal value is right on the boundary, we can minus 
the part of the
-// boundary value (1/ndv).
-1.0 - 1.0 / ndv
-  } else {
-(numericLiteral - min) / (max - min)
-  }
-case _: LessThanOrEqual =>
-  if (numericLiteral == min) {
-// The boundary value is the only satisfying value.
-1.0 / ndv
-  } else {
-(numericLiteral - min) / (max - min)
-  }
-case _: GreaterThan =>
-  if (numericLiteral == min) {
-1.0 - 1.0 / ndv
-  } else {
-(max - numericLiteral) / (max - min)
-  }
-case _: GreaterThanOrEqual =>
-  if (numericLiteral == max) {
-1.0 / ndv
-  } else {
-(max - numericLiteral) / (max - min)
-  }
+
+  if (colStat.histogram.isEmpty) {
--- End diff --

We cannot move the if statement to the upper level.  This is because, for the 
partial-overlap case, we need to update the [min, max] range for the given 
column, whereas for the no-overlap and complete-overlap cases we do not.  I 
think the current code is modular for this reason.
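Roughly, the structure being defended looks like this (a hedged sketch with 
hypothetical helper parameters, not the actual FilterEstimation code):

    // Only the partial-overlap case must both compute a selectivity and narrow the
    // column's [min, max], which is why the histogram branch lives inside it.
    def estimatePercent(
        noOverlap: Boolean,
        completeOverlap: Boolean,
        hasHistogram: Boolean,
        prorateUniformly: () => Double,       // hypothetical stand-in helpers
        prorateWithHistogram: () => Double,
        narrowMinMax: () => Unit): Double = {
      if (noOverlap) {
        0.0
      } else if (completeOverlap) {
        1.0
      } else {
        val percent = if (hasHistogram) prorateWithHistogram() else prorateUniformly()
        narrowMinMax()   // only this case updates the column's [min, max]
        percent
      }
    }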


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-10 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155963622
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -332,8 +332,41 @@ case class FilterEstimation(plan: Filter) extends 
Logging {
 colStatsMap.update(attr, newStats)
   }
 
-  Some(1.0 / BigDecimal(ndv))
-} else {
+  if (colStat.histogram.isEmpty) {
+// returns 1/ndv if there is no histogram
+Some(1.0 / BigDecimal(ndv))
+  } else {
+// We compute filter selectivity using Histogram information.
--- End diff --

OK.  Will do.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-07 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155683828
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,144 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column value falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the id of the first bin into which a column value falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int 
= {
+var i = 0
+while ((i < bins.length) && (value > bins(i).hi)) {
+  i += 1
+}
+i
+  }
+
+  /**
+   * Returns the number of the last bin into which a column value falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the id of the last bin into which a column value falls.
+   */
+  def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = 
{
+var i = bins.length - 1
+while ((i >= 0) && (value < bins(i).lo)) {
+  i -= 1
+}
+i
+  }
+
+  /**
+   * Returns a percentage of a bin holding values for column value in the 
range of
+   * [lowerValue, higherValue]
+   *
+   * @param higherValue a given upper bound value of a specified column 
value range
+   * @param lowerValue a given lower bound value of a specified column 
value range
+   * @param bin a single histogram bin
+   * @return the percentage of a single bin holding values in [lowerValue, 
higherValue].
+   */
+  private def getOccupation(
+  higherValue: Double,
+  lowerValue: Double,
+  bin: HistogramBin): Double = {
+assert(bin.lo <= lowerValue && lowerValue <= higherValue && 
higherValue <= bin.hi)
+if (bin.hi == bin.lo) {
+  // the entire bin is covered in the range
+  1.0
+} else if (higherValue == lowerValue) {
+  // set percentage to 1/NDV
+  1.0 / bin.ndv.toDouble
+} else {
+  // Use proration since the range falls inside this bin.
+  math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0)
+}
+  }
+
+  /**
+   * Returns the number of bins for column values in [lowerValue, 
higherValue].
+   * This is an overloaded method. The column value distribution is saved 
in an
+   * equi-height histogram.
+   *
+   * @param higherId id of the high end bin holding the high end value of 
a column range
+   * @param lowerId id of the low end bin holding the low end value of a 
column range
+   * @param higherEnd a given upper bound value of a specified column 
value range
+   * @param lowerEnd a given lower bound value of a specified column value 
range
+   * @param histogram a numeric equi-height histogram
+   * @return the number of bins for column values in [lowerEnd, higherEnd].
+   */
+  def getOccupationBins(
+  higherId: Int,
+  lowerId: Int,
+  higherEnd: Double,
+  lowerEnd: Double,
+  histogram: Histogram): Double = {
+if (lowerId == higherId) {
+  val curBin = histogram.bins(lowerId)
+  getOccupation(higherEnd, lowerEnd, curBin)
+} else {
+  // compute how much lowerEnd/higherEnd occupies its bin
+  val lowerCurBin = histogram.bins(lowerId)
+  val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin)
+
+  val higherCurBin = histogram.bins(higherId)
+  val higherPart = getOccupation(higherEnd, higherCurBin.lo, 
higherCurBin)
+
+  // the total length is lowerPart + higherPart + bins between them
+  lowerPart + higherPart + higherId - lowerId - 1
+}
+  }
+
+  /**
+   * Returns the number of distinct values, ndv, for column values in 
[lowerEnd, higherEnd].
+   * The column value distribution is saved in an equi-height histogram.
+   *
+   * @param higherId id of the high end bin holding the high end value of 
a column range
+   * @param lowerId id of the low end bin holding the low end value of a 
column range
+   * @param higherEnd a given upper bound value of a specified column 
value range
+   * @param lowerEnd a given lower bound value of a specified column value 
range
+   * @param histogram a numeric equi-height histogram
+   * @return the number of distinct values, ndv, for column values in 
[lowerEn

[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-06 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155343962
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,171 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int 
= {
+var i = 0
+while ((i < bins.length) && (value > bins(i).hi)) {
+  i +=1
+}
+i
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+  def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = 
{
+var i = bins.length - 1
+while ((i >= 0) && (value < bins(i).lo)) {
+  i -=1
+}
+i
+  }
+
+  /**
+   * Returns a percentage of a bin holding values for column value in the 
range of
+   * [lowerValue, higherValue]
+   *
+   * @param binId a given bin id in a specified histogram
+   * @param higherValue a given upper bound value of a specified column 
value range
+   * @param lowerValue a given lower bound value of a specified column 
value range
+   * @param histogram a numeric equi-height histogram
+   * @return the percentage of a single bin holding values in [lowerValue, 
higherValue].
+   */
+  private def getOccupation(
+  binId: Int,
+  higherValue: Double,
+  lowerValue: Double,
+  histogram: Histogram): Double = {
+val curBin = histogram.bins(binId)
+if (curBin.hi == curBin.lo) {
+  // the entire bin is covered in the range
+  1.0
+} else if (higherValue == lowerValue) {
+  // set percentage to 1/NDV
+  1.0 / curBin.ndv.toDouble
+} else {
+  // Use proration since the range falls inside this bin.
+  math.min((higherValue - lowerValue) / (curBin.hi - curBin.lo), 1.0)
+}
+  }
+
+  /**
+   * Returns the number of bins for column values in [lowerValue, 
higherValue].
--- End diff --

This is because we may return a fraction of a bin.  For example, a 
predicate column = 5 may return a bin count of 0.2 if the bin holding the value 
has 5 distinct values.  Hence, we cannot return an integer value.
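For instance, a tiny self-contained sketch of the equal-bounds branch, assuming a 
uniform distribution inside the bin:

    // the bin holding the literal 5 has ndv = 5 distinct values; the range [5, 5] is
    // assumed to cover 1/ndv of that bin, i.e. 0.2 bins -- hence the Double return type
    val binNdv = 5L
    val occupation = 1.0 / binNdv.toDouble   // 0.2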


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-05 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155125157
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,194 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int 
= {
+var binId = 0
+bins.foreach { bin =>
+  if (value > bin.hi) binId += 1
+}
+binId
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+  def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = 
{
--- End diff --

Good point.  We can simplify the logic by iterating from bins.length - 1 down to 0.
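That simplified version would look roughly like the following (HistogramBin is 
shown here as a stand-in for the real class in Statistics.scala):

    case class HistogramBin(lo: Double, hi: Double, ndv: Long)

    def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
      var i = bins.length - 1
      // walk backwards and stop at the first bin whose lower bound is <= value
      while (i >= 0 && value < bins(i).lo) {
        i -= 1
      }
      i
    }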


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-12-05 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r155125029
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,194 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param bins an array of bins for a given numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+  def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int 
= {
+var binId = 0
+bins.foreach { bin =>
+  if (value > bin.hi) binId += 1
--- End diff --

Good point.  Actually, a while loop is better because it can exit early once 
the condition no longer holds.
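In other words, roughly (reusing HistogramBin from Statistics.scala):

    def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = {
      var i = 0
      // exits as soon as value no longer lies above bins(i).hi, instead of
      // always scanning every bin as the foreach version does
      while (i < bins.length && value > bins(i).hi) {
        i += 1
      }
      i
    }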


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154255068
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -578,6 +590,112 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   expectedRowCount = 5)
   }
 
+  // The following test cases have histogram information collected for the 
test column
+  test("Not(cintHgm < 3 AND null)") {
+val condition = Not(And(LessThan(attrIntHgm, Literal(3)), 
Literal(null, IntegerType)))
+validateEstimatedStats(
+  Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 6)),
+  expectedRowCount = 9)
+  }
+
+  test("cintHgm = 5") {
+validateEstimatedStats(
+  Filter(EqualTo(attrIntHgm, Literal(5)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = 
Some(5),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 4)
+  }
+
+  test("cintHgm = 0") {
+// This is an out-of-range case since 0 is outside the range [min, max]
+validateEstimatedStats(
+  Filter(EqualTo(attrIntHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Nil,
+  expectedRowCount = 0)
+  }
+
+  test("cintHgm < 3") {
+validateEstimatedStats(
+  Filter(LessThan(attrIntHgm, Literal(3)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = 
Some(3),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 2)
+  }
+
+  test("cintHgm < 0") {
+// This is a corner case since literal 0 is smaller than min.
+validateEstimatedStats(
+  Filter(LessThan(attrIntHgm, Literal(0)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Nil,
+  expectedRowCount = 0)
+  }
+
+  test("cintHgm <= 3") {
+validateEstimatedStats(
+  Filter(LessThanOrEqual(attrIntHgm, Literal(3)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = 
Some(3),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 2)
+  }
+
+  test("cintHgm > 6") {
+validateEstimatedStats(
+  Filter(GreaterThan(attrIntHgm, Literal(6)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = 
Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 2)
+  }
+
+  test("cintHgm > 10") {
+// This is a corner case since max value is 10.
+validateEstimatedStats(
+  Filter(GreaterThan(attrIntHgm, Literal(10)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Nil,
+  expectedRowCount = 0)
+  }
+
+  test("cintHgm >= 6") {
+validateEstimatedStats(
+  Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), 
childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(6), max = 
Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 4)
+  }
+
+  test("cintHgm IS NULL") {
+validateEstimatedStats(
+  Filter(IsNull(attrIntHgm), childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Nil,
+  expectedRowCount = 0)
+  }
+
+  test("cintHgm IS NOT NULL") {
+validateEstimatedStats(
+  Filter(IsNotNull(attrIntHgm), childStatsTestPlan(Seq(attrIntHgm), 
10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 6, min = Some(1), max = 
Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 10)
+  }
+
+  test("cintHgm > 3 AND cintHgm <= 6") {
+val condition = And(GreaterThan(attrIntHgm,
+  Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6)))
+validateEstimatedStats(
+  Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
+  Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(3), max = 
Some(6),
+nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+  expectedRowCount = 8)
+  }
+
+  test("cintHgm = 3 OR cintHgm = 6") {
--- End diff --

We have added histogram test cases for sk

[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154254419
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -359,7 +371,7 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   test("cbool > false") {
 validateEstimatedStats(
   Filter(GreaterThan(attrBool, Literal(false)), 
childStatsTestPlan(Seq(attrBool), 10L)),
-  Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max 
= Some(true),
+  Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max 
= Some(true),
--- End diff --

My earlier comment mentioned this test case.  


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154254145
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -784,11 +879,16 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
 : AttributeMap[ColumnStat] = {
 val newColumnStats = originalMap.map { case (attr, oriColStat) =>
-  // Update ndv based on the overall filter selectivity: scale down 
ndv if the number of rows
-  // decreases; otherwise keep it unchanged.
-  val newNdv = EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
-newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
   val colStat = 
updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
+  val newNdv = if (colStat.distinctCount > 1) {
--- End diff --

The old code does not work well for a couple of the new skewed-distribution 
tests; for example, test("cintHgm < 3") would fail, because it still recomputes 
newNdv in the updateNdv() method even though, in reality, we have already 
scaled it down to 1.
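Roughly the guard being described (a hedged sketch assembled from the old and 
new code above, not the full diff):

    // only rescale ndv when it has not already been reduced to 1 by the
    // histogram-based estimation earlier in this pass
    val newNdv = if (colStat.distinctCount > 1) {
      EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
        newNumRows = rowsAfterFilter, oldNdv = colStat.distinctCount)
    } else {
      colStat.distinctCount
    }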


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154252063
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -513,10 +560,9 @@ case class FilterEstimation(plan: Filter) extends 
Logging {
 
 op match {
   case _: GreaterThan | _: GreaterThanOrEqual =>
-// If new ndv is 1, then new max must be equal to new min.
-newMin = if (newNdv == 1) newMax else newValue
+newMin = newValue
   case _: LessThan | _: LessThanOrEqual =>
-newMax = if (newNdv == 1) newMin else newValue
+newMax = newValue
--- End diff --

Previously I coded it that way because of a corner test case, test("cbool > 
false"); at that time I set newMin to newMax since newNdv = 1.  However, that 
logic does not work well for the skewed-distribution test case 
test("cintHgm < 3"), where newMin = 1 and newMax = 3.  I think the revised code 
makes better sense.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154250197
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -332,8 +332,45 @@ case class FilterEstimation(plan: Filter) extends 
Logging {
 colStatsMap.update(attr, newStats)
   }
 
-  Some(1.0 / BigDecimal(ndv))
-} else {
+  // We compute filter selectivity using Histogram information
+  attr.dataType match {
+case StringType | BinaryType =>
+  Some(1.0 / BigDecimal(ndv))
+
+case _ =>
+  // returns 1/ndv if there is no histogram
+  if (colStat.histogram.isEmpty) return Some(1.0 / BigDecimal(ndv))
+
+  // We traverse histogram bins to locate the literal value
+  val hgmBins = colStat.histogram.get.bins
+  val datum = EstimationUtils.toDecimal(literal.value, 
literal.dataType).toDouble
+  // find the interval where this datum locates
+  var lowerId, higherId = -1
+  for (i <- hgmBins.indices) {
+// if datum > upperBound, just move to next bin
+if (datum <= hgmBins(i).hi && lowerId < 0) lowerId = i
+if (higherId < 0) {
+  if ((datum < hgmBins(i).hi || i == hgmBins.length - 1) ||
+((datum == hgmBins(i).hi) && (datum < hgmBins(i + 1).hi))) 
{
+higherId = i
+  }
+}
+  }
+  assert(lowerId <= higherId)
+  val lowerBinNdv = hgmBins(lowerId).ndv
+  val higherBinNdv = hgmBins(higherId).ndv
+  // assume uniform distribution in each bin
+  val percent = if (lowerId == higherId) {
+(1.0 / hgmBins.length) / math.max(lowerBinNdv, 1)
+  } else {
+1.0 / hgmBins.length * (higherId - lowerId - 1) +
+  (1.0 / hgmBins.length) / math.max(lowerBinNdv, 1) +
+  (1.0 / hgmBins.length) / math.max(higherBinNdv, 1)
+  }
+  Some(percent)
--- End diff --

Good point. 


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154249995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,197 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+
+  def findFirstBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
+histogram.bins.foreach { bin =>
+  if (value > bin.hi) binId += 1
+}
+binId
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+
+  def findLastBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
+for (i <- 0 until histogram.bins.length) {
+  if (value > histogram.bins(i).hi) {
+// increment binId to point to next bin
+binId += 1
+  }
+  if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 
1)) {
+if (value == histogram.bins(i + 1).lo) {
--- End diff --

No.  I meant the upper bound of the array of bins in a histogram.  The 
default length of the histogram bin array is 254, so when i is equal to 253 
(the last bin), i + 1 is 254, leading to an out-of-bounds error.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154225069
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,197 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+
+  def findFirstBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
+histogram.bins.foreach { bin =>
+  if (value > bin.hi) binId += 1
+}
+binId
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+
+  def findLastBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
+for (i <- 0 until histogram.bins.length) {
+  if (value > histogram.bins(i).hi) {
+// increment binId to point to next bin
+binId += 1
+  }
+  if ((value == histogram.bins(i).hi) && (i < histogram.bins.length - 
1)) {
+if (value == histogram.bins(i + 1).lo) {
--- End diff --

I used two statements instead of one because, when i points to the last bin, 
the condition "value == histogram.bins(i + 1).lo" may go out of bounds.  By 
separating the conditions into two statements, we can be sure that the 
out-of-bounds error will not happen.
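i.e. roughly (a self-contained sketch; valueContinuesIntoNextBin is a 
hypothetical helper name used only for illustration):

    case class HistogramBin(lo: Double, hi: Double, ndv: Long)

    // checking i against the last index in its own statement guarantees that
    // bins(i + 1) is never read when i already points at the last bin (with the
    // default 254 bins, i == 253 would otherwise make i + 1 out of bounds)
    def valueContinuesIntoNextBin(value: Double, bins: Array[HistogramBin], i: Int): Boolean = {
      if (value == bins(i).hi && i < bins.length - 1) {
        value == bins(i + 1).lo
      } else {
        false
      }
    }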


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154223769
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,197 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+
+  def findFirstBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
+histogram.bins.foreach { bin =>
+  if (value > bin.hi) binId += 1
+}
+binId
+  }
+
+  /**
+   * Returns the number of the last bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the last bin into which a column values falls.
+   */
+
+  def findLastBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
--- End diff --

Same comment as in my last reply.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-30 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r154223705
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,197 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin into which a column values falls 
for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the first bin into which a column values falls.
+   */
+
+  def findFirstBinForValue(value: Double, histogram: Histogram): Int = {
+var binId = 0
--- End diff --

I hesitate to add an assert statement here.  This is because an assert like 
this could crash the Spark system when a user does not refresh their data 
statistics promptly.  In the real world, a user may load data, collect 
statistics, and then add more incremental data without collecting statistics 
again right away.  They may then issue a SQL query against the newly added data, 
such as "WHERE column = xxx", where xxx is a new value from the incremental 
load.  After all, statistics are auxiliary; a query should still run even if 
the statistics are not up to date.


---




[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...

2017-11-29 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19783#discussion_r153902382
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
@@ -114,4 +114,197 @@ object EstimationUtils {
 }
   }
 
+  /**
+   * Returns the number of the first bin/bucket into which a column values 
falls for a specified
+   * numeric equi-height histogram.
+   *
+   * @param value a literal value of a column
+   * @param histogram a numeric equi-height histogram
+   * @return the number of the first bin/bucket into which a column values 
falls.
+   */
+
+  def findFirstBucketForValue(value: Double, histogram: Histogram): Int = {
--- End diff --

We had bucket(s) and bin(s) used interchangeably.  To avoid confusion, I 
will unify the terminology to use only bin/bins.


---




[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-11-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r153665547
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), Hi

[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

2017-11-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19594#discussion_r153665092
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
 ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends 
StatsEstimationTestBase {
 rowCount = 2,
 attributeStats = AttributeMap(Seq("key-1-2", 
"key-2-3").map(nameToColInfo)))
 
+  private def estimateByHistogram(
+  histogram1: Histogram,
+  histogram2: Histogram,
+  expectedMin: Double,
+  expectedMax: Double,
+  expectedNdv: Long,
+  expectedRows: Long): Unit = {
+val col1 = attr("key1")
+val col2 = attr("key2")
+val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
+val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
+
+val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
+val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
+val expectedStatsAfterJoin = Statistics(
+  sizeInBytes = expectedRows * (8 + 2 * 4),
+  rowCount = Some(expectedRows),
+  attributeStats = AttributeMap(Seq(
+col1 -> c1.stats.attributeStats(col1).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax)),
+col2 -> c2.stats.attributeStats(col2).copy(
+  distinctCount = expectedNdv, min = Some(expectedMin), max = 
Some(expectedMax
+)
+
+// Join order should not affect estimation result.
+Seq(c1JoinC2, c2JoinC1).foreach { join =>
+  assert(join.stats == expectedStatsAfterJoin)
+}
+  }
+
+  private def generateJoinChild(
+  col: Attribute,
+  histogram: Histogram,
+  expectedMin: Double,
+  expectedMax: Double): LogicalPlan = {
+val colStat = inferColumnStat(histogram)
+val t = StatsTestPlan(
+  outputList = Seq(col),
+  rowCount = (histogram.height * histogram.bins.length).toLong,
+  attributeStats = AttributeMap(Seq(col -> colStat)))
+
+val filterCondition = new ArrayBuffer[Expression]()
+if (expectedMin > colStat.min.get.toString.toDouble) {
+  filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
+}
+if (expectedMax < colStat.max.get.toString.toDouble) {
+  filterCondition += LessThanOrEqual(col, Literal(expectedMax))
+}
+if (filterCondition.isEmpty) t else 
Filter(filterCondition.reduce(And), t)
+  }
+
+  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+var ndv = 0L
+for (i <- histogram.bins.indices) {
+  val bin = histogram.bins(i)
+  if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
+ndv += bin.ndv
+  }
+}
+ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
+  max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, 
maxLen = 4,
+  histogram = Some(histogram))
+  }
+
+  test("equi-height histograms: a bin is contained by another one") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 
60, ndv = 30)))
+val histogram2 = Histogram(height = 100, Array(
+  HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 
100, ndv = 40)))
+// test bin trimming
+val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max 
= 60)
+assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
+val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max 
= 60)
+assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+val expectedRanges = Seq(
+  OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 
300, 80*1/2),
+  OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 
40*1/2), 300*2/3, 80*1/2),
+  OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 
300*1/3, 20)
+)
+assert(expectedRanges.equals(
+  getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 
60D)))
+
+estimateByHistogram(
+  histogram1 = histogram1,
+  histogram2 = histogram2,
+  expectedMin = 10D,
+  expectedMax = 60D,
+  // 10 + 20 + 8
+  expectedNdv = 38L,
+  // 300*40/20 + 200*40/20 + 100*20/10
+  expectedRows = 1200L)
+  }
+
+  test("equi-height histograms: a bin has only one value") {
+val histogram1 = Histogram(height = 300, Array(
+  HistogramBin(lo = 30, hi = 30, ndv = 1), Hi

[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-11-19 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19783
  
cc @sameeragarwal @cloud-fan @gatorsmile @wzhfy 


---




[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...

2017-11-19 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19357
  
This pull request was created while several of its dependencies had not been 
defined yet.  As a result, it accumulated many conflicts.  I decided to close 
this pull request and start a clean one.  Please review pull request #19783, 
which replaces this one.


---




[GitHub] spark pull request #19357: [SPARK-21322][SQL][WIP] support histogram in filt...

2017-11-19 Thread ron8hu
Github user ron8hu closed the pull request at:

https://github.com/apache/spark/pull/19357


---




[GitHub] spark pull request #19783: support histogram in filter cardinality estimatio...

2017-11-19 Thread ron8hu
GitHub user ron8hu opened a pull request:

https://github.com/apache/spark/pull/19783

support histogram in filter cardinality estimation

## What changes were proposed in this pull request?

Histograms are effective in dealing with skewed distributions. After generating 
histogram information for column statistics, we need to adjust filter 
estimation based on the histogram data structure.
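As a minimal sketch of the idea for an equality predicate (simplified stand-ins; 
assumes a uniform distribution inside each bin, as the estimation code in this PR does):

    case class HistogramBin(lo: Double, hi: Double, ndv: Long)

    // selectivity of "col = v" when v falls into a single equi-height bin: each bin
    // holds 1/numBins of the non-null rows, and v accounts for 1/ndv of that bin
    def equalitySelectivity(v: Double, bins: Array[HistogramBin]): Double = {
      val binId = bins.indexWhere(b => b.lo <= v && v <= b.hi)
      if (binId < 0) 0.0   // out of range: no matching rows
      else (1.0 / bins.length) / math.max(bins(binId).ndv, 1L).toDouble
    }

    // e.g. 4 bins and a bin with ndv = 5 holding the value: (1/4) * (1/5) = 0.05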

## How was this patch tested?

We revised all the unit test cases to include the histogram data structure.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ron8hu/spark supportHistogram

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19783


commit dd5b975dafdf9fc4edd94cf6e369f5e899db74e2
Author: Ron Hu <ron...@huawei.com>
Date:   2017-11-19T19:37:47Z

support histogram in filter cardinality estimation




---




[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...

2017-11-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19479#discussion_r148724807
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -275,6 +327,64 @@ object ColumnStat extends Logging {
   avgLen = row.getLong(4),
   maxLen = row.getLong(5)
 )
+if (row.isNullAt(6)) {
+  cs
+} else {
+  val ndvs = row.getArray(6).toLongArray()
+  assert(percentiles.get.length == ndvs.length + 1)
+  val endpoints = percentiles.get.map(_.toString.toDouble)
+  // Construct equi-height histogram
+  val buckets = ndvs.zipWithIndex.map { case (ndv, i) =>
+EquiHeightBucket(endpoints(i), endpoints(i + 1), ndv)
+  }
+  val nonNullRows = rowCount - cs.nullCount
+  val ehHistogram = EquiHeightHistogram(nonNullRows.toDouble / 
ndvs.length, buckets)
+  cs.copy(histogram = Some(ehHistogram))
+}
   }
 
 }
+
+/**
+ * There are a few types of histograms in state-of-the-art estimation 
methods. E.g. equi-width
+ * histogram, equi-height histogram, frequency histogram (value-frequency 
pairs) and hybrid
+ * histogram, etc.
+ * Currently in Spark, we support equi-height histogram since it is good 
at handling skew
+ * distribution, and also provides reasonable accuracy in other cases.
+ * We can add other histograms in the future, which will make estimation 
logic more complicated.
+ * This is because we will have to deal with computation between different 
types of histograms in
+ * some cases, e.g. for join columns.
+ */
+trait Histogram
+
+/**
+ * Equi-height histogram represents column value distribution by a 
sequence of buckets. Each bucket
+ * has a value range and contains approximately the same number of rows.
+ * @param height number of rows in each bucket
+ * @param ehBuckets equi-height histogram buckets
+ */
+case class EquiHeightHistogram(height: Double, ehBuckets: 
Seq[EquiHeightBucket]) extends Histogram {
--- End diff --

Please declare ehBuckets as Array[EquiHeightBucket] instead of 
Seq[EquiHeightBucket].  This is because we need to access a bucket directly and 
randomly, and for random access a Scala Array provides better performance since 
it supports indexed access to an element in constant time.
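i.e. something like this (a hedged sketch of the suggested signature, with 
EquiHeightBucket shown as a stand-in):

    trait Histogram

    case class EquiHeightBucket(lo: Double, hi: Double, ndv: Long)

    // Array gives constant-time ehBuckets(binId) lookups, whereas a generic Seq
    // (e.g. a List) may take linear time to reach the same element
    case class EquiHeightHistogram(height: Double, ehBuckets: Array[EquiHeightBucket])
      extends Histogram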


---




[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...

2017-10-19 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19479#discussion_r145840719
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -216,65 +218,61 @@ object ColumnStat extends Logging {
 }
   }
 
-  /**
-   * Constructs an expression to compute column statistics for a given 
column.
-   *
-   * The expression should create a single struct column with the 
following schema:
-   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, 
maxLen: Long
-   *
-   * Together with [[rowToColumnStat]], this function is used to create 
[[ColumnStat]] and
-   * as a result should stay in sync with it.
-   */
-  def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = {
-def struct(exprs: Expression*): CreateNamedStruct = 
CreateStruct(exprs.map { expr =>
-  expr.transformUp { case af: AggregateFunction => 
af.toAggregateExpression() }
-})
-val one = Literal(1, LongType)
+  private def convertToHistogram(s: String): EquiHeightHistogram = {
+val idx = s.indexOf(",")
+if (idx <= 0) {
+  throw new AnalysisException("Failed to parse histogram.")
+}
+val height = s.substring(0, idx).toDouble
+val pattern = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r
+val buckets = pattern.findAllMatchIn(s).map { m =>
+  EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, 
m.group(3).toLong)
+}.toSeq
+EquiHeightHistogram(height, buckets)
+  }
 
-// the approximate ndv (num distinct value) should never be larger 
than the number of rows
-val numNonNulls = if (col.nullable) Count(col) else Count(one)
-val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
-val numNulls = Subtract(Count(one), numNonNulls)
-val defaultSize = Literal(col.dataType.defaultSize, LongType)
+}
 
-def fixedLenTypeStruct(castType: DataType) = {
-  // For fixed width types, avg size should be the same as max size.
-  struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), 
numNulls, defaultSize,
-defaultSize)
-}
+/**
+ * There are a few types of histograms in state-of-the-art estimation 
methods. E.g. equi-width
+ * histogram, equi-height histogram, frequency histogram (value-frequency 
pairs) and hybrid
+ * histogram, etc.
+ * Currently in Spark, we support equi-height histogram since it is good 
at handling skew
+ * distribution, and also provides reasonable accuracy in other cases.
+ * We can add other histograms in the future, which will make estimation 
logic more complicated.
+ * Because we will have to deal with computation between different types 
of histograms in some
+ * cases, e.g. for join columns.
+ */
+trait Histogram
 
-col.dataType match {
-  case dt: IntegralType => fixedLenTypeStruct(dt)
-  case _: DecimalType => fixedLenTypeStruct(col.dataType)
-  case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
-  case BooleanType => fixedLenTypeStruct(col.dataType)
-  case DateType => fixedLenTypeStruct(col.dataType)
-  case TimestampType => fixedLenTypeStruct(col.dataType)
-  case BinaryType | StringType =>
-// For string and binary type, we don't store min/max.
-val nullLit = Literal(null, col.dataType)
-struct(
-  ndv, nullLit, nullLit, numNulls,
-  // Set avg/max size to default size if all the values are null 
or there is no value.
-  Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
-  Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)))
-  case _ =>
-throw new AnalysisException("Analyzing column statistics is not 
supported for column " +
-s"${col.name} of data type: ${col.dataType}.")
-}
-  }
+/**
+ * Equi-height histogram represents column value distribution by a 
sequence of buckets. Each bucket
+ * has a value range and contains approximately the same number of rows.
+ * @param height number of rows in each bucket
+ * @param ehBuckets equi-height histogram buckets
+ */
+case class EquiHeightHistogram(height: Double, ehBuckets: 
Seq[EquiHeightBucket]) extends Histogram {
 
-  /** Convert a struct for column stats (defined in statExprs) into 
[[ColumnStat]]. */
-  def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
-ColumnStat(
-  distinctCount = BigInt(row.getLong(0)),
-  // for string/binary min/max, get should return null
-  min = Option(row.get(1, attr.dataType)),
-

[GitHub] spark pull request #19479: [SPARK-17074] [SQL] Generate equi-height histogra...

2017-10-19 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19479#discussion_r145828713
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -216,65 +218,61 @@ object ColumnStat extends Logging {
 }
   }
 
-  /**
-   * Constructs an expression to compute column statistics for a given 
column.
-   *
-   * The expression should create a single struct column with the 
following schema:
-   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, 
maxLen: Long
-   *
-   * Together with [[rowToColumnStat]], this function is used to create 
[[ColumnStat]] and
-   * as a result should stay in sync with it.
-   */
-  def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = {
-def struct(exprs: Expression*): CreateNamedStruct = 
CreateStruct(exprs.map { expr =>
-  expr.transformUp { case af: AggregateFunction => 
af.toAggregateExpression() }
-})
-val one = Literal(1, LongType)
+  private def convertToHistogram(s: String): EquiHeightHistogram = {
+val idx = s.indexOf(",")
+if (idx <= 0) {
+  throw new AnalysisException("Failed to parse histogram.")
+}
+val height = s.substring(0, idx).toDouble
+val pattern = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r
+val buckets = pattern.findAllMatchIn(s).map { m =>
+  EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, 
m.group(3).toLong)
+}.toSeq
+EquiHeightHistogram(height, buckets)
+  }
 
-// the approximate ndv (num distinct value) should never be larger 
than the number of rows
-val numNonNulls = if (col.nullable) Count(col) else Count(one)
-val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
-val numNulls = Subtract(Count(one), numNonNulls)
-val defaultSize = Literal(col.dataType.defaultSize, LongType)
+}
 
-def fixedLenTypeStruct(castType: DataType) = {
-  // For fixed width types, avg size should be the same as max size.
-  struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), 
numNulls, defaultSize,
-defaultSize)
-}
+/**
+ * There are a few types of histograms in state-of-the-art estimation 
methods. E.g. equi-width
+ * histogram, equi-height histogram, frequency histogram (value-frequency 
pairs) and hybrid
+ * histogram, etc.
+ * Currently in Spark, we support equi-height histogram since it is good 
at handling skew
+ * distribution, and also provides reasonable accuracy in other cases.
+ * We can add other histograms in the future, which will make estimation 
logic more complicated.
+ * Because we will have to deal with computation between different types 
of histograms in some
--- End diff --

This is because we will have to 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19357: [SPARK-21322][SQL][WIP] support histogram in filter card...

2017-09-26 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/19357
  
cc @wzhfy Please review the code first before I ask the community to review 
it.  Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19357: support histogram in filter cardinality estimatio...

2017-09-26 Thread ron8hu
GitHub user ron8hu opened a pull request:

https://github.com/apache/spark/pull/19357

support histogram in filter cardinality estimation

## What changes were proposed in this pull request?

Histograms are effective in dealing with skewed distributions. After we 
generate histogram information for column statistics, we need to adjust the 
filter estimation logic to take advantage of the histogram data structure.
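
To make the idea concrete, here is a minimal sketch (with illustrative names, 
not the actual catalyst classes) of how an equi-height histogram can turn a 
range predicate such as "column < value" into a selectivity, assuming values 
are spread uniformly inside each bucket:

    // Simplified stand-ins for the real histogram classes; for illustration only.
    case class Bucket(lo: Double, hi: Double, ndv: Long)
    case class EquiHeightSketch(height: Double, buckets: Seq[Bucket])

    // Fraction of rows satisfying "column < v", assuming uniform spread within a bucket.
    def lessThanSelectivity(h: EquiHeightSketch, v: Double): Double = {
      val totalRows = h.height * h.buckets.size
      val coveredRows = h.buckets.map { b =>
        if (v <= b.lo) 0.0                          // bucket lies entirely above v
        else if (v >= b.hi) h.height                // bucket lies entirely below v
        else h.height * (v - b.lo) / (b.hi - b.lo)  // bucket partially covered by the predicate
      }.sum
      coveredRows / totalRows
    }

    // Example: 4 buckets of 250 rows each covering [0, 100); "column < 30" gives 0.3.
    // lessThanSelectivity(EquiHeightSketch(250, Seq(Bucket(0, 25, 10), Bucket(25, 50, 10),
    //   Bucket(50, 75, 10), Bucket(75, 100, 10))), 30.0)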

## How was this patch tested?

We revised all the unit test cases to cover the histogram data structure.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ron8hu/spark createhistogram

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19357


commit 46af54d9c86fa1e5322fdd92ed47fe3d419dd966
Author: Ron Hu <ron...@huawei.com>
Date:   2017-09-26T23:33:49Z

support histogram in filter cardinality estimation




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17918: [SPARK-20678][SQL] Ndv for columns not in filter conditi...

2017-05-09 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/17918
  
I agree that we need to scale down NDV for all the referenced columns in a 
query if a filter condition reduces the number of qualified rows.  Did you find 
this problem when running the TPC-DS benchmark?  If so, which queries encounter 
this issue?
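
For illustration, the scale-down itself can be as simple as multiplying the 
original NDV by the filter selectivity and rounding, which mirrors what the 
estimation code does elsewhere; the helper below is only a sketch with assumed 
names:

    import scala.math.BigDecimal.RoundingMode

    // Sketch only: shrink a column's distinct count after a filter keeps `selectivity`
    // fraction of the rows; never drop below 1 so downstream estimates stay sane.
    def scaleDownNdv(ndv: BigInt, selectivity: Double): BigInt = {
      val scaled = (BigDecimal(ndv) * BigDecimal(selectivity))
        .setScale(0, RoundingMode.HALF_UP).toBigInt()
      scaled.max(BigInt(1))
    }

    // scaleDownNdv(BigInt(1000), 0.1) == BigInt(100)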


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17546: [SPARK-20233] [SQL] Apply star-join filter heuris...

2017-04-07 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17546#discussion_r110480494
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -736,6 +736,12 @@ object SQLConf {
   .checkValue(weight => weight >= 0 && weight <= 1, "The weight value 
must be in [0, 1].")
   .createWithDefault(0.7)
 
+  val JOIN_REORDER_DP_STAR_FILTER =
+buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
+  .doc("Applies star-join filter heuristics to cost based join 
enumeration.")
+  .booleanConf
+  .createWithDefault(false)
--- End diff --

In Spark 2.2, we introduced a couple of new configuration parameters in the 
optimizer area.  In order to stay on the safe side, we set the default value to 
false.  I suggest that we change the default value to true AFTER we are sure 
that the new optimizer feature does not cause any regression.  I think the 
system regression/integration test suites can help us make that decision.
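
As a usage note, once the flag is in place it can be switched on per session 
together with the other CBO settings (config names as they appear in this diff; 
`spark` below is assumed to be a SparkSession):

    // Hypothetical session-level setup; the star-join filter only matters when
    // CBO and join reordering are enabled as well.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.dp.star.filter", "true")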


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109476536
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,221 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val allNotNull = (colStatLeft.nullCount == 0) && 
(colStatRight.nullCount == 0)
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight, (maxLeft < minRight) && allNotNull)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, (maxLeft <= minRight) && allNotNull)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight, (minLeft > maxRight) && allNotNull)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, (minLeft >= maxRight) && allNotNull)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  //  minRight   maxRight minLeft   maxLeft
+  // +

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109476505
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,221 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val allNotNull = (colStatLeft.nullCount == 0) && 
(colStatRight.nullCount == 0)
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight, (maxLeft < minRight) && allNotNull)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, (maxLeft <= minRight) && allNotNull)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight, (minLeft > maxRight) && allNotNull)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, (minLeft >= maxRight) && allNotNull)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  //  minRight   maxRight minLeft   maxLeft
+  // +

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109476460
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -491,7 +599,22 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 sizeInBytes = getOutputSize(filter.output, expectedRowCount, 
expectedAttributeMap),
 rowCount = Some(expectedRowCount),
 attributeStats = expectedAttributeMap)
-  assert(filter.stats(conf) == expectedStats)
+
+  val filterStats = filter.stats(conf)
+  assert(filterStats.sizeInBytes == expectedStats.sizeInBytes)
+  assert(filterStats.rowCount == expectedStats.rowCount)
+  val rowCountValue = filterStats.rowCount.getOrElse(0)
+  // check the output column stats if the row count is > 0.
+  // When row count is 0, the output is set to empty.
+  if (rowCountValue != 0) {
+// Need to check attributeStats one by one because we may have 
multiple output columns.
+// Due to update operation, the output columns may be in different 
order.
+expectedColStats.foreach { kv =>
+  val filterColumnStat = filterStats.attributeStats.get(kv._1).get
--- End diff --

Good point.  fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109471156
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,220 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val allNotNull = (colStatLeft.nullCount == 0) && 
(colStatRight.nullCount == 0)
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight, (maxLeft < minRight) && allNotNull)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, (maxLeft <= minRight) && allNotNull)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight, (minLeft > maxRight) && allNotNull)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, (minLeft >= maxRight) && allNotNull)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  //  minRight   maxRight minLeft   maxLeft
+  // +

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-02 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109340165
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,220 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val allNotNull = (colStatLeft.nullCount == 0) && 
(colStatRight.nullCount == 0)
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight, (maxLeft < minRight) && allNotNull)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, (maxLeft <= minRight) && allNotNull)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight, (minLeft > maxRight) && allNotNull)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, (minLeft >= maxRight) && allNotNull)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  //  minRight   maxRight minLeft   maxLeft
+  // +

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-02 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109326608
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,225 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight,
+  maxLeft < minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight,
+  maxLeft <= minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight,
+  minLeft > maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight,
+  minLeft >= maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-02 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109323394
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,225 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight,
+  maxLeft < minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight,
+  maxLeft <= minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight,
+  minLeft > maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight,
+  minLeft >= maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-02 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109323397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,225 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight,
+  maxLeft < minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
--- End diff --

fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-02 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109323376
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,225 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight,
+  maxLeft < minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight,
+  maxLeft <= minRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++-+--->
+  // - complete overlap: (If null values exists, we set it to partial 
overlap.)
+  //  minRight   maxRight minLeft   maxLeft
+  // +--++-+--->
+  case _: GreaterThan =>
+(maxLeft <= minRight,
+  minLeft > maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight,
+  minLeft >= maxRight && colStatLeft.nullCount == 0 && 
colStatRight.nullCount == 0)
+
+  // Left = Right or Left <=> Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // +--++

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-04-01 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109293607
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,220 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator, including =, <=>, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  // Left < Right or Left <= Right
+  // - no overlap:
+  //  minRight   maxRight minLeft   maxLeft
+  // 0 --+--++-+--->
+  // - complete overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // 0 --+--++-+--->
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft < minRight)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, maxLeft <= minRight)
+
+  // Left > Right or Left >= Right
+  // - no overlap:
+  //  minLeftmaxLeft  minRight  maxRight
+  // 0 --+--++-+--->
+  // - complete overlap:
+  //  minRight   maxRight minLeft   maxLeft
--- End diff --

Good point.  fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109273331
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft < minRight)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft > maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft >= maxRight)
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: EqualNullSafe =>
+// For null-safe equality, we use a very restrictive condition to 
evaluate its overlap.
+// If null values exists, we set it to partial overlap.
+(((maxLeft < minRight) || (maxRight < minLeft))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0,
+  ((minLeft == minRight) && (maxLeft == maxRight))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0
+)
+}
+
+var percent = BigDecimal(1.0)
+if (noOverlap) {
+  percent = 0.0
+} else if (completeOverlap) {
+  percent = 1.0
+} else {
+  // For partial overlap, we use an empirical value 1/3 as suggested 
by the book
+  // "Database Systems, the complete book".
+  percent = 1.0/3.0
+
+  if (update) {
+// Need to adjust new min/max after the filter condition is applied
+
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+var newNdvLeft = (ndvLeft * percent).setScale(0, 
RoundingMode.HALF_UP).toBigInt()
+if (newNdvLeft < 1) new

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109273334
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft < minRight)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft > maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft >= maxRight)
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: EqualNullSafe =>
+// For null-safe equality, we use a very restrictive condition to 
evaluate its overlap.
+// If null values exists, we set it to partial overlap.
+(((maxLeft < minRight) || (maxRight < minLeft))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0,
+  ((minLeft == minRight) && (maxLeft == maxRight))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0
+)
+}
+
+var percent = BigDecimal(1.0)
+if (noOverlap) {
+  percent = 0.0
+} else if (completeOverlap) {
+  percent = 1.0
+} else {
+  // For partial overlap, we use an empirical value 1/3 as suggested 
by the book
+  // "Database Systems, the complete book".
+  percent = 1.0/3.0
+
+  if (update) {
+// Need to adjust new min/max after the filter condition is applied
+
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+var newNdvLeft = (ndvLeft * percent).setScale(0, 
RoundingMode.HALF_UP).toBigInt()
+if (newNdvLeft < 1) new

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109273326
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft < minRight)
+  case _: LessThanOrEqual =>
+(minLeft > maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft > maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft >= maxRight)
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: EqualNullSafe =>
+// For null-safe equality, we use a very restrictive condition to 
evaluate its overlap.
+// If null values exists, we set it to partial overlap.
+(((maxLeft < minRight) || (maxRight < minLeft))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0,
+  ((minLeft == minRight) && (maxLeft == maxRight))
+&& colStatLeft.nullCount == 0 && colStatRight.nullCount == 0
+)
+}
+
+var percent = BigDecimal(1.0)
+if (noOverlap) {
+  percent = 0.0
+} else if (completeOverlap) {
+  percent = 1.0
+} else {
+  // For partial overlap, we use an empirical value 1/3 as suggested 
by the book
+  // "Database Systems, the complete book".
+  percent = 1.0/3.0
+
+  if (update) {
+// Need to adjust new min/max after the filter condition is applied
+
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+var newNdvLeft = (ndvLeft * percent).setScale(0, 
RoundingMode.HALF_UP).toBigInt()
+if (newNdvLeft < 1) new

[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109268076
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: LessThan =>
--- End diff --

The graphical comments are helpful.  Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r109267675
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -550,6 +565,140 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of the given columns
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
--- End diff --

Good catch.  Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17415: [SPARK-19408][SQL] filter estimation on two columns of s...

2017-03-30 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/17415
  
cc @sameeragarwal @cloud-fan @gatorsmile @wzhfy  After a few rounds of changes and commits, this PR should be in good shape.  If we can include it in Spark 2.2, it can help TPC-H queries.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-29 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108754614
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: LessThanOrEqual =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft >= maxRight)
--- End diff --

I think it should be (maxLeft < minRight, minLeft > maxRight).
For the no-overlap case, the condition should be "maxLeft < minRight" because we do not want any intersection point.  It is a little bit complex.  The best way is to draw a diagram to show the relationship.
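To make that concrete, here is a small standalone Scala sketch (my own illustration, not code from this PR; the min/max values are invented) that draws the two ranges and evaluates the conditions proposed above for GreaterThan.  The LessThan case is simply the mirror image.

    // Hypothetical ranges: attrLeft in [1, 5], attrRight in [5, 9].
    //   attrLeft :  |---------|
    //   attrRight:            |---------|
    //               1         5         9
    val (minLeft, maxLeft)   = (BigDecimal(1), BigDecimal(5))
    val (minRight, maxRight) = (BigDecimal(5), BigDecimal(9))

    // attrLeft > attrRight
    val noOverlap       = maxLeft < minRight   // false: the ranges touch at 5,
                                               // so this is treated as a partial overlap (1/3)
    val completeOverlap = minLeft > maxRight   // false
    println(s"noOverlap=$noOverlap, completeOverlap=$completeOverlap")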


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-29 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108753830
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
--- End diff --

I think it should be
(minLeft > maxRight, maxLeft < minRight).
For the no-overlap case, the condition should be "minLeft > maxRight" because we do not want any intersection point.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-29 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108752975
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: LessThanOrEqual =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft >= maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft > maxRight)
--- End diff --

Good catch.  Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-29 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108751882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,138 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: LessThanOrEqual =>
+(minLeft >= maxRight, maxLeft <= minRight)
--- End diff --

Good catch.  Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108583109
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,135 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
--- End diff --

The current code is written in such a way that we do not have overly deep indentation.  Some engineers do not like deep indentation, as they often orient their monitors vertically.
Let's handle it when the need arises.  I think, with good test case coverage, we will be able to catch anything we miss.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108582594
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -515,8 +530,135 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: LessThanOrEqual =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft >= maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft > maxRight)
+}
--- End diff --

Good catch.  Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-28 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108582540
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -509,8 +524,131 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
--- End diff --

I just revised the code to handle EqualNullSafe separately from EqualTo.
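For readers unfamiliar with the operator, the short sketch below (my own illustration; not from this PR) shows why null-safe equality needs the extra nullCount checks: unlike "=", the "<=>" operator returns true when both sides are NULL, so rows that are invisible to the min/max statistics can still match.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("nullSafeEq").getOrCreate()
    // "=" on two NULLs yields NULL (row filtered out); "<=>" yields true (row kept).
    // That is why the no-overlap / complete-overlap shortcuts are only safe when
    // both columns have nullCount == 0.
    spark.sql("SELECT NULL = NULL AS eq, NULL <=> NULL AS nullSafeEq").show()
    // the eq column is null, the nullSafeEq column is true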


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17446: [SPARK-17075][SQL][followup] Add Estimation of Constant ...

2017-03-27 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/17446
  
The logic is straightforward.  LGTM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-27 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108308949
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -509,8 +524,131 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
+((maxLeft < minRight) || (maxRight < minLeft),
+  (minLeft == minRight) && (maxLeft == maxRight))
+  case _: LessThan =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: LessThanOrEqual =>
+(minLeft >= maxRight, maxLeft <= minRight)
+  case _: GreaterThan =>
+(maxLeft <= minRight, minLeft >= maxRight)
+  case _: GreaterThanOrEqual =>
+(maxLeft < minRight, minLeft > maxRight)
+}
+
+var percent = BigDecimal(1.0)
+if (noOverlap) {
+  percent = 0.0
+} else if (completeOverlap) {
+  percent = 1.0
+} else {
+  // For partial overlap, we use an empirical value 1/3 as suggested 
by the book
+  // "Database Systems, the complete book".
+  percent = 1.0/3.0
+
+  if (update) {
+// Need to adjust new min/max after the filter condition is applied
+
+val ndv = BigDecimal(colStatLeft.distinctCount)
+var newNdv = (ndv * percent).setScale(0, 
RoundingMode.HALF_UP).toBigInt()
--- End diff --

Good point.  Fixed.
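As a quick sanity check on the rounding discussed here, a self-contained Scala snippet (the distinct count is invented for illustration) showing how an NDV is scaled by the 1/3 partial-overlap percentage and rounded half-up, with a floor of 1 as in the surrounding code:

    import scala.math.BigDecimal.RoundingMode

    val ndv = BigDecimal(7)                        // hypothetical distinct count
    val percent = BigDecimal(1.0) / BigDecimal(3.0)
    var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt
    if (newNdv < 1) newNdv = 1                     // never let the estimate drop below 1
    println(newNdv)                                // 7 * 1/3 = 2.33..., rounded to 2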


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-27 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108307672
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -381,7 +461,22 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 sizeInBytes = getOutputSize(filter.output, expectedRowCount, 
expectedAttributeMap),
 rowCount = Some(expectedRowCount),
 attributeStats = expectedAttributeMap)
-  assert(filter.stats(conf) == expectedStats)
+
+  val filterStats = filter.stats(conf)
+  assert(filterStats.sizeInBytes == expectedStats.sizeInBytes)
+  assert(filterStats.rowCount == expectedStats.rowCount)
+  val rowCountValue = filterStats.rowCount.getOrElse(0)
+  // check the output column stats if the row count is > 0.
+  // When row count is 0, the output is set to empty.
+  if (rowCountValue != 0) {
+// Need to check attributeStats one by one because we may have 
multiple output columns.
+// Due to update operation, the output columns may be in different 
order.
+expectedColStats.foreach { kv =>
+  val filterColumnStat = filterStats.attributeStats.get(kv._1).get
+  assert(filterColumnStat == kv._2)
+}
+  }
+  // assert(filter.stats(conf) == expectedStats)
--- End diff --

My bad.  I should remove the line that has been commented out.  This line 
is replaced by the following code:
  if (rowCountValue != 0) {
// Need to check attributeStats one by one because we may have 
multiple output columns.
// Due to update operation, the output columns may be in different 
order.
expectedColStats.foreach { kv =>
  val filterColumnStat = filterStats.attributeStats.get(kv._1).get
  assert(filterColumnStat == kv._2)
}
  }


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-27 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108307490
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -381,7 +461,22 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 sizeInBytes = getOutputSize(filter.output, expectedRowCount, 
expectedAttributeMap),
 rowCount = Some(expectedRowCount),
 attributeStats = expectedAttributeMap)
-  assert(filter.stats(conf) == expectedStats)
+
+  val filterStats = filter.stats(conf)
+  assert(filterStats.sizeInBytes == expectedStats.sizeInBytes)
+  assert(filterStats.rowCount == expectedStats.rowCount)
+  val rowCountValue = filterStats.rowCount.getOrElse(0)
+  // check the output column stats if the row count is > 0.
+  // When row count is 0, the output is set to empty.
+  if (rowCountValue != 0) {
+// Need to check attributeStats one by one because we may have 
multiple output columns.
+// Due to update operation, the output columns may be in different 
order.
+expectedColStats.foreach { kv =>
+  val filterColumnStat = filterStats.attributeStats.get(kv._1).get
+  assert(filterColumnStat == kv._2)
+}
+  }
+  // assert(filter.stats(conf) == expectedStats)
--- End diff --

My bad.  I should remove the line that has been commented out.  This line 
is replaced by the following code:
  if (rowCountValue != 0) {
// Need to check attributeStats one by one because we may have 
multiple output columns.
// Due to update operation, the output columns may be in different 
order.
expectedColStats.foreach { kv =>
  val filterColumnStat = filterStats.attributeStats.get(kv._1).get
  assert(filterColumnStat == kv._2)
}
  }



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-27 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17415#discussion_r108246637
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -509,8 +524,131 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 Some(percent.toDouble)
   }
 
+  /**
+   * Returns a percentage of rows meeting a binary comparison expression 
containing two columns.
+   * In SQL queries, we also see predicate expressions involving two 
columns
+   * such as "column-1 (op) column-2" where column-1 and column-2 belong 
to same table.
+   * Note that, if column-1 and column-2 belong to different tables, then 
it is a join
+   * operator's work, NOT a filter operator's work.
+   *
+   * @param op a binary comparison operator such as =, <, <=, >, >=
+   * @param attrLeft the left Attribute (or a column)
+   * @param attrRight the right Attribute (or a column)
+   * @param update a boolean flag to specify if we need to update 
ColumnStat of a given column
+   *   for subsequent conditions
+   * @return an optional double value to show the percentage of rows 
meeting a given condition
+   */
+  def evaluateBinaryForTwoColumns(
+  op: BinaryComparison,
+  attrLeft: Attribute,
+  attrRight: Attribute,
+  update: Boolean): Option[Double] = {
+
+if (!colStatsMap.contains(attrLeft)) {
+  logDebug("[CBO] No statistics for " + attrLeft)
+  return None
+}
+if (!colStatsMap.contains(attrRight)) {
+  logDebug("[CBO] No statistics for " + attrRight)
+  return None
+}
+
+attrLeft.dataType match {
+  case StringType | BinaryType =>
+// TODO: It is difficult to support other binary comparisons for 
String/Binary
+// type without min/max and advanced statistics like histogram.
+logDebug("[CBO] No range comparison statistics for String/Binary 
type " + attrLeft)
+return None
+  case _ =>
+}
+
+val colStatLeft = colStatsMap(attrLeft)
+val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
+  .asInstanceOf[NumericRange]
+val maxLeft = BigDecimal(statsRangeLeft.max)
+val minLeft = BigDecimal(statsRangeLeft.min)
+val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+
+val colStatRight = colStatsMap(attrRight)
+val statsRangeRight = Range(colStatRight.min, colStatRight.max, 
attrRight.dataType)
+  .asInstanceOf[NumericRange]
+val maxRight = BigDecimal(statsRangeRight.max)
+val minRight = BigDecimal(statsRangeRight.min)
+val ndvRight = BigDecimal(colStatRight.distinctCount)
+
+// determine the overlapping degree between predicate range and 
column's range
+val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
+  case _: EqualTo =>
--- End diff --

The left side has a range [minLeft, maxLeft], and the right side has a range [minRight, maxRight].  If the two ranges overlap, then we assume that they have a complete overlap.  Without detailed/advanced statistics, this is the best estimate we can get.
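A tiny worked example of the three cases for an equality predicate between two columns, following the conditions in the quoted diff (the ranges themselves are made up for illustration):

    // col1 = col2, with hypothetical ranges:
    //   [1, 10] vs [20, 30] -> disjoint ranges         -> noOverlap,       selectivity 0
    //   [1, 10] vs [1, 10]  -> identical ranges        -> completeOverlap, selectivity 1
    //   [1, 10] vs [5, 20]  -> ranges merely intersect -> partial overlap, selectivity 1/3
    def overlapKind(minL: Int, maxL: Int, minR: Int, maxR: Int): String =
      if (maxL < minR || maxR < minL) "noOverlap"
      else if (minL == minR && maxL == maxR) "completeOverlap"
      else "partial (1/3)"

    Seq((1, 10, 20, 30), (1, 10, 1, 10), (1, 10, 5, 20)).foreach { case (a, b, c, d) =>
      println(s"[$a,$b] vs [$c,$d]: " + overlapKind(a, b, c, d))
    }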


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17415: [SPARK-19408][SQL] filter estimation on two columns of s...

2017-03-24 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/17415
  
cc @sameeragarwal @cloud-fan @gatorsmile  This JIRA is not on the Spark 2.2 blocker list.  If time permits, we can include it in Spark 2.2.  If not, we can wait for a maintenance release.  Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17415: [SPARK-19408][SQL] filter estimation on two colum...

2017-03-24 Thread ron8hu
GitHub user ron8hu opened a pull request:

https://github.com/apache/spark/pull/17415

[SPARK-19408][SQL] filter estimation on two columns of same table

## What changes were proposed in this pull request?

In SQL queries, we also see predicate expressions involving two columns such as "column-1 (op) column-2" where column-1 and column-2 belong to the same table. Note that, if column-1 and column-2 belong to different tables, then it is a join operator's work, NOT a filter operator's work.  This PR estimates filter selectivity on two columns of the same table.
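For illustration, a predicate of the kind this PR targets might look like the following sketch (a hypothetical table and columns, shown with the DataFrame API; an equivalent SQL WHERE clause behaves the same way):

    import org.apache.spark.sql.functions.col

    // Both sides of the comparison are columns of the same table, so the filter
    // estimator (not the join estimator) is responsible for the selectivity.
    val orders = spark.table("orders")            // assumes an existing SparkSession `spark`
    val late = orders.filter(col("receipt_date") > col("commit_date"))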

## How was this patch tested?

We added 6 new test cases to test various logical predicates involving two columns of the same table.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ron8hu/spark filterTwoColumns

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17415


commit 893066905b690c78a127eae58b908dff1dabf7cf
Author: Ron Hu <ron...@huawei.com>
Date:   2017-03-24T20:31:35Z

filter estimation on two columns of same table




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...

2017-03-17 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15363#discussion_r106766281
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -20,19 +20,347 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import 
org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, 
PhysicalOperation}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/**
+ * Encapsulates star-schema join detection.
+ */
+case class StarSchemaDetection(conf: CatalystConf) extends PredicateHelper 
{
+
+  /**
+   * Star schema consists of one or more fact tables referencing a number 
of dimension
+   * tables. In general, star-schema joins are detected using the 
following conditions:
+   *  1. Informational RI constraints (reliable detection)
+   *+ Dimension contains a primary key that is being joined to the 
fact table.
+   *+ Fact table contains foreign keys referencing multiple dimension 
tables.
+   *  2. Cardinality based heuristics
+   *+ Usually, the table with the highest cardinality is the fact 
table.
+   *+ Table being joined with the most number of tables is the fact 
table.
+   *
+   * To detect star joins, the algorithm uses a combination of the above 
two conditions.
+   * The fact table is chosen based on the cardinality heuristics, and the 
dimension
+   * tables are chosen based on the RI constraints. A star join will 
consist of the largest
+   * fact table joined with the dimension tables on their primary keys. To 
detect that a
+   * column is a primary key, the algorithm uses table and column 
statistics.
+   *
+   * Since Catalyst only supports left-deep tree plans, the algorithm 
currently returns only
+   * the star join with the largest fact table. Choosing the largest fact 
table on the
+   * driving arm to avoid large inners is in general a good heuristic. 
This restriction can
+   * be lifted with support for bushy tree plans.
+   *
+   * The highlights of the algorithm are the following:
+   *
+   * Given a set of joined tables/plans, the algorithm first verifies if 
they are eligible
+   * for star join detection. An eligible plan is a base table access with 
valid statistics.
+   * A base table access represents Project or Filter operators above a 
LeafNode. Conservatively,
+   * the algorithm only considers base table access as part of a star join 
since they provide
+   * reliable statistics.
+   *
+   * If some of the plans are not base table access, or statistics are not 
available, the algorithm
+   * returns an empty star join plan since, in the absence of statistics, 
it cannot make
+   * good planning decisions. Otherwise, the algorithm finds the table 
with the largest cardinality
+   * (number of rows), which is assumed to be a fact table.
+   *
+   * Next, it computes the set of dimension tables for the current fact 
table. A dimension table
+   * is assumed to be in a RI relationship with a fact table. To infer 
column uniqueness,
+   * the algorithm compares the number of distinct values with the total 
number of rows in the
+   * table. If their relative difference is within certain limits (i.e. 
ndvMaxError * 2, adjusted
+   * based on 1TB TPC-DS data), the column is assumed to be unique.
+   */
+  def findStarJoins(
+  input: Seq[LogicalPlan],
+  conditions: Seq[Expression]): Seq[Seq[LogicalPlan]] = {
+
+val emptyStarJoinPlan = Seq.empty[Seq[LogicalPlan]]
+
+if (!conf.starSchemaDetection || input.size < 2) {
+  emptyStarJoinPlan
+} else {
--- End diff --

} else if (conf.joinReorderEnabled) {
  emptyStarJoinPlan
} else {

When both configuration parameters joinReorderEnabled and starSchemaDetection are true, we want to avoid performing join reordering twice.  There is no added value in doing the same reordering work twice.
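For reference, a hedged sketch of how the two features are switched on from a session; the key names below are the ones I believe the Spark 2.2-era SQLConf uses, so please double-check them against SQLConf:

    // Assumes an existing SparkSession named `spark`.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
    spark.conf.set("spark.sql.cbo.starSchemaDetection", "true")
    // With the guard discussed above, star-schema detection steps aside when the
    // DP join reorder is also enabled, so the join order is only optimized once.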


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...

2017-03-15 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15363#discussion_r106285527
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) 
extends Rule[LogicalPlan] wi
 
   def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
+// Find the star schema joins. Currently, it returns the star join 
with the largest
+// fact table. In the future, it can return more than one star join 
(e.g. F1-D1-D2
+// and F2-D3-D4).
+val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
--- End diff --

@ioana-delaney Thanks for the pointer.  We had a good exchange to clarify our points.  We definitely need a joint community effort to improve Spark's cost-based optimizer in the future.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...

2017-03-15 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15363#discussion_r106271250
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) 
extends Rule[LogicalPlan] wi
 
   def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
+// Find the star schema joins. Currently, it returns the star join 
with the largest
+// fact table. In the future, it can return more than one star join 
(e.g. F1-D1-D2
+// and F2-D3-D4).
+val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
--- End diff --

@ioana-delaney  Thanks for sharing your thoughts.  This is helpful.  As of today, Spark's join reorder computation is still at an early stage.  Your comments above can serve as a guideline for future enhancements.  Can you point us to a paper/article describing your optimization idea?  Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...

2017-03-15 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15363#discussion_r106237955
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) 
extends Rule[LogicalPlan] wi
 
   def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
+// Find the star schema joins. Currently, it returns the star join 
with the largest
+// fact table. In the future, it can return more than one star join 
(e.g. F1-D1-D2
+// and F2-D3-D4).
+val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
--- End diff --

@gatorsmile @ioana-delaney Thank you for your replies.  My main point is to identify the deficiencies of the DP algorithm so that we can make improvements.  Since you are familiar with the DP algorithm, can you help us identify its deficiencies/limitations so that we can improve it?

One deficiency the DP algorithm has is the explosion of the search space when there is a large number of join relations, such as more than 30.  In CostBasedJoinReorder, we do not optimize the join order if the number of join relations is greater than the threshold value joinReorderDPThreshold.  I think this is a place where the star join reorder algorithm can help, because it defaults to a left-deep tree, which has a smaller search space.  What do you think?
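To give a feel for the numbers behind the threshold argument, here is a small back-of-the-envelope Scala calculation (my own arithmetic, not from Spark) comparing the count of left-deep join orders with the count of all bushy orders:

    def factorial(n: Int): BigInt = (1 to n).foldLeft(BigInt(1))(_ * _)
    def catalan(n: Int): BigInt = factorial(2 * n) / (factorial(n + 1) * factorial(n))

    val n = 12                                     // number of join relations
    val leftDeep = factorial(n)                    // left-deep orders: n!
    val bushy = factorial(n) * catalan(n - 1)      // all binary (bushy) tree shapes times orders
    println(s"n=$n left-deep=$leftDeep bushy=$bushy")
    // The gap widens quickly with n, which is one reason a cutoff like
    // joinReorderDPThreshold exists at all.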


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15363: [SPARK-17791][SQL] Join reordering using star sch...

2017-03-14 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/15363#discussion_r106084556
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -51,6 +51,11 @@ case class CostBasedJoinReorder(conf: CatalystConf) 
extends Rule[LogicalPlan] wi
 
   def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
 val (items, conditions) = extractInnerJoins(plan)
+// Find the star schema joins. Currently, it returns the star join 
with the largest
+// fact table. In the future, it can return more than one star join 
(e.g. F1-D1-D2
+// and F2-D3-D4).
+val starJoinPlans = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
--- End diff --

As discussed earlier, we only need to perform the join reorder algorithm once.

CostBasedJoinReorder implements the dynamic programming algorithm published in 
the classic paper "Access Path Selection in a Relational Database Management 
System" by Patricia Selinger et al.  The same algorithm is used in PostgreSQL.  
To my understanding, it is a generic algorithm that works on both star and 
non-star schemas.  For example, it is capable of generating a bushy tree if 
that is optimal; that is, it is not limited to left-deep trees only.

I suggest that we identify the strengths of the star join reorder algorithm 
and where it can help address the deficiencies of the dynamic programming 
algorithm.  Then we add the necessary code to address those deficiencies.  
There is no need to add code that does the same job twice without added value.  

Perhaps running the TPC-DS benchmark queries and inspecting the generated query 
plans can help us identify the strengths and weaknesses of both algorithms.
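
For reference, a minimal sketch of the Selinger-style bottom-up enumeration 
idea (standalone Scala with made-up names and a fake cost function; the real 
implementation is JoinReorderDP and does considerably more, e.g. its own cost 
model and handling of join conditions):

    case class PlanSketch(relations: Set[String], cost: Double)

    def bestPlanByDP(
        items: Seq[String],
        joinCost: (PlanSketch, PlanSketch) => Double): PlanSketch = {
      val best = scala.collection.mutable.Map[Set[String], PlanSketch]()
      // Level 1: every single relation is its own cheapest "plan".
      items.foreach(r => best(Set(r)) = PlanSketch(Set(r), 0.0))

      for (size <- 2 to items.size; subset <- items.combinations(size).map(_.toSet)) {
        val elems = subset.toVector
        // Try every split of the subset into two smaller, already-solved pieces.
        val candidates = (1 until (1 << elems.size) - 1).flatMap { mask =>
          val left = elems.zipWithIndex.collect {
            case (e, i) if (mask & (1 << i)) != 0 => e
          }.toSet
          val right = subset -- left
          for (l <- best.get(left); r <- best.get(right))
            yield PlanSketch(subset, l.cost + r.cost + joinCost(l, r))
        }
        if (candidates.nonEmpty) best(subset) = candidates.minBy(_.cost)
      }
      best(items.toSet)
    }

The essence is that the cheapest plan for every subset of relations is memoized 
once and reused when building larger subsets, instead of re-enumerating whole 
join trees.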


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17138: [SPARK-17080] [SQL] join reorder

2017-03-07 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17138#discussion_r104760579
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 ---
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+
+/**
+ * Cost-based join reorder.
+ * We may have several join reorder algorithms in the future. This class 
is the entry of these
+ * algorithms, and chooses which one to use.
+ */
+case class CostBasedJoinReorder(conf: CatalystConf) extends 
Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.cboEnabled || !conf.joinReorderEnabled) {
+  plan
+} else {
+  plan transform {
+case p @ Project(projectList, j @ Join(_, _, _: InnerLike, _)) if 
!j.ordered =>
+  reorder(j, p.outputSet)
+case j @ Join(_, _, _: InnerLike, _) if !j.ordered =>
+  reorder(j, j.outputSet)
+  }
+}
+  }
+
+  def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
+val (items, conditions) = extractInnerJoins(plan)
+val result =
+  if (items.size > 2 && items.size <= conf.joinReorderDPThreshold && 
conditions.nonEmpty) {
+JoinReorderDP(conf, items, conditions, 
output).search().getOrElse(plan)
+  } else {
+plan
+  }
+// Set all inside joins ordered.
+setOrdered(result)
+result
+  }
+
+  /**
+   * Extract inner joinable items and join conditions.
+   * This method works for bushy trees and left/right deep trees.
+   */
+  def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], 
Set[Expression]) = plan match {
+case j @ Join(left, right, _: InnerLike, cond) =>
+  val (leftPlans, leftConditions) = extractInnerJoins(left)
+  val (rightPlans, rightConditions) = extractInnerJoins(right)
+  (leftPlans ++ rightPlans, 
cond.map(splitConjunctivePredicates).getOrElse(Nil).toSet ++
+leftConditions ++ rightConditions)
+case Project(_, j @ Join(left, right, _: InnerLike, cond)) =>
+  val (leftPlans, leftConditions) = extractInnerJoins(left)
+  val (rightPlans, rightConditions) = extractInnerJoins(right)
+  (leftPlans ++ rightPlans, 
cond.map(splitConjunctivePredicates).getOrElse(Nil).toSet ++
+leftConditions ++ rightConditions)
+case _ =>
+  (Seq(plan), Set())
+  }
+
+  def setOrdered(plan: LogicalPlan): Unit = plan match {
+case j @ Join(left, right, _: InnerLike, cond) =>
+  j.ordered = true
+  setOrdered(left)
+  setOrdered(right)
+case Project(_, j @ Join(left, right, _: InnerLike, cond)) =>
+  j.ordered = true
+  setOrdered(left)
+  setOrdered(right)
+case _ =>
+  }
+}
+
+/**
+ * Reorder the joins using a dynamic programming algorithm:
--- End diff --

@hvanhovell We had a meeting with Sameer and Wenchen on 2/21/2017.  We did 
not meet you as you were not in the San Francisco office that day.  In the 
meeting, we agreed to have a good join reorder algorithm implemented in CBO's 
first release as long as the algorithm has a good reference base.  We can 
improve the join reorder algorithm later in CBO's second release.  After all, 
we are running short of time for Spark 2.2.  We decided to use the algorithm in 
Selinger's paper.  For CBO's first release, w

[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...

2017-03-06 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17148#discussion_r104557770
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -90,32 +95,43 @@ case class FilterEstimation(plan: Filter, catalystConf: 
CatalystConf) extends Lo
   def calculateFilterSelectivity(condition: Expression, update: Boolean = 
true): Option[Double] = {
 condition match {
   case And(cond1, cond2) =>
-// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
-val percent1 = calculateFilterSelectivity(cond1, update)
-val percent2 = calculateFilterSelectivity(cond2, update)
-(percent1, percent2) match {
-  case (Some(p1), Some(p2)) => Some(p1 * p2)
-  case (Some(p1), None) => Some(p1)
-  case (None, Some(p2)) => Some(p2)
-  case (None, None) => None
-}
+val percent1 = calculateFilterSelectivity(cond1, 
update).getOrElse(1.0)
+val percent2 = calculateFilterSelectivity(cond2, 
update).getOrElse(1.0)
+Some(percent1 * percent2)
 
   case Or(cond1, cond2) =>
-// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
-val percent1 = calculateFilterSelectivity(cond1, update = false)
-val percent2 = calculateFilterSelectivity(cond2, update = false)
-(percent1, percent2) match {
-  case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * 
p2)))
-  case (Some(p1), None) => Some(1.0)
-  case (None, Some(p2)) => Some(1.0)
-  case (None, None) => None
+val percent1 = calculateFilterSelectivity(cond1, update = 
false).getOrElse(1.0)
+val percent2 = calculateFilterSelectivity(cond2, update = 
false).getOrElse(1.0)
+Some(percent1 + percent2 - (percent1 * percent2))
+
+  // For AND and OR conditions, we will estimate conservatively if one 
of two
+  // components is not supported, e.g. suppose c1 is not supported,
+  // then p(And(c1, c2)) = p(c2), and p(Or(c1, c2)) = 1.0.
+  // But once they are wrapped in NOT condition, then after 1 - p, it 
becomes
+  // under-estimation. So in these cases, we consider them as 
unsupported.
+  case Not(And(cond1, cond2)) =>
--- End diff --

The current code is fine.  If we just called calculateSingleCondition for 
"case Not(And(cond1, cond2))", it would be too restrictive.  The current code 
computes selectivity only when we can get the selectivity for both conditions. 
If we cannot get the selectivity for one or both of them, we just return None.  
I think it is a clean solution.
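
As a toy illustration of the combination rules being discussed (standalone 
Scala over Option[Double] selectivities, not the actual FilterEstimation code):

    // A missing (None) side of AND/OR is conservatively treated as 1.0, but NOT
    // over a compound with any missing side returns None, because 1 - p would
    // turn an over-estimate into an under-estimate.
    def andSel(p1: Option[Double], p2: Option[Double]): Option[Double] =
      Some(p1.getOrElse(1.0) * p2.getOrElse(1.0))

    def orSel(p1: Option[Double], p2: Option[Double]): Option[Double] = {
      val (a, b) = (p1.getOrElse(1.0), p2.getOrElse(1.0))
      Some(a + b - a * b)
    }

    def notAndSel(p1: Option[Double], p2: Option[Double]): Option[Double] =
      for (a <- p1; b <- p2) yield 1.0 - a * b

    // With p1 = Some(0.2) and an unsupported p2 = None:
    //   andSel -> Some(0.2), orSel -> Some(1.0), notAndSel -> None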


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...

2017-03-06 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17148#discussion_r104557294
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -90,32 +95,43 @@ case class FilterEstimation(plan: Filter, catalystConf: 
CatalystConf) extends Lo
   def calculateFilterSelectivity(condition: Expression, update: Boolean = 
true): Option[Double] = {
 condition match {
   case And(cond1, cond2) =>
-// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
-val percent1 = calculateFilterSelectivity(cond1, update)
-val percent2 = calculateFilterSelectivity(cond2, update)
-(percent1, percent2) match {
-  case (Some(p1), Some(p2)) => Some(p1 * p2)
-  case (Some(p1), None) => Some(p1)
-  case (None, Some(p2)) => Some(p2)
-  case (None, None) => None
-}
+val percent1 = calculateFilterSelectivity(cond1, 
update).getOrElse(1.0)
+val percent2 = calculateFilterSelectivity(cond2, 
update).getOrElse(1.0)
+Some(percent1 * percent2)
 
   case Or(cond1, cond2) =>
-// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
-val percent1 = calculateFilterSelectivity(cond1, update = false)
-val percent2 = calculateFilterSelectivity(cond2, update = false)
-(percent1, percent2) match {
-  case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * 
p2)))
-  case (Some(p1), None) => Some(1.0)
-  case (None, Some(p2)) => Some(1.0)
-  case (None, None) => None
+val percent1 = calculateFilterSelectivity(cond1, update = 
false).getOrElse(1.0)
+val percent2 = calculateFilterSelectivity(cond2, update = 
false).getOrElse(1.0)
+Some(percent1 + percent2 - (percent1 * percent2))
+
+  // For AND and OR conditions, we will estimate conservatively if one 
of two
+  // components is not supported, e.g. suppose c1 is not supported,
+  // then p(And(c1, c2)) = p(c2), and p(Or(c1, c2)) = 1.0.
+  // But once they are wrapped in NOT condition, then after 1 - p, it 
becomes
+  // under-estimation. So in these cases, we consider them as 
unsupported.
+  case Not(And(cond1, cond2)) =>
+val p1 = calculateFilterSelectivity(cond1, update = false)
+val p2 = calculateFilterSelectivity(cond2, update = false)
+if (p1.isDefined && p2.isDefined) {
+  Some(1 - p1.get * p2.get)
+} else {
+  None
 }
 
-  case Not(cond) => calculateFilterSelectivity(cond, update = false) 
match {
-case Some(percent) => Some(1.0 - percent)
-// for not-supported condition, set filter selectivity to a 
conservative estimate 100%
-case None => None
-  }
+  case Not(Or(cond1, cond2)) =>
+val p1 = calculateFilterSelectivity(cond1, update = false)
+val p2 = calculateFilterSelectivity(cond2, update = false)
+if (p1.isDefined && p2.isDefined) {
+  Some(1 - (p1.get + p2.get - (p1.get * p2.get)))
+} else {
+  None
--- End diff --

This is good.  We compute selectivity for "Not(Or(cond1, cond2))" only 
when we can get the selectivity for both conditions.  If we cannot get the 
selectivity for one or both of them, we just return None.  It is a clean solution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...

2017-03-06 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17148#discussion_r104527462
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -254,133 +270,118 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 val d20170104 = Date.valueOf("2017-01-04")
 val d20170105 = Date.valueOf("2017-01-05")
 validateEstimatedStats(
-  arDate,
-  Filter(In(arDate, Seq(Literal(d20170103), Literal(d20170104), 
Literal(d20170105))),
-childStatsTestPlan(Seq(arDate), 10L)),
-  ColumnStat(distinctCount = 3, min = Some(d20170103), max = 
Some(d20170105),
-nullCount = 0, avgLen = 4, maxLen = 4),
-  3)
+  Filter(In(attrDate, Seq(Literal(d20170103), Literal(d20170104), 
Literal(d20170105))),
+childStatsTestPlan(Seq(attrDate), 10L)),
+  Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(d20170103), 
max = Some(d20170105),
+nullCount = 0, avgLen = 4, maxLen = 4)),
+  expectedRowCount = 3)
   }
 
   test("cdecimal = 0.40") {
 val dec_0_40 = new java.math.BigDecimal("0.40")
 validateEstimatedStats(
-  arDecimal,
-  Filter(EqualTo(arDecimal, Literal(dec_0_40)),
-childStatsTestPlan(Seq(arDecimal), 4L)),
-  ColumnStat(distinctCount = 1, min = Some(dec_0_40), max = 
Some(dec_0_40),
-nullCount = 0, avgLen = 8, maxLen = 8),
-  1)
+  Filter(EqualTo(attrDecimal, Literal(dec_0_40)),
+childStatsTestPlan(Seq(attrDecimal), 4L)),
+  Seq(attrDecimal -> ColumnStat(distinctCount = 1, min = 
Some(dec_0_40), max = Some(dec_0_40),
+nullCount = 0, avgLen = 8, maxLen = 8)),
+  expectedRowCount = 1)
   }
 
   test("cdecimal < 0.60 ") {
 val dec_0_60 = new java.math.BigDecimal("0.60")
 validateEstimatedStats(
-  arDecimal,
-  Filter(LessThan(arDecimal, Literal(dec_0_60)),
-childStatsTestPlan(Seq(arDecimal), 4L)),
-  ColumnStat(distinctCount = 3, min = Some(decMin), max = 
Some(dec_0_60),
-nullCount = 0, avgLen = 8, maxLen = 8),
-  3)
+  Filter(LessThan(attrDecimal, Literal(dec_0_60)),
+childStatsTestPlan(Seq(attrDecimal), 4L)),
+  Seq(attrDecimal -> ColumnStat(distinctCount = 3, min = Some(decMin), 
max = Some(dec_0_60),
+nullCount = 0, avgLen = 8, maxLen = 8)),
+  expectedRowCount = 3)
   }
 
   test("cdouble < 3.0") {
 validateEstimatedStats(
-  arDouble,
-  Filter(LessThan(arDouble, Literal(3.0)), 
childStatsTestPlan(Seq(arDouble), 10L)),
-  ColumnStat(distinctCount = 2, min = Some(1.0), max = Some(3.0),
-nullCount = 0, avgLen = 8, maxLen = 8),
-  3)
+  Filter(LessThan(attrDouble, Literal(3.0)), 
childStatsTestPlan(Seq(attrDouble), 10L)),
+  Seq(attrDouble -> ColumnStat(distinctCount = 2, min = Some(1.0), max 
= Some(3.0),
+nullCount = 0, avgLen = 8, maxLen = 8)),
+  expectedRowCount = 3)
   }
 
   test("cstring = 'A2'") {
 validateEstimatedStats(
-  arString,
-  Filter(EqualTo(arString, Literal("A2")), 
childStatsTestPlan(Seq(arString), 10L)),
-  ColumnStat(distinctCount = 1, min = None, max = None,
-nullCount = 0, avgLen = 2, maxLen = 2),
-  1)
+  Filter(EqualTo(attrString, Literal("A2")), 
childStatsTestPlan(Seq(attrString), 10L)),
+  Seq(attrString -> ColumnStat(distinctCount = 1, min = None, max = 
None,
+nullCount = 0, avgLen = 2, maxLen = 2)),
+  expectedRowCount = 1)
   }
 
-  // There is no min/max statistics for String type.  We estimate 10 rows 
returned.
-  test("cstring < 'A2'") {
+  test("cstring < 'A2' - unsupported condition") {
 validateEstimatedStats(
-  arString,
-  Filter(LessThan(arString, Literal("A2")), 
childStatsTestPlan(Seq(arString), 10L)),
-  ColumnStat(distinctCount = 10, min = None, max = None,
-nullCount = 0, avgLen = 2, maxLen = 2),
-  10)
+  Filter(LessThan(attrString, Literal("A2")), 
childStatsTestPlan(Seq(attrString), 10L)),
+  Seq(attrString -> ColumnStat(distinctCount = 10, min = None, max = 
None,
+nullCount = 0, avgLen = 2, maxLen = 2)),
+  expectedRowCount = 10)
   }
 
-  // This is a corner test case.  We want to test if we can handle the 
case when the number of
-  // valid values in IN clause is greater than the number of distinct 
values for a 

[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...

2017-03-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17148#discussion_r104267627
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -157,7 +157,7 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
   Filter(IsNull(arInt), childStatsTestPlan(Seq(arInt), 10L)),
   ColumnStat(distinctCount = 0, min = None, max = None,
 nullCount = 0, avgLen = 4, maxLen = 4),
-  0)
+  rowCount = 0)
   }
 
   test("cint IS NOT NULL") {
--- End diff --

may add a nested NOT test case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17148: [SPARK-17075][SQL][followup] fix filter estimatio...

2017-03-03 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17148#discussion_r104267295
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -101,21 +101,23 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
 }
 
   case Or(cond1, cond2) =>
-// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
 val percent1 = calculateFilterSelectivity(cond1, update = false)
 val percent2 = calculateFilterSelectivity(cond2, update = false)
 (percent1, percent2) match {
   case (Some(p1), Some(p2)) => Some(math.min(1.0, p1 + p2 - (p1 * 
p2)))
-  case (Some(p1), None) => Some(1.0)
-  case (None, Some(p2)) => Some(1.0)
-  case (None, None) => None
+  case _ => None
 }
 
-  case Not(cond) => calculateFilterSelectivity(cond, update = false) 
match {
-case Some(percent) => Some(1.0 - percent)
-// for not-supported condition, set filter selectivity to a 
conservative estimate 100%
-case None => None
-  }
+  case Not(cond) =>
+if (cond.isInstanceOf[And] || cond.isInstanceOf[Or]) {
+  // Don't support compound Not expression.
--- End diff --

I thought that we agreed not to support the nested NOT condition.  First we 
need to clarify what a nested NOT is.  Here you allow ((NOT cond1) && (NOT 
cond2)), but you disallow a condition like NOT(cond1 && cond2).  Is this right?
How about the case NOT(cond1 && (NOT cond2))?  This third case is a truly 
nested NOT.
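
To make the three shapes concrete, a throwaway expression ADT (plain Scala, 
not the Catalyst classes) that separates them:

    sealed trait Cond
    case class Leaf(name: String) extends Cond
    case class And(l: Cond, r: Cond) extends Cond
    case class Or(l: Cond, r: Cond) extends Cond
    case class Not(c: Cond) extends Cond

    // NOT applied only to leaves (the allowed shape):
    val form1 = And(Not(Leaf("c1")), Not(Leaf("c2")))   // (NOT c1) && (NOT c2)
    // NOT over a compound (the disallowed shape):
    val form2 = Not(And(Leaf("c1"), Leaf("c2")))        // NOT (c1 && c2)
    // NOT over a compound that itself contains a NOT (the truly nested case):
    val form3 = Not(And(Leaf("c1"), Not(Leaf("c2"))))   // NOT (c1 && (NOT c2))

    def hasCompoundUnderNot(c: Cond): Boolean = c match {
      case Not(_: And) | Not(_: Or) => true
      case Not(inner)               => hasCompoundUnderNot(inner)
      case And(l, r)                => hasCompoundUnderNot(l) || hasCompoundUnderNot(r)
      case Or(l, r)                 => hasCompoundUnderNot(l) || hasCompoundUnderNot(r)
      case _                        => false
    }
    // hasCompoundUnderNot(form1) == false; form2 and form3 both return true.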


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...

2017-02-25 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17065#discussion_r103090517
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -361,57 +343,52 @@ case class FilterEstimation(plan: Filter, 
catalystConf: CatalystConf) extends Lo
*/
 
   def evaluateInSet(
-  attrRef: AttributeReference,
+  attr: Attribute,
   hSet: Set[Any],
-  update: Boolean)
-: Option[Double] = {
-if (!mutableColStats.contains(attrRef.exprId)) {
-  logDebug("[CBO] No statistics for " + attrRef)
+  update: Boolean): Option[Double] = {
+if (!colStatsMap.contains(attr)) {
+  logDebug("[CBO] No statistics for " + attr)
   return None
 }
 
-val aColStat = mutableColStats(attrRef.exprId)
-val ndv = aColStat.distinctCount
-val aType = attrRef.dataType
-var newNdv: Long = 0
+val colStat = colStatsMap(attr)
+val ndv = colStat.distinctCount
+val dataType = attr.dataType
+var newNdv = ndv
 
 // use [min, max] to filter the original hSet
-aType match {
-  case _: NumericType | DateType | TimestampType =>
-val statsRange =
-  Range(aColStat.min, aColStat.max, 
aType).asInstanceOf[NumericRange]
-
-// To facilitate finding the min and max values in hSet, we map 
hSet values to BigDecimal.
-// Using hSetBigdec, we can find the min and max values quickly in 
the ordered hSetBigdec.
-val hSetBigdec = hSet.map(e => BigDecimal(e.toString))
-val validQuerySet = hSetBigdec.filter(e => e >= statsRange.min && 
e <= statsRange.max)
-// We use hSetBigdecToAnyMap to help us find the original hSet 
value.
-val hSetBigdecToAnyMap: Map[BigDecimal, Any] =
-  hSet.map(e => BigDecimal(e.toString) -> e).toMap
+dataType match {
+  case _: NumericType | BooleanType | DateType | TimestampType =>
+val statsRange = Range(colStat.min, colStat.max, 
dataType).asInstanceOf[NumericRange]
+val validQuerySet = hSet.filter { v =>
+  v != null && statsRange.contains(Literal(v, dataType))
--- End diff --

nit: for better readability:  (v != null) && statsRange.contains(Literal(v, 
dataType))


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...

2017-02-25 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17065#discussion_r103087483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -95,15 +84,16 @@ case class FilterEstimation(plan: Filter, catalystConf: 
CatalystConf) extends Lo
* @param condition the compound logical expression
* @param update a boolean flag to specify if we need to update 
ColumnStat of a column
*   for subsequent conditions
-   * @return a double value to show the percentage of rows meeting a given 
condition.
+   * @return an optional double value to show the percentage of rows 
meeting a given condition.
* It returns None if the condition is not supported.
*/
   def calculateFilterSelectivity(condition: Expression, update: Boolean = 
true): Option[Double] = {
-
 condition match {
   case And(cond1, cond2) =>
-(calculateFilterSelectivity(cond1, update), 
calculateFilterSelectivity(cond2, update))
-match {
+// For ease of debugging, we compute percent1 and percent2 in 2 
statements.
+val percent1 = calculateFilterSelectivity(cond1, update)
+val percent2 = calculateFilterSelectivity(cond2, update)
+(percent1, percent2) match {
   case (Some(p1), Some(p2)) => Some(p1 * p2)
   case (Some(p1), None) => Some(p1)
--- End diff --

This shows that it is difficult to always over-estimate.  How about we do 
not handle the nested NOT case?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17065: [SPARK-17075][SQL][followup] fix some minor issue...

2017-02-25 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/17065#discussion_r103087345
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -297,6 +278,8 @@ case class FilterEstimation(plan: Filter, catalystConf: 
CatalystConf) extends Lo
 Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
   case TimestampType =>
 Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
+  case _: DecimalType =>
+Some(litValue.asInstanceOf[Decimal].toJavaBigDecimal)
--- End diff --

Agreed.  Thanks for fixing it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-20 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r102133390
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+ * In this test suite, we test predicates containing the following 
operators:
+ * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN
+ */
+class FilterEstimationSuite extends StatsEstimationTestBase {
+
+  // Suppose our test table has 10 rows and 6 columns.
+  // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+  // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, 
maxLen:4
+  val arInt = AttributeReference("cint", IntegerType)()
+  val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max 
= Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Second column cdate has 10 values from 2017-01-01 through 2017-01-10.
+  val dMin = Date.valueOf("2017-01-01")
+  val dMax = Date.valueOf("2017-01-10")
+  val arDate = AttributeReference("cdate", DateType)()
+  val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), 
max = Some(dMax),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Third column ctimestamp has 10 values from "2017-01-01 01:00:00" 
through
+  // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours).
+  val tsMin = Timestamp.valueOf("2017-01-01 01:00:00")
+  val tsMax = Timestamp.valueOf("2017-01-01 10:00:00")
+  val arTimestamp = AttributeReference("ctimestamp", TimestampType)()
+  val childColStatTimestamp = ColumnStat(distinctCount = 10, min = 
Some(tsMin), max = Some(tsMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Fourth column cdecimal has 10 values from 0.20 through 2.00 at 
increment of 0.2.
+  val decMin = new java.math.BigDecimal("0.20")
+  val decMax = new java.math.BigDecimal("2.00")
+  val arDecimal = AttributeReference("cdecimal", DecimalType(12, 2))()
+  val childColStatDecimal = ColumnStat(distinctCount = 10, min = 
Some(decMin), max = Some(decMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Fifth column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 
6.0, 7.0, 8.0, 9.0, 10.0
+  val arDouble = AttributeReference("cdouble", DoubleType)()
+  val childColStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), 
max = Some(10.0),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Sixth column cstring has 10 String values:
+  // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9"
+  val arString = AttributeReference("cstring", StringType)()
+  val childColStatString = ColumnStat(distinctCount = 10, min = None, max 
= None,
+nullCount = 0, avgLen = 2, maxLen = 2)
+
+  test("cint = 2") {
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(2)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(1L)
+)
+  }
+
+  test("cint = 0") {
+// This is an out-of-range case since 0 is outside the

[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-19 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/16395
  
@cloud-fan I have updated the code based on your feedback.  Please review it 
again.  Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-16 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/16395
  
Hi @cloud-fan I revised the code using the latest Range class.  Thanks for 
reviewing the code.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-13 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100955392
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCount: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.
+   * A compound condition is decomposed into multiple single conditions 
linked with AND, O

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-13 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100954234
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCount: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.
+   * A compound condition is decomposed into multiple single conditions 
linked with AND, O

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-13 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100952454
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCount: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.
+   * A compound condition is decomposed into multiple single conditions 
linked with AND, O

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-13 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100941831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
--- End diff --

Here we mean the Statistics of the current node.  We first copy the child 
node's statistics and then modify them to become the output statistics of the 
current Filter node.  I can add a comment to make this clear.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-13 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100940942
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
--- End diff --

mutableColStats is a class member variable.  When we instantiate a 
FilterEstimation object, we need to set mutableColStats to mutable.Map.empty.  
Afterwards, we re-assign it to a new value in the estimate method.  Since we 
change the value of mutableColStats, we need to declare it as a var.
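
A standalone toy of the mutable-map pattern described above (plain Scala, not 
the Catalyst types), narrowing a column's [min, max] as conditions are applied:

    import scala.collection.mutable

    case class SimpleColStat(min: Double, max: Double)

    class StatTracker(initial: Map[String, SimpleColStat]) {
      // Mutable so each evaluated condition can narrow the surviving range.
      private val stats = mutable.Map(initial.toSeq: _*)

      def applyGreaterThan(col: String, v: Double): Unit =
        stats.get(col).foreach(s => stats(col) = s.copy(min = math.max(s.min, v)))

      def applyLessThanOrEqual(col: String, v: Double): Unit =
        stats.get(col).foreach(s => stats(col) = s.copy(max = math.min(s.max, v)))

      def snapshot: Map[String, SimpleColStat] = stats.toMap
    }

    // Column c starts with [0, 100]; after (c > 40 AND c <= 50) it holds [40, 50].
    val tracker = new StatTracker(Map("c" -> SimpleColStat(0, 100)))
    tracker.applyGreaterThan("c", 40)
    tracker.applyLessThanOrEqual("c", 50)
    // tracker.snapshot("c") == SimpleColStat(40.0, 50.0)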


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...

2017-02-04 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16696#discussion_r99474494
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationSuite.scala
 ---
@@ -18,12 +18,41 @@
 package org.apache.spark.sql.catalyst.statsEstimation
 
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
 
 
-class StatsConfSuite extends StatsEstimationTestBase {
+class StatsEstimationSuite extends StatsEstimationTestBase {
+  val (ar, colStat) = (attr("key"), ColumnStat(distinctCount = 10, min = 
Some(1), max = Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4))
+
+  val plan = StatsTestPlan(
+outputList = Seq(ar),
+attributeStats = AttributeMap(Seq(ar -> colStat)),
+rowCount = 10,
+size = Some(10 * (8 + 4)))
+
+  test("limit estimation") {
+val localLimit = LocalLimit(Literal(2), plan)
+val globalLimit = GlobalLimit(Literal(2), plan)
+// LocalLimit and GlobalLimit share the same stats estimation logic.
+val expected = Statistics(sizeInBytes = 24, rowCount = Some(2))
+checkStats(localLimit, expected)
+checkStats(globalLimit, expected)
+  }
+
+  test("sample estimation") {
+val sample = Sample(0.0, 0.5, withReplacement = false, (math.random * 
1000).toLong, plan)()
+checkStats(sample, expected = Statistics(sizeInBytes = 60, rowCount = 
Some(5)))
+
+// Test if Sample's child doesn't have rowCount in stats
+val stats2 = Statistics(sizeInBytes = 120)
--- End diff --

For limit estimation test cases, we may add a test with the limit number 
greater than the child node's row count.  This test can show whether we 
properly select the smaller value between the limit number and the child 
node's row count. 
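
For illustration, a hedged sketch of what such a test could look like, reusing 
the GlobalLimit/checkStats pieces visible in this diff; the expected numbers 
assume the same 12-bytes-per-row arithmetic as the existing limit case:

    test("limit larger than child's row count") {
      // plan above has rowCount = 10, so a limit of 20 should still estimate 10 rows.
      val bigLimit = GlobalLimit(Literal(20), plan)
      checkStats(bigLimit, Statistics(sizeInBytes = 120, rowCount = Some(10)))
    }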


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16696: [SPARK-19350] [SQL] Cardinality estimation of Lim...

2017-01-31 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16696#discussion_r98776491
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -727,37 +728,18 @@ case class GlobalLimit(limitExpr: Expression, child: 
LogicalPlan) extends UnaryN
   }
   override def computeStats(conf: CatalystConf): Statistics = {
 val limit = limitExpr.eval().asInstanceOf[Int]
-val sizeInBytes = if (limit == 0) {
-  // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also 
be zero
-  // (product of children).
-  1
-} else {
-  (limit: Long) * output.map(a => a.dataType.defaultSize).sum
-}
-child.stats(conf).copy(sizeInBytes = sizeInBytes)
+val childStats = child.stats(conf)
+// Don't propagate column stats, because we don't know the 
distribution after a limit operation
+Statistics(
+  sizeInBytes = EstimationUtils.getOutputSize(output, limit, 
childStats.attributeStats),
--- End diff --

Agreed.  We can pick the smaller value between the child node's row count 
and the limit number. 
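
A minimal sketch of the "take the smaller value" arithmetic (plain Scala with 
a hypothetical helper name; the real change would live in the limit operators' 
computeStats):

    // Estimated output rows of a limit node: never more than the child's estimate.
    def rowsAfterLimit(limit: BigInt, childRowCount: Option[BigInt]): BigInt =
      childRowCount.map(_.min(limit)).getOrElse(limit)

    // rowsAfterLimit(BigInt(100), Some(BigInt(10))) == 10
    // rowsAfterLimit(BigInt(2),   Some(BigInt(10))) == 2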


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16594: [SPARK-17078] [SQL] Show stats when explain

2017-01-23 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/16594
  
To show a very large Long number, there is no need to print out every digit. 
We can use exponential (scientific) notation.  For example, the number 
120,000,000,005,123 can be printed as 1.2*10**14, where 10**14 means 10 to the power 14.
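
A hypothetical helper (illustrative only, not part of the PR) that formats such values:

    // render a large row count compactly for EXPLAIN output
    def formatBig(n: BigInt): String =
      if (n.abs < BigInt(1000000)) n.toString
      else f"${BigDecimal(n).toDouble}%.1E"   // e.g. 120000000005123 -> "1.2E+14"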


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-18 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/16395
  
@wzhfy For the predicate condition d_date >= '2000-01-27', we do not support it 
because Spark SQL casts the d_date column to String before the comparison.  For 
the predicate condition d_date >= cast('2000-01-27' AS DATE), CBO supports it by 
comparing the values as the date type.
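
A hedged illustration using the Catalyst expression API (attribute name and imports 
are assumed here, not code from the PR); only the second form keeps both sides in 
DateType so that CBO can estimate it:

    import java.sql.Date
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThanOrEqual, Literal}
    import org.apache.spark.sql.types.DateType

    val dDate = AttributeReference("d_date", DateType)()
    // d_date >= '2000-01-27'               : the column is cast to String first, so CBO skips it
    // d_date >= cast('2000-01-27' AS DATE) : both sides stay DateType, so CBO can estimate it
    val supported = GreaterThanOrEqual(dDate, Literal(Date.valueOf("2000-01-27")))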


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-18 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96718076
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DateType, IntegerType, LongType, 
TimestampType}
+
+/**
+ * In this test suite, we test predicates containing the following 
operators:
+ * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN
+ */
+
+class FilterEstimationSuite extends StatsEstimationTestBase {
+
+  // Suppose our test table has 10 rows and 3 columns.
+  // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+  // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, 
maxLen:4
+  val arInt = AttributeReference("cint", IntegerType)()
+  val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max 
= Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Second column cdate has values, from 2017-01-01 through 2017-01-10 
for 10 values.
+  val dMin = Date.valueOf("2017-01-01")
+  val dMax = Date.valueOf("2017-01-10")
+  val arDate = AttributeReference("cdate", DateType)()
+  val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), 
max = Some(dMax),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Third column ctimestamp has values from "2017-01-01 01:00:00" through
+  // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours).
+  val tsMin = Timestamp.valueOf("2017-01-01 01:00:00")
+  val tsMax = Timestamp.valueOf("2017-01-01 10:00:00")
+  val arTimestamp = AttributeReference("ctimestamp", TimestampType)()
+  val childColStatTimestamp = ColumnStat(distinctCount = 10, min = 
Some(tsMin), max = Some(tsMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  test("cint = 2") {
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(2)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(1L)
+)
+  }
+
+  test("cint = 0") {
+// This is an out-of-range case since 0 is outside the range [min, max]
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(0)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(0L)
+)
+  }
+
+  test("cint < 3") {
+validateEstimatedStats(
+  arInt,
+  Filter(LessThan(arInt, Literal(3)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 2, min = Some(1), max = Some(3),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(3L)
+)
+  }
+
+  test("cint < 0") {
+// This is a corner case since literal 0 is smaller than min.
+validateEstimatedStats(
+  arInt,
+  Filter(LessThan(arInt, Literal(0)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(0L)
+)
+  }
+
+  test("cint <= 3") {
+validateEstimatedStats(
+  arInt,
+  Filter(LessThanOrEqual(arInt, Literal(

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-16 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96325991
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, A 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicate is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+val statsExprIdMap: Map[ExprId, ColumnStat] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._2))
+mutableColStats = mutable.Map.empty ++= statsExprIdMap
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCountValue: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, newColStats, 
filteredRowCountValue)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCountValue),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-16 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96324552
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, A 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicate is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+val statsExprIdMap: Map[ExprId, ColumnStat] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._2))
+mutableColStats = mutable.Map.empty ++= statsExprIdMap
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCountValue: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, newColStats, 
filteredRowCountValue)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCountValue),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-16 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96308583
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DateType, IntegerType, LongType, 
TimestampType}
+
+/**
+ * In this test suite, we test predicates containing the following 
operators:
+ * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN
+ */
+
+class FilterEstimationSuite extends StatsEstimationTestBase {
+
+  // Suppose our test table has 10 rows and 3 columns.
+  // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+  // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, 
maxLen:4
+  val arInt = AttributeReference("cint", IntegerType)()
+  val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max 
= Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Second column cdate has values, from 2017-01-01 through 2017-01-10 
for 10 values.
+  val dMin = Date.valueOf("2017-01-01")
+  val dMax = Date.valueOf("2017-01-10")
+  val arDate = AttributeReference("cdate", DateType)()
+  val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), 
max = Some(dMax),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Third column ctimestamp has values from "2017-01-01 01:00:00" through
+  // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours).
+  val tsMin = Timestamp.valueOf("2017-01-01 01:00:00")
+  val tsMax = Timestamp.valueOf("2017-01-01 10:00:00")
+  val arTimestamp = AttributeReference("ctimestamp", TimestampType)()
+  val childColStatTimestamp = ColumnStat(distinctCount = 10, min = 
Some(tsMin), max = Some(tsMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  test("cint = 2") {
+// the predicate is "WHERE cint = 2"
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(2)), ChildStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(1L)
+)
+  }
+
+  test("cint = 0") {
+// the predicate is "WHERE cint = 0"
+// This is an out-of-range case since 0 is outside the range [min, max]
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(0)), ChildStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(0L)
+)
+  }
+
+  test("cint < 3") {
+// the predicate is "WHERE cint < 3"
+validateEstimatedStats(
+  arInt,
+  Filter(LessThan(arInt, Literal(3)), ChildStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 2, min = Some(1), max = Some(3),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(3L)
+)
+  }
+
+  test("cint < 0") {
+// the predicate is "WHERE cint < 0"
+// This is a corner case since literal 0 is smaller than min.
+validateEstimatedStats(
+  arInt,
+  Filter(LessThan(arInt, Literal(0)), ChildStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
+nullCount = 0,

[GitHub] spark issue #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-14 Thread ron8hu
Github user ron8hu commented on the issue:

https://github.com/apache/spark/pull/16395
  
cc @rxin @wzhfy
Have updated code. Please review again.  Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-14 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96128057
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.math.{BigDecimal => JDecimal}
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _}
+
+
+/** Value range of a column. */
+trait Range
+
+/** For simplicity we use decimal to unify operations of numeric ranges. */
+case class NumericRange(min: JDecimal, max: JDecimal) extends Range
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we 
define their default
+ * behaviors by this class.
+ */
+class DefaultRange extends Range
+
+/** This is for columns with only null values. */
+class NullRange extends Range
+
+object Range {
+  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range 
= dataType match {
+case StringType | BinaryType => new DefaultRange()
+case _ if min.isEmpty || max.isEmpty => new NullRange()
+case _ => toNumericRange(min.get, max.get, dataType)
+  }
+
+  /**
+   * For simplicity we use decimal to unify operations of numeric types, 
the two methods below
+   * are the contract of conversion.
+   */
+  private def toNumericRange(min: Any, max: Any, dataType: DataType): 
NumericRange = {
+dataType match {
+  case _: NumericType =>
+NumericRange(new JDecimal(min.toString), new 
JDecimal(max.toString))
+  case BooleanType =>
+val min1 = if (min.asInstanceOf[Boolean]) 1 else 0
+val max1 = if (max.asInstanceOf[Boolean]) 1 else 0
+NumericRange(new JDecimal(min1), new JDecimal(max1))
+  case DateType =>
+val min1 = DateTimeUtils.fromJavaDate(min.asInstanceOf[Date])
+val max1 = DateTimeUtils.fromJavaDate(max.asInstanceOf[Date])
+NumericRange(new JDecimal(min1), new JDecimal(max1))
+  case TimestampType =>
+val min1 = 
DateTimeUtils.fromJavaTimestamp(min.asInstanceOf[Timestamp])
+val max1 = 
DateTimeUtils.fromJavaTimestamp(max.asInstanceOf[Timestamp])
+NumericRange(new JDecimal(min1), new JDecimal(max1))
+  case _ =>
+throw new AnalysisException(s"Type $dataType is not castable to 
numeric in estimation.")
--- End diff --

OK.  removed this case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-14 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r96128021
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.math.{BigDecimal => JDecimal}
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _}
+
+
+/** Value range of a column. */
+trait Range
+
+/** For simplicity we use decimal to unify operations of numeric ranges. */
+case class NumericRange(min: JDecimal, max: JDecimal) extends Range
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we 
define their default
+ * behaviors by this class.
+ */
+class DefaultRange extends Range
+
+/** This is for columns with only null values. */
+class NullRange extends Range
+
+object Range {
+  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range 
= dataType match {
+case StringType | BinaryType => new DefaultRange()
+case _ if min.isEmpty || max.isEmpty => new NullRange()
+case _ => toNumericRange(min.get, max.get, dataType)
+  }
+
+  /**
+   * For simplicity we use decimal to unify operations of numeric types, 
the two methods below
+   * are the contract of conversion.
+   */
+  private def toNumericRange(min: Any, max: Any, dataType: DataType): 
NumericRange = {
+dataType match {
+  case _: NumericType =>
+NumericRange(new JDecimal(min.toString), new 
JDecimal(max.toString))
+  case BooleanType =>
+val min1 = if (min.asInstanceOf[Boolean]) 1 else 0
+val max1 = if (max.asInstanceOf[Boolean]) 1 else 0
+NumericRange(new JDecimal(min1), new JDecimal(max1))
+  case DateType =>
+val min1 = DateTimeUtils.fromJavaDate(min.asInstanceOf[Date])
+val max1 = DateTimeUtils.fromJavaDate(max.asInstanceOf[Date])
+NumericRange(new JDecimal(min1), new JDecimal(max1))
+  case TimestampType =>
+val min1 = 
DateTimeUtils.fromJavaTimestamp(min.asInstanceOf[Timestamp])
--- End diff --

Yes.  Added date and timestamp tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-12 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r95935452
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -116,6 +116,12 @@ case class Filter(condition: Expression, child: 
LogicalPlan)
   .filterNot(SubqueryExpression.hasCorrelatedSubquery)
 child.constraints.union(predicates.toSet)
   }
+
+  override lazy val statistics: Statistics = {
--- End diff --

OK.  fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-01-12 Thread ron8hu
Github user ron8hu commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r95916520
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+class FilterEstimation extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, A 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicate is applied.
+   *
+   * @param plan a LogicalPlan node that must be an instance of Filter.
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate(plan: Filter): Option[Statistics] = {
+val stats: Statistics = plan.child.statistics
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+val statsExprIdMap: Map[ExprId, ColumnStat] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._2))
+mutableColStats = mutable.Map.empty ++= statsExprIdMap
+
+// estimate selectivity of this filter predicate
+val percent: Double = calculateConditions(plan, plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCountValue: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * percent)
+val avgRowSize = BigDecimal(EstimationUtils.getRowSize(plan.output, 
newColStats))
+val filteredSizeInBytes: BigInt =
+  EstimationUtils.ceil(BigDecimal(filteredRowCountValue) * avgRowSize)
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCountValue),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.
+   * A compound condition is decomposed into multiple single conditions 
linked with AND, OR, NOT.
+   * For logical AND conditions, we need to update stats after a condition 
estimation
+
