GitHub user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8362#discussion_r42054243
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala ---
    @@ -302,3 +307,397 @@ case class Sum(child: Expression) extends AlgebraicAggregate {
     
       override val evaluateExpression = Cast(currentSum, resultType)
     }
    +
    +// scalastyle:off
    +/**
    + * HyperLogLog++ (HLL++) is a state of the art cardinality estimation algorithm. This class
    + * implements the dense version of the HLL++ algorithm as an Aggregate Function.
    + *
    + * This implementation has been based on the following papers:
    + * HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm
    + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
    + *
    + * HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation
    + * Algorithm
    + * http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/40671.pdf
    + *
    + * Appendix to HyperLogLog in Practice: Algorithmic Engineering of a State of the Art Cardinality
    + * Estimation Algorithm
    + * https://docs.google.com/document/d/1gyjfMHy43U9OWBXxfaeG-3MjGzejW1dlpyMwEYAAWEI/view?fullscreen#
    + *
    + * @param child to estimate the cardinality of.
    + * @param relativeSD the maximum estimation error allowed.
    + */
    +// scalastyle:on
    +case class HyperLogLogPlusPlus(child: Expression, relativeSD: Double = 0.05)
    +    extends AggregateFunction2 {
    +  import HyperLogLogPlusPlus._
    +
    +  /**
    +   * HLL++ uses 'p' bits for addressing. The more addressing bits we use, the more precise the
    +   * algorithm will be, and the more memory it will require. The 'p' value is based on the relative
    +   * error requested.
    +   *
    +   * HLL++ requires that we use at least 4 bits of addressing space (a minimum precision of 27%).
    +   *
    +   * This method rounds up to the nearest integer. This means that the error is always equal to or
    +   * lower than the requested error. Use the <code>trueRsd</code> method to get the actual RSD
    +   * value.
    +   */
    +  private[this] val p = Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt
    +
    +  require(p >= 4, "HLL++ requires at least 4 bits for addressing. " +
    +    "Use a lower error, at most 27%.")
    +
    +  /**
    +   * Shift used to extract the index of the register from the hashed value.
    +   *
    +   * This assumes the use of 64-bit hashcodes.
    +   */
    +  private[this] val idxShift = JLong.SIZE - p
    +
    +  /**
    +   * Value to pad the 'w' value with before the number of leading zeros is determined.
    +   */
    +  private[this] val wPadding = 1L << (p - 1)
    +
    +  /**
    +   * The number of registers used.
    +   */
    +  private[this] val m = 1 << p
    +
    +  /**
    +   * The pre-calculated combination of: alpha * m * m
    +   *
    +   * 'alpha' corrects the raw cardinality estimate 'Z'. See the FlFuGaMe07 paper for its
    +   * derivation.
    +   */
    +  private[this] val alphaM2 = p match {
    +    case 4 => 0.673d * m * m
    +    case 5 => 0.697d * m * m
    +    case 6 => 0.709d * m * m
    +    case _ => (0.7213d / (1.0d + 1.079d / m)) * m * m
    +  }
    +
    +  /**
    +   * The number of words used to store the registers. We use Longs for storage because this is the
    +   * most compact way of storage; Spark aligns to 8-byte words or uses Long wrappers.
    +   *
    +   * We only store whole registers per word in order to prevent overly complex bitwise operations.
    +   * In practice this means we only use 60 out of 64 bits.
    +   */
    +  private[this] val numWords = m / REGISTERS_PER_WORD + 1
    +
    +  def children: Seq[Expression] = Seq(child)
    +
    +  def nullable: Boolean = false
    +
    +  def dataType: DataType = LongType
    +
    +  def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
    +
    +  def bufferSchema: StructType = StructType.fromAttributes(bufferAttributes)
    +
    +  def cloneBufferAttributes: Seq[Attribute] = bufferAttributes.map(_.newInstance())
    +
    +  /** Allocate enough words to store all registers. */
    +  val bufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
    +    AttributeReference(s"MS[$i]", LongType)()
    +  }
    +
    +  /** Fill all words with zeros. */
    +  def initialize(buffer: MutableRow): Unit = {
    +    var word = 0
    +    while (word < numWords) {
    +      buffer.setLong(mutableBufferOffset + word, 0)
    +      word += 1
    +    }
    +  }
    +
    +  /**
    +   * Update the HLL++ buffer.
    +   *
    +   * Variable names in the HLL++ paper match variable names in the code.
    +   */
    +  def update(buffer: MutableRow, input: InternalRow): Unit = {
    +    val v = child.eval(input)
    +    if (v != null) {
    +      // Create the hashed value 'x'.
    +      val x = MurmurHash.hash64(v)
    --- End diff --
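
As a reading aid for the addressing scheme described in the doc comments above, here is a minimal Scala sketch. The formula in `precisionBits` is copied from the diff. Everything else is an assumption: the names `HllAddressingSketch`, `precisionBits` and `indexAndRank` are hypothetical, `trueRsd` is simply the inverse of the `p` formula (the diff mentions a `trueRsd` method but does not show it), and the index/rank split follows the HLL++ paper together with the `idxShift` and `wPadding` definitions, since the quoted diff stops before that part of `update`.

```scala
object HllAddressingSketch {
  // Same formula as `p` in the diff: addressing bits for a requested relative SD.
  def precisionBits(relativeSD: Double): Int =
    math.ceil(2.0d * math.log(1.106d / relativeSD) / math.log(2.0d)).toInt

  // Assumed inverse of the formula above: the relative SD obtained with 2^p registers.
  def trueRsd(p: Int): Double = 1.106d / math.sqrt((1L << p).toDouble)

  // Assumed split of a 64-bit hash 'x': the top p bits select the register,
  // the remaining bits give the rank (position of the first 1-bit, counted from 1).
  def indexAndRank(x: Long, p: Int): (Int, Int) = {
    val idxShift = java.lang.Long.SIZE - p
    val wPadding = 1L << (p - 1) // bounds the rank at 64 - p + 1
    val idx = (x >>> idxShift).toInt
    val w = (x << p) | wPadding
    val rank = java.lang.Long.numberOfLeadingZeros(w) + 1
    (idx, rank)
  }
}
```

With the default `relativeSD = 0.05` this gives `p = 9`, i.e. 512 registers, and the split only works as written if the hash really is 64 bits wide.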
    
    @hvanhovell Does HLL++ require using `hash64`? I took a look at its
    implementation, and it looks like it converts the input to a string in many cases
    (https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java#L135-L159).
    For our old function, the `offer` method of the `HyperLogLog` class uses `hash`
    internally, which has some specializations.
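
To make the concern concrete, here is a rough Scala sketch of what a type-specialized 64-bit hash could look like, so that primitives are hashed from their bits and only unknown types fall back to a string. This is an illustration under assumptions, not the stream-lib or Spark implementation: `SpecializedHashSketch`, `Seed` and this `hash64` are hypothetical, `fmix64` is the 64-bit finalizer from MurmurHash3 used here only as a bit mixer, and the byte-folding fallback is not a vetted string hash.

```scala
import java.nio.charset.StandardCharsets

object SpecializedHashSketch {
  // Arbitrary non-zero seed (assumption), so that the value 0 does not hash to 0.
  private val Seed = 0x9e3779b97f4a7c15L

  // MurmurHash3 64-bit finalizer; a bijective mixer over Long.
  def fmix64(k0: Long): Long = {
    var k = k0
    k ^= k >>> 33
    k *= 0xff51afd7ed558ccdL
    k ^= k >>> 33
    k *= 0xc4ceb9fe1a85ec53L
    k ^= k >>> 33
    k
  }

  // Hypothetical dispatcher: no string conversion for common primitive types.
  def hash64(v: Any): Long = v match {
    case l: Long   => fmix64(l ^ Seed)
    case i: Int    => fmix64(i.toLong ^ Seed)
    case d: Double => fmix64(java.lang.Double.doubleToLongBits(d) ^ Seed)
    case f: Float  => fmix64(java.lang.Float.floatToIntBits(f).toLong ^ Seed)
    case other     =>
      // Fallback only: this is the string path the comment above is worried about.
      other.toString.getBytes(StandardCharsets.UTF_8)
        .foldLeft(Seed)((h, b) => fmix64(h ^ (b & 0xffL)))
  }
}
```

The trade-off is the usual one: the specialized branches avoid allocating a string per row, at the cost of maintaining one branch per supported type.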


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
