Github user sryza commented on a diff in the pull request:
https://github.com/apache/spark/pull/7075#discussion_r36125885
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
@@ -190,5 +191,104 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *         statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
+    val n1 = data1.count().toDouble
+    val n2 = data2.count().toDouble
+    // identifier for sample 1, needed after co-sort
+    val isSample1 = true
+    // combine identified samples
+    val unionedData = data1.map((_, isSample1)).union(data2.map((_, !isSample1)))
+    // co-sort and operate on each partition, returning local extrema to the driver
+    val localData = unionedData.sortByKey().mapPartitions(
+      searchTwoSampleCandidates(_, n1, n2)
+    ).collect()
+    // result: global extreme
+    val ksStat = searchTwoSampleStatistic(localData, n1 * n2)
+    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts of elements from each sample within one
+   * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
+   *                 each element is additionally tagged with a boolean flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
+   *         distance, the second is an unadjusted maximum distance (both of which will later
+   *         be adjusted by a constant to account for elements in prior partitions), and the
+   *         third is a count corresponding to the numerator of the adjustment constant coming
+   *         from this partition. This last value, the numerator of the adjustment constant, is
+   *         calculated as |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in
+   *         partition|. This comes from the fact that when we adjust for prior partitions, what
+   *         we are doing is adding the difference of the fractions (|prior elements in sample 1|
+   *         / |sample 1| - |prior elements in sample 2| / |sample 2|). We simply keep track of
+   *         the numerator portion that is attributable to each partition so that following
+   *         partitions can use it to cumulatively adjust their values.
+   */
+  private def searchTwoSampleCandidates(
+      partData: Iterator[(Double, Boolean)],
+      n1: Double,
+      n2: Double): Iterator[(Double, Double, Double)] = {
+    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
+    case class ExtremaAndIndices(min: Double, max: Double, ix1: Int, ix2: Int)
+    val initAcc = ExtremaAndIndices(Double.MaxValue, Double.MinValue, 0, 0)
+    // traverse the data in the partition and calculate distances and counts
+    val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
+      val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+      val cdf1 = (acc.ix1 + add1) / n1
+      val cdf2 = (acc.ix2 + add2) / n2
+      val dist = cdf1 - cdf2
+      ExtremaAndIndices(
+        math.min(acc.min, dist),
+        math.max(acc.max, dist),
+        acc.ix1 + add1, acc.ix2 + add2)
+    }
+    val results = if (pResults == initAcc) {
+      Array[(Double, Double, Double)]()
+    } else {
+      Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
+    }
+    results.iterator
+  }
+
+  /**
+   * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
+   * the two-sample, two-sided Kolmogorov-Smirnov test
+   * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
+   *                  partition, along with the numerator for the necessary constant adjustments
+   * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1) * (size
+   *          of sample 2))
+   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
+   */
+  private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
+    : Double = {
+    // maximum distance and numerator for constant adjustment
+    val initAcc = (Double.MinValue, 0.0)
+    // adjust differences based on the number of elements preceding it, which should provide
+    // the correct distance between the 2 empirical CDFs
+      val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
+        val adjConst = prevCt / n
+        val dist1 = math.abs(minCand + adjConst)
+        val dist2 = math.abs(maxCand + adjConst)
+        val maxVal = Array(prevMax, dist1, dist2).max
+        (maxVal, prevCt + ct)
+      }
--- End diff --
Indent this back two spaces
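
A side note for anyone following the math in the scaladocs above: the per-partition triples and the running adjustment can be checked locally. Below is a minimal, non-distributed sketch of that scheme, using plain Scala collections in place of RDD partitions. Every name in it is illustrative rather than taken from the patch, and the adjustment numerator follows the formula stated in the scaladoc (|sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|).

// Minimal, non-distributed sketch of the candidate/adjustment scheme described above.
// Plain Scala collections stand in for RDD partitions; all names here are illustrative
// and not taken from the patch.
object KSTwoSampleSketch {

  // One partition's contribution: (unadjusted min distance, unadjusted max distance,
  // numerator of the adjustment constant, i.e.
  // |sample 1 in partition| * |sample 2| - |sample 2 in partition| * |sample 1|).
  def partitionCandidates(
      part: Seq[(Double, Boolean)],
      n1: Double,
      n2: Double): Option[(Double, Double, Double)] = {
    if (part.isEmpty) {
      None
    } else {
      var ix1 = 0
      var ix2 = 0
      var minDist = Double.MaxValue
      var maxDist = Double.MinValue
      part.foreach { case (_, isSample1) =>
        if (isSample1) ix1 += 1 else ix2 += 1
        val dist = ix1 / n1 - ix2 / n2  // local ECDF difference, ignoring prior partitions
        minDist = math.min(minDist, dist)
        maxDist = math.max(maxDist, dist)
      }
      Some((minDist, maxDist, ix1 * n2 - ix2 * n1))
    }
  }

  // Fold over the per-partition triples in partition order, shifting each partition's
  // candidates by the cumulative adjustment contributed by the partitions before it.
  def combine(candidates: Seq[(Double, Double, Double)], n1: Double, n2: Double): Double = {
    val n = n1 * n2
    candidates.foldLeft((Double.MinValue, 0.0)) {
      case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
        val adj = prevCt / n
        (Seq(prevMax, math.abs(minCand + adj), math.abs(maxCand + adj)).max, prevCt + ct)
    }._1
  }

  def main(args: Array[String]): Unit = {
    val sample1 = Seq(0.1, 0.4, 0.7, 0.9)
    val sample2 = Seq(0.2, 0.3, 0.5, 0.6, 0.8)
    val (n1, n2) = (sample1.size.toDouble, sample2.size.toDouble)
    // tag, merge, and co-sort, then pretend the result is split across two partitions
    val coSorted = (sample1.map((_, true)) ++ sample2.map((_, false))).sortBy(_._1)
    val (partA, partB) = coSorted.splitAt(4)
    val candidates = Seq(partA, partB).flatMap(partitionCandidates(_, n1, n2))
    val partitioned = combine(candidates, n1, n2)
    // reference value: a single pass over the whole co-sorted sequence
    val singlePass = partitionCandidates(coSorted, n1, n2)
      .map { case (mn, mx, _) => math.max(math.abs(mn), math.abs(mx)) }
      .get
    println(s"partitioned: $partitioned, single pass: $singlePass")  // both should be 0.3
  }
}

With the data above both values come out to 0.3: the shift prevCt / (n1 * n2) is exactly the ECDF difference accumulated by earlier partitions, so shifting each partition's local min/max recovers the global supremum.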
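
For completeness, the same helpers can be wired into the distributed skeleton that testTwoSamples sketches (tag each value, union, sortByKey, per-partition reduction, collect, combine on the driver). Again, this is only an illustrative sketch that assumes the KSTwoSampleSketch object above and Spark local mode; none of the names below come from the patch.

// Illustrative Spark driver; assumes the KSTwoSampleSketch helpers defined above.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object KSTwoSampleDriverSketch {

  def ksStatistic(data1: RDD[Double], data2: RDD[Double]): Double = {
    val n1 = data1.count().toDouble
    val n2 = data2.count().toDouble
    // tag each value with its sample, co-sort globally, then reduce every partition to its
    // (min, max, adjustment numerator) triple on the executors
    val candidates = data1.map((_, true)).union(data2.map((_, false)))
      .sortByKey()
      .mapPartitions { it =>
        // materializing the partition keeps the sketch simple; the patch streams instead
        KSTwoSampleSketch.partitionCandidates(it.toSeq, n1, n2).iterator
      }
      .collect()
    // combine the per-partition triples in partition order on the driver
    KSTwoSampleSketch.combine(candidates, n1, n2)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ks-sketch"))
    val d1 = sc.parallelize(Seq(0.1, 0.4, 0.7, 0.9), numSlices = 2)
    val d2 = sc.parallelize(Seq(0.2, 0.3, 0.5, 0.6, 0.8), numSlices = 2)
    println(s"two-sample KS statistic: ${ksStatistic(d1, d2)}")  // 0.3 for this data
    sc.stop()
  }
}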