Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19479#discussion_r149864452
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
---
@@ -275,6 +317,122 @@ object ColumnStat extends Logging {
avgLen = row.getLong(4),
maxLen = row.getLong(5)
)
+ if (row.isNullAt(6)) {
+ cs
+ } else {
+ val ndvs = row.getArray(6).toLongArray()
+ assert(percentiles.get.numElements() == ndvs.length + 1)
+ val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
+ // Construct equi-height histogram
+ val buckets = ndvs.zipWithIndex.map { case (ndv, i) =>
+ HistogramBucket(endpoints(i), endpoints(i + 1), ndv)
+ }
+ val nonNullRows = rowCount - cs.nullCount
+ val histogram = Histogram(nonNullRows.toDouble / ndvs.length, buckets)
+ cs.copy(histogram = Some(histogram))
+ }
+ }
+
+}
+
+/**
+ * This class is an implementation of equi-height histogram.
+ * Equi-height histogram represents the distribution of a column's values
by a sequence of buckets.
+ * Each bucket has a value range and contains approximately the same
number of rows.
+ * @param height number of rows in each bucket
+ * @param buckets equi-height histogram buckets
+ */
+case class Histogram(height: Double, buckets: Array[HistogramBucket]) {
+
+ // Only for histogram equality test.
+ override def equals(other: Any): Boolean = other match {
+ case otherHgm: Histogram =>
+ height == otherHgm.height && buckets.sameElements(otherHgm.buckets)
+ case _ => false
+ }
+ override def hashCode(): Int = super.hashCode()
+}
+
+/**
+ * A bucket in an equi-height histogram. We use double type for
lower/higher bound for simplicity.
+ * @param lo lower bound of the value range in this bucket
+ * @param hi higher bound of the value range in this bucket
+ * @param ndv approximate number of distinct values in this bucket
+ */
+case class HistogramBucket(lo: Double, hi: Double, ndv: Long)
+
+object HistogramSerializer {
+ /**
+ * Serializes a given histogram to a string. For advanced statistics
like histograms, sketches,
+ * etc, we don't provide readability for their serialized formats in
metastore
+ * (string-to-string table properties). This is because it's hard or
unnatural for these
+ * statistics to be human readable. For example, a histogram is probably
split into multiple
+ * key-value properties, instead of a single, self-described property.
And for
+ * count-min-sketch, it's essentially unnatural to make it a readable
string.
+ */
+ final def serialize(histogram: Histogram): String = {
+ val bos = new ByteArrayOutputStream()
+ val out = new DataOutputStream(new LZ4BlockOutputStream(bos))
+ out.writeDouble(histogram.height)
+ out.writeInt(histogram.buckets.length)
+ // Write data with same type together for compression.
+ var i = 0
+ while (i < histogram.buckets.length) {
+ out.writeDouble(histogram.buckets(i).lo)
+ i += 1
+ }
+ i = 0
+ while (i < histogram.buckets.length) {
+ out.writeDouble(histogram.buckets(i).hi)
+ i += 1
+ }
+ i = 0
+ while (i < histogram.buckets.length) {
+ out.writeLong(histogram.buckets(i).ndv)
+ i += 1
+ }
+ out.writeInt(-1)
--- End diff --
To denote the end of the stream. `SparkPlan.getByteArrayRdd`,
`Percentile.serialize` etc also write -1 at the end.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]