Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89257869 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@ case class Statistics( } } + /** - * Statistics for a column. + * Statistics collected for a column. + * + * 1. Supported data types are defined in `ColumnStat.supportsType`. + * 2. The JVM data type stored in min/max is the external data type (used in Row) for the + * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for + * TimestampType we store java.sql.Timestamp. + * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs. + * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms + * (sketches) might have been used, and the data collected can also be stale. + * + * @param distinctCount number of distinct values + * @param min minimum value + * @param max maximum value + * @param nullCount number of nulls + * @param avgLen average length of the values. For fixed-length types, this should be a constant. + * @param maxLen maximum length of the values. For fixed-length types, this should be a constant. */ -case class ColumnStat(statRow: InternalRow) { +case class ColumnStat( + distinctCount: BigInt, + min: Option[Any], + max: Option[Any], + nullCount: BigInt, + avgLen: Long, + maxLen: Long) { - def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = { - NumericColumnStat(statRow, dataType) - } - def forString: StringColumnStat = StringColumnStat(statRow) - def forBinary: BinaryColumnStat = BinaryColumnStat(statRow) - def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow) + /** + * Returns a map from string to string that can be used to serialize the column stats. + * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string + * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]]. + * + * As part of the protocol, the returned map always contains a key called "version". + * In the case min/max values are null (None), they will be stored as "<null>". + */ + def toMap: Map[String, String] = Map( + ColumnStat.KEY_VERSION -> "1", + ColumnStat.KEY_DISTINCT_COUNT -> distinctCount.toString, + ColumnStat.KEY_MIN_VALUE -> min.map(_.toString).getOrElse(ColumnStat.NULL_STRING), + ColumnStat.KEY_MAX_VALUE -> max.map(_.toString).getOrElse(ColumnStat.NULL_STRING), + ColumnStat.KEY_NULL_COUNT -> nullCount.toString, + ColumnStat.KEY_AVG_LEN -> avgLen.toString, + ColumnStat.KEY_MAX_LEN -> maxLen.toString + ) +} + + +object ColumnStat extends Logging { + + /** String representation for null in serialization. */ + private val NULL_STRING: String = "<null>" + + // List of string keys used to serialize ColumnStat + val KEY_VERSION = "version" + private val KEY_DISTINCT_COUNT = "distinctCount" + private val KEY_MIN_VALUE = "min" + private val KEY_MAX_VALUE = "max" + private val KEY_NULL_COUNT = "nullCount" + private val KEY_AVG_LEN = "avgLen" + private val KEY_MAX_LEN = "maxLen" - override def toString: String = { - // use Base64 for encoding - Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes) + /** Returns true iff the we support gathering column statistics on column of the given type. */ + def supportsType(dataType: DataType): Boolean = dataType match { + case _: IntegralType => true + case _: DecimalType => true + case DoubleType | FloatType => true + case BooleanType => true + case DateType => true + case TimestampType => true + case BinaryType | StringType => true + case _ => false } -} -object ColumnStat { - def apply(numFields: Int, str: String): ColumnStat = { - // use Base64 for decoding - val bytes = Base64.decodeBase64(str) - val unsafeRow = new UnsafeRow(numFields) - unsafeRow.pointTo(bytes, bytes.length) - ColumnStat(unsafeRow) + /** + * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats + * from some external storage. The serialization side is defined in [[ColumnStat.toMap]]. + */ + def fromMap(table: String, field: StructField, map: Map[String, String]) + : Option[ColumnStat] = { + val str2val: (String => Any) = field.dataType match { + case _: IntegralType => _.toLong + case _: DecimalType => new java.math.BigDecimal(_) + case DoubleType | FloatType => _.toDouble + case BooleanType => _.toBoolean + case DateType => java.sql.Date.valueOf + case TimestampType => java.sql.Timestamp.valueOf + case BinaryType | StringType => (v: String) => if (v == NULL_STRING) null else v --- End diff -- so we use String as the external type for binary column? maybe we should mention it in the doc: https://github.com/apache/spark/pull/15959/files#diff-a4113ed6a89e8d19a39f5c27ce95658bR69
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org