Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157514899
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value 
range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join 
possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, 
lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, 
lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          
right.hi
    +            // 
--------+------------------+------------+----------------+------->
    +            if (left.hi == right.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +              val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         
left.hi
    +            // 
--------+------------------+------------+----------------+------->
    +            if (right.hi == left.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +              val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         
left.hi
    +            // 
--------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          
right.hi
    +            // 
--------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [lowerBound, upperBound], 
returns the trimmed part
    +   * of the bin in that range and its number of rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, 
upperBound: Double)
    +  : (HistogramBin, Double) = {
    +    val (lo, hi) = if (bin.lo <= lowerBound && bin.hi >= upperBound) {
    +      //       bin.lo          lowerBound     upperBound      bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, upperBound)
    +    } else if (bin.lo <= lowerBound && bin.hi >= lowerBound) {
    +      //       bin.lo          lowerBound      bin.hi      upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, bin.hi)
    +    } else if (bin.lo <= upperBound && bin.hi >= upperBound) {
    +      //    lowerBound            bin.lo     upperBound       bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, upperBound)
    +    } else {
    +      //    lowerBound            bin.lo        bin.hi     upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, bin.hi)
    --- End diff --
    
    Add an assert here to make sure that, if we reach this branch, the case is the one we expect.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to