Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/15959#discussion_r89258891
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,176 @@ case class Statistics(
}
}
+
/**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ *    corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ *    TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ *    (sketches) might have been used, and the data collected can also be stale.
+ *
+ * @param distinctCount number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param nullCount number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
*/
-case class ColumnStat(statRow: InternalRow) {
+case class ColumnStat(
+ distinctCount: BigInt,
+ min: Option[Any],
+ max: Option[Any],
+ nullCount: BigInt,
+ avgLen: Long,
+ maxLen: Long) {
- def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
- NumericColumnStat(statRow, dataType)
- }
- def forString: StringColumnStat = StringColumnStat(statRow)
- def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
- def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+ // We currently don't store min/max for byte arrays. This can change in the future and then
+ // we need to remove this require.
+ require(min.isEmpty || !min.get.isInstanceOf[Array[Byte]])
+ require(max.isEmpty || !max.get.isInstanceOf[Array[Byte]])
- override def toString: String = {
- // use Base64 for encoding
- Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+ /**
+ * Returns a map from string to string that can be used to serialize the column stats.
+ * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
+ * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+ *
+ * As part of the protocol, the returned map always contains a key called "version".
+ * In the case min/max values are null (None), they won't appear in the map.
+ */
+ def toMap: Map[String, String] = {
+ val map = new scala.collection.mutable.HashMap[String, String]
+ map.put(ColumnStat.KEY_VERSION, "1")
+ map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
+ map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
+ map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
+ map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
+ min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
+ max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+ map.toMap
}
}
-object ColumnStat {
- def apply(numFields: Int, str: String): ColumnStat = {
- // use Base64 for decoding
- val bytes = Base64.decodeBase64(str)
- val unsafeRow = new UnsafeRow(numFields)
- unsafeRow.pointTo(bytes, bytes.length)
- ColumnStat(unsafeRow)
+
+object ColumnStat extends Logging {
+
+ // List of string keys used to serialize ColumnStat
+ val KEY_VERSION = "version"
+ private val KEY_DISTINCT_COUNT = "distinctCount"
+ private val KEY_MIN_VALUE = "min"
+ private val KEY_MAX_VALUE = "max"
+ private val KEY_NULL_COUNT = "nullCount"
+ private val KEY_AVG_LEN = "avgLen"
+ private val KEY_MAX_LEN = "maxLen"
+
+ /** Returns true iff we support gathering column statistics on a column of the given type. */
+ def supportsType(dataType: DataType): Boolean = dataType match {
+ case _: IntegralType => true
+ case _: DecimalType => true
+ case DoubleType | FloatType => true
+ case BooleanType => true
+ case DateType => true
+ case TimestampType => true
+ case BinaryType | StringType => true
+ case _ => false
}
-}
-case class NumericColumnStat[T <: AtomicType](statRow: InternalRow, dataType: T) {
- // The indices here must be consistent with `ColumnStatStruct.numericColumnStat`.
- val numNulls: Long = statRow.getLong(0)
- val max: T#InternalType = statRow.get(1, dataType).asInstanceOf[T#InternalType]
- val min: T#InternalType = statRow.get(2, dataType).asInstanceOf[T#InternalType]
- val ndv: Long = statRow.getLong(3)
-}
+ /**
+ * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
+ * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
+ */
+ def fromMap(table: String, field: StructField, map: Map[String, String])
+ : Option[ColumnStat] = {
+ val str2val: (String => Any) = field.dataType match {
+ case _: IntegralType => _.toLong
+ case _: DecimalType => new java.math.BigDecimal(_)
+ case DoubleType | FloatType => _.toDouble
+ case BooleanType => _.toBoolean
+ case DateType => java.sql.Date.valueOf
+ case TimestampType => java.sql.Timestamp.valueOf
+ case StringType => identity
+ // This version of Spark does not use min/max for binary columns so we ignore it.
--- End diff --
Actually, it's also true for the string type:
https://github.com/apache/spark/pull/15959/files#diff-a4113ed6a89e8d19a39f5c27ce95658bR213
Shall we put `StringType` and `BinaryType` in the same case statement?
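
For reference, a minimal sketch (not the code in this PR) of what the combined case could look like, assuming min/max is stored for neither string nor binary columns, so a plain identity conversion is enough for both:

```scala
import org.apache.spark.sql.types._

// Hypothetical standalone version of the str2val converter, with StringType and
// BinaryType folded into one case. For binary columns min/max is never stored,
// so the identity branch is simply never exercised for them.
def str2val(dataType: DataType): String => Any = dataType match {
  case _: IntegralType         => _.toLong
  case _: DecimalType          => new java.math.BigDecimal(_)
  case DoubleType | FloatType  => _.toDouble
  case BooleanType             => _.toBoolean
  case DateType                => java.sql.Date.valueOf
  case TimestampType           => java.sql.Timestamp.valueOf
  case StringType | BinaryType => identity  // combined case, as suggested above
}
```

Merging the two would also keep this in line with `supportsType`, which already handles `BinaryType | StringType` in a single case.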