Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/897#discussion_r13360735
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -921,15 +920,44 @@ abstract class RDD[T: ClassTag](
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
- * The accuracy of approximation can be controlled through the relative
standard deviation
- * (relativeSD) parameter, which also controls the amount of memory
used. Lower values result in
- * more accurate counts but increase the memory footprint and vise
versa. The default value of
- * relativeSD is 0.05.
+ * The algorithm used is based on streamlib's implementation of
"HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation
Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
+ *
+ * @param p The precision value for the normal set.
+ * <code>p</code> must be a value between 4 and <code>sp</code>.
+ * @param sp The precision value for the sparse set, between 0 and 32.
+ * If <code>sp</code> equals 0, the sparse representation is
skipped.
*/
@Experimental
+ def countApproxDistinct(p: Int, sp: Int): Long = {
+ require(p >= 4, s"p ($p) must be greater than 0")
+ require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+ require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+ val zeroCounter = new HyperLogLogPlus(p, sp)
+ aggregate(zeroCounter)(
+ (hll: HyperLogLogPlus, v: T) => {
+ hll.offer(v)
+ hll
+ },
+ (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+ h1.addAll(h2)
+ h2
+ }).cardinality()
+ }
+
+ /**
+ * Return approximate number of distinct elements in the RDD.
+ *
+ * The algorithm used is based on streamlib's implementation of
"HyperLogLog in Practice:
+ * Algorithmic Engineering of a State of The Art Cardinality Estimation
Algorithm", available
+ * <a href="http://research.google.com/pubs/pub40671.html">here</a>.
+ */
+ @deprecated("Use countApproxDistinct with parameter p and sp", "1.0.1")
def countApproxDistinct(relativeSD: Double = 0.05): Long = {
- val zeroCounter = new SerializableHyperLogLog(new
HyperLogLog(relativeSD))
- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+ // See stream-lib's HyperLogLog implementation on the conversion from
relativeSD to p.
+ val p = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) /
math.log(2)).toInt
--- End diff --
See the formula I mentioned above.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---