Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19594#discussion_r157503949 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -212,4 +213,186 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. + */ + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + lowerBound: Double, + upperBound: Double): Seq[OverlappedRange] = { + val overlappedRanges = new ArrayBuffer[OverlappedRange]() + // Only bins whose range intersect [lowerBound, upperBound] have join possibility. + val leftBins = leftHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + val rightBins = rightHistogram.bins + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) + + leftBins.foreach { lb => + rightBins.foreach { rb => + val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) + val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) + // Only collect overlapped ranges. + if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapped ranges. + val range = if (left.lo == left.hi) { + // Case1: the left bin has only one value + OverlappedRange( + lo = left.lo, + hi = left.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv + ) + } else if (right.lo == right.hi) { + // Case2: the right bin has only one value + OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight + ) + } else if (right.lo >= left.lo && right.hi >= left.hi) { + // Case3: the left bin is "smaller" than the right bin + // left.lo right.lo left.hi right.hi + // --------+------------------+------------+----------------+-------> + if (left.hi == right.lo) { + // The overlapped range has only one value. + OverlappedRange( + lo = right.lo, + hi = right.lo, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight / right.ndv + ) + } else { + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) + OverlappedRange( + lo = right.lo, + hi = left.hi, + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv * rightRatio, + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight * rightRatio + ) + } + } else if (right.lo <= left.lo && right.hi <= left.hi) { + // Case4: the left bin is "larger" than the right bin + // right.lo left.lo right.hi left.hi + // --------+------------------+------------+----------------+-------> + if (right.hi == left.lo) { + // The overlapped range has only one value. + OverlappedRange( + lo = right.hi, + hi = right.hi, + leftNdv = 1, + rightNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight / right.ndv + ) + } else { + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( + lo = left.lo, + hi = right.hi, + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv * rightRatio, + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight * rightRatio + ) + } + } else if (right.lo >= left.lo && right.hi <= left.hi) { + // Case5: the left bin contains the right bin + // left.lo right.lo right.hi left.hi + // --------+------------------+------------+----------------+-------> + val leftRatio = (right.hi - right.lo) / (left.hi - left.lo) + OverlappedRange( + lo = right.lo, + hi = right.hi, + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv, + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight + ) + } else { + assert(right.lo <= left.lo && right.hi >= left.hi) + // Case6: the right bin contains the left bin + // right.lo left.lo left.hi right.hi + // --------+------------------+------------+----------------+-------> + val rightRatio = (left.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( + lo = left.lo, + hi = left.hi, + leftNdv = left.ndv, + rightNdv = right.ndv * rightRatio, + leftNumRows = leftHeight, + rightNumRows = rightHeight * rightRatio + ) + } + overlappedRanges += range + } + } + } + overlappedRanges + } + + /** + * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part + * of the bin in that range and its number of rows. + */ + def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double) --- End diff -- maybe explain in the comment that `height` means the average number of rows of the given bin inside a equi-height histogram.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org