GitHub user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/897#discussion_r13359976
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -214,40 +213,112 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
+ * :: Experimental ::
+ *
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Uses the provided
- * Partitioner to partition the output RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
+ * @param partitioner Partitioner to use for the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+ @Experimental
+ def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+ val createHLL = (v: V) => {
+ val hll = new HyperLogLogPlus(p, sp)
+ hll.offer(v)
+ hll
+ }
+ val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+ hll.offer(v)
+ hll
+ }
+ val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+ h1
+ }
+
+ combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+ }
+
+ /**
+ * :: Experimental ::
+ *
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
+ * @param numPartitions Number of partitions in the resulting RDD.
+ */
+ @Experimental
+ def countApproxDistinctByKey(p: Int, sp: Int, numPartitions: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(p, sp, new HashPartitioner(numPartitions))
+ }
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+ /**
+ * :: Experimental ::
+ *
+ * Return approximate number of distinct values for each key in this RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
+ *
+ * @param p The precision value for the normal set.
+ * `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If `sp` equals 0, the sparse representation is skipped.
+ */
+ @Experimental
+ def countApproxDistinctByKey(p: Int, sp: Int): RDD[(K, Long)] = {
+ countApproxDistinctByKey(p, sp, defaultPartitioner(self))
}
/**
* Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
- * output RDD into numPartitions.
*
+ * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
*/
+ @deprecated("Use countApproxDistinctByKey with parameters p and sp", "1.0.1")
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ // See stream-lib's HyperLogLog implementation for the conversion from relativeSD to p.
+ val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
--- End diff ---
I checked the paper. I think the correct mapping should be
~~~
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
~~~
`HyperLogLogPlus` uses a 64-bit hash, and the corresponding constant `beta` is `1.054` in the paper: the relative standard deviation with `2^p` registers is roughly `beta / sqrt(2^p)`, so solving `1.054 / sqrt(2^p) <= relativeSD` for `p` yields the expression above. To guarantee the requested precision we need `ceil`, and we also need to check whether `p` goes out of bounds when `relativeSD` is very small.
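A minimal sketch of that conversion with the bounds check, assuming the `1.054` constant above (the helper name `relativeSDToP` and the fail-fast policy for large `p` are illustrative, not part of this patch):
~~~
// Illustrative only: map a requested relative standard deviation to the
// HyperLogLogPlus precision p, assuming sigma ~= 1.054 / sqrt(2^p).
def relativeSDToP(relativeSD: Double): Int = {
  require(relativeSD > 0.0, s"relativeSD must be positive: $relativeSD")
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  // HyperLogLogPlus only accepts p in [4, 32]; a very small relativeSD
  // would push p past 32, so fail fast rather than silently clamp.
  require(p <= 32, s"relativeSD $relativeSD would need p = $p > 32")
  math.max(p, 4)
}
~~~
For example, `relativeSD = 0.05` gives `p = ceil(2 * log2(1.054 / 0.05)) = ceil(8.80) = 9`.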