GitHub user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/897#discussion_r13365503
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
    @@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       }
     
       /**
    +   * :: Experimental ::
    +   *
        * Return approximate number of distinct values for each key in this RDD.
    -   * The accuracy of approximation can be controlled through the relative standard deviation
    -   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    -   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
    -   * Partitioner to partition the output RDD.
    +   *
    +   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
    +   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
    +   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
    +   *
    +   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
    +   * would trigger sparse representation of registers, which may reduce the memory consumption
    +   * and increase accuracy when the cardinality is small.
    +   *
    +   * @param p The precision value for the normal set.
    +   *          `p` must be a value between 4 and `sp` (32 max).
    +   * @param sp The precision value for the sparse set, between 0 and 32.
    +   *           If `sp` equals 0, the sparse representation is skipped.
    +   * @param partitioner Partitioner to use for the resulting RDD.
        */
    -  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
    -    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
    -    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
    -    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
    +  @Experimental
    +  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
    +    require(p >= 4, s"p ($p) should be >= 4")
    +    require(sp <= 32, s"sp ($sp) should be <= 32")
    +    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
    +    val createHLL = (v: V) => {
    +      val hll = new HyperLogLogPlus(p, sp)
    +      hll.offer(v)
    +      hll
    +    }
    +    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
    +      hll.offer(v)
    +      hll
    +    }
    +    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
    +      h1.addAll(h2)
    +      h1
    +    }
    +
    +    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
    +  }
     
    -    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
    +  /**
    +   * Return approximate number of distinct values for each key in this RDD.
    +   *
    +   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
    +   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
    +   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
    +   *
    +   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
    +   *                   It should be greater than 0.000017.
    --- End diff --
    
    "should" -> "must"?
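
    The lower bound reads as a hard constraint rather than a recommendation: inverting the
    `1.054 / sqrt(2^p)` estimate quoted in the doc comment, a `relativeSD` at or below roughly
    0.000016 would need `p > 32`, which the `p`/`sp` overload rejects. A minimal sketch of that
    mapping follows, assuming the conversion is done by solving the formula for `p`;
    `PrecisionSketch` and `precisionFor` are hypothetical names for illustration and are not
    part of the diff.

```scala
object PrecisionSketch {
  // Hypothetical helper (not in the diff): derive the HyperLogLog++ precision p
  // from a requested relative accuracy by inverting relativeSD ~= 1.054 / sqrt(2^p).
  def precisionFor(relativeSD: Double): Int = {
    // A hard precondition, hence "must": smaller values would require p > 32.
    require(relativeSD > 0.000017,
      s"relativeSD ($relativeSD) must be greater than 0.000017")
    // 2^p = (1.054 / relativeSD)^2  =>  p = 2 * log2(1.054 / relativeSD), rounded up.
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    // The p/sp overload in the diff requires p >= 4, so clamp coarse accuracies upward.
    math.max(p, 4)
  }
}
```

    Under this sketch, `precisionFor(0.05)` gives 9, and any value at or below 0.000017
    fails the `require` with an `IllegalArgumentException`, which is why "must" fits the
    doc wording better than "should".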

