Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/11578#discussion_r56201728
--- Diff:
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -332,31 +456,44 @@ class GapSamplingReplacementIterator[T: ClassTag](
}
}
- private val q = math.exp(-f)
+ /** advance to first sample as part of object construction. */
+ advance()
+ // Attempting to invoke this closer to the top with other object
initialization
+ // was causing it to break in strange ways, so I'm invoking it last,
which seems to
+ // work reliably.
+}
- /**
- * Sample from Poisson distribution, conditioned such that the sampled
value is >= 1.
- * This is an adaptation from the algorithm for Generating Poisson
distributed random variables:
- * http://en.wikipedia.org/wiki/Poisson_distribution
- */
- private def poissonGE1: Int = {
- // simulate that the standard poisson sampling
- // gave us at least one iteration, for a sample of >= 1
- var pp = q + ((1.0 - q) * rng.nextDouble())
- var r = 1
+private[spark]
+class GapSamplingReplacement(
+ val f: Double,
+ val rng: Random = RandomSampler.newDefaultRNG,
+ epsilon: Double = RandomSampler.rngEpsilon) extends PoissonGE with
Serializable {
- // now continue with standard poisson sampling algorithm
- pp *= rng.nextDouble()
- while (pp > q) {
- r += 1
- pp *= rng.nextDouble()
+ require(f > 0.0, s"Sampling fraction ($f) must be > 0")
+ require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+ private var countForDropping: Int = 0
+
+ def sample(): Int = {
+ if (countForDropping > 0) {
+ countForDropping -= 1
+ 0
+ } else {
+ advance()
+ poissonGE1
}
- r
+ }
+
+ /**
+ * Skip elements with replication factor zero (i.e. elements that won't
be sampled).
+ * Samples 'k' from geometric distribution P(k) = (1-q)(q)^k, where q =
e^(-f), that is
+ * q is the probability of Poisson(0; f)
+ */
+ private def advance(): Unit = {
+ val u = math.max(rng.nextDouble(), epsilon)
+ countForDropping = (math.log(u) / (-f)).toInt
}
/** advance to first sample as part of object construction. */
advance()
- // Attempting to invoke this closer to the top with other object
initialization
--- End diff --
keep this comment
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]