Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19479#discussion_r145906444 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -216,65 +218,61 @@ object ColumnStat extends Logging { } } - /** - * Constructs an expression to compute column statistics for a given column. - * - * The expression should create a single struct column with the following schema: - * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long - * - * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and - * as a result should stay in sync with it. - */ - def statExprs(col: Attribute, relativeSD: Double): CreateNamedStruct = { - def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => - expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } - }) - val one = Literal(1, LongType) + private def convertToHistogram(s: String): EquiHeightHistogram = { + val idx = s.indexOf(",") + if (idx <= 0) { + throw new AnalysisException("Failed to parse histogram.") + } + val height = s.substring(0, idx).toDouble + val pattern = "Bucket\\(([^,]+), ([^,]+), ([^\\)]+)\\)".r + val buckets = pattern.findAllMatchIn(s).map { m => + EquiHeightBucket(m.group(1).toDouble, m.group(2).toDouble, m.group(3).toLong) + }.toSeq + EquiHeightHistogram(height, buckets) + } - // the approximate ndv (num distinct value) should never be larger than the number of rows - val numNonNulls = if (col.nullable) Count(col) else Count(one) - val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls)) - val numNulls = Subtract(Count(one), numNonNulls) - val defaultSize = Literal(col.dataType.defaultSize, LongType) +} - def fixedLenTypeStruct(castType: DataType) = { - // For fixed width types, avg size should be the same as max size. 
- struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize, - defaultSize) - } +/** + * There are a few types of histograms in state-of-the-art estimation methods. E.g. equi-width + * histogram, equi-height histogram, frequency histogram (value-frequency pairs) and hybrid + * histogram, etc. + * Currently in Spark, we support equi-height histogram since it is good at handling skew + * distribution, and also provides reasonable accuracy in other cases. + * We can add other histograms in the future, which will make estimation logic more complicated. + * Because we will have to deal with computation between different types of histograms in some + * cases, e.g. for join columns. + */ +trait Histogram - col.dataType match { - case dt: IntegralType => fixedLenTypeStruct(dt) - case _: DecimalType => fixedLenTypeStruct(col.dataType) - case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt) - case BooleanType => fixedLenTypeStruct(col.dataType) - case DateType => fixedLenTypeStruct(col.dataType) - case TimestampType => fixedLenTypeStruct(col.dataType) - case BinaryType | StringType => - // For string and binary type, we don't store min/max. - val nullLit = Literal(null, col.dataType) - struct( - ndv, nullLit, nullLit, numNulls, - // Set avg/max size to default size if all the values are null or there is no value. - Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)), - Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize))) - case _ => - throw new AnalysisException("Analyzing column statistics is not supported for column " + - s"${col.name} of data type: ${col.dataType}.") - } - } +/** + * Equi-height histogram represents column value distribution by a sequence of buckets. Each bucket + * has a value range and contains approximately the same number of rows. 
+ * @param height number of rows in each bucket + * @param ehBuckets equi-height histogram buckets + */ +case class EquiHeightHistogram(height: Double, ehBuckets: Seq[EquiHeightBucket]) extends Histogram { - /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */ - def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = { - ColumnStat( - distinctCount = BigInt(row.getLong(0)), - // for string/binary min/max, get should return null - min = Option(row.get(1, attr.dataType)), - max = Option(row.get(2, attr.dataType)), - nullCount = BigInt(row.getLong(3)), - avgLen = row.getLong(4), - maxLen = row.getLong(5) - ) + override def toString: String = { + def bucketString(bucket: EquiHeightBucket): String = { + val sb = new StringBuilder + sb.append("Bucket(") + sb.append(bucket.lo) + sb.append(", ") + sb.append(bucket.hi) + sb.append(", ") + sb.append(bucket.ndv) + sb.append(")") + sb.toString() + } + height + ", " + ehBuckets.map(bucketString).mkString(", ") } - } + +/** + * A bucket in an equi-height histogram. We use double type for lower/higher bound for simplicity. + * @param lo lower bound of the value range in this bucket + * @param hi higher bound of the value range in this bucket + * @param ndv approximate number of distinct values in this bucket + */ +case class EquiHeightBucket(lo: Double, hi: Double, ndv: Long) --- End diff -- Long is used here to simplify computation. I think Long is sufficient for ndv; maybe we can change the BigInt type in `ColumnStat` to Long as well.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org