Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19479#discussion_r147887853
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
---
@@ -89,19 +93,159 @@ case class AnalyzeColumnCommand(
// The first element in the result will be the overall row count, the
following elements
// will be structs containing all column stats.
// The layout of each struct follows the layout of the ColumnStats.
- val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
val expressions = Count(Literal(1)).toAggregateExpression() +:
- attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+ attributesToAnalyze.map(statExprs(_, sparkSession.sessionState.conf))
val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = new QueryExecution(sparkSession, Aggregate(Nil,
namedExpressions, relation))
.executedPlan.executeTake(1).head
val rowCount = statsRow.getLong(0)
- val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr,
i) =>
- // according to `ColumnStat.statExprs`, the stats struct always have
6 fields.
- (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6),
attr))
- }.toMap
- (rowCount, columnStats)
+ val colStats = rowToColumnStats(sparkSession, relation,
attributesToAnalyze, statsRow, rowCount)
+ (rowCount, colStats)
+ }
+
+ /**
+ * Constructs an expression to compute column statistics for a given
column.
+ *
+ * The expression should create a single struct column with the
following schema:
+ * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long,
maxLen: Long,
+ * percentiles: Array[T]
+ *
+ * Together with [[rowToColumnStats]], this function is used to create
[[ColumnStat]] and
+ * as a result should stay in sync with it.
+ */
+ private def statExprs(col: Attribute, conf: SQLConf): CreateNamedStruct
= {
+ def struct(exprs: Expression*): CreateNamedStruct =
CreateStruct(exprs.map { expr =>
+ expr.transformUp { case af: AggregateFunction =>
af.toAggregateExpression() }
+ })
+ val one = Literal(1, LongType)
+
+ // the approximate ndv (num distinct value) should never be larger
than the number of rows
+ val numNonNulls = if (col.nullable) Count(col) else Count(one)
+ val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError),
numNonNulls))
+ val numNulls = Subtract(Count(one), numNonNulls)
+ val defaultSize = Literal(col.dataType.defaultSize, LongType)
+ val nullArray = Literal(null, ArrayType(DoubleType))
+
+ def fixedLenTypeExprs(castType: DataType) = {
+ // For fixed width types, avg size should be the same as max size.
+ Seq(ndv, Cast(Min(col), castType), Cast(Max(col), castType),
numNulls, defaultSize,
+ defaultSize)
+ }
+
+ def fixedLenTypeStruct(castType: DataType, genHistogram: Boolean) = {
+ val percentileExpr = if (genHistogram) {
+ // To generate equi-height histogram, we need to:
+ // 1. get percentiles p(1/n), p(2/n) ... p((n-1)/n),
+ // 2. use min, max, and percentiles as range values of buckets,
e.g. [min, p(1/n)],
+ // [p(1/n), p(2/n)] ... [p((n-1)/n), max], and then count ndv in
each bucket.
+ // Step 2 will be performed in `rowToColumnStats`.
--- End diff --
Do you mean calculating the percentiles for min/max in step 1? Currently,
the other percentiles are already calculated in step 1.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]