dbtsai commented on a change in pull request #26197: [SPARK-29577] Implement
p-value simulation and unit tests for chi2 test
URL: https://github.com/apache/spark/pull/26197#discussion_r344415251
##########
File path:
mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
##########
@@ -256,4 +269,80 @@ private[spark] object ChiSqTest extends Logging {
new ChiSqTestResult(pValue, df, statistic, methodName,
NullHypothesis.independence.toString)
}
}
+
+ /**
+ * @param spark SparkSession
+ * @param exp A vector of expected counts for each category
+ * @param numDraw The number of monte-carlo draws to perform
+ * @param numSamplesPerPart number of samples each partition/executor will
draw
+ * @param k TDigest compression factor
+ *
+ * @return A TDigest of the empirical distribution of the chi2 metric for
exp
+ */
+ def getChi2Digest(
+ spark: SparkSession,
+ exp: BDV[Double],
+ numDraw: Int = 50000,
+
+ // 1073741824 = 2**30 -> ~10 minutes per digest
+ numSamplesPerPart: Int = 1073741824 / 50,
+ k: Int = 1024): TDigest = {
+
+ import breeze.linalg.{argmax, sum}
+
+ val expSum = sum(exp)
+ val expNorm = exp / expSum
+ val expFrac = { // cumulative sum
+ val e = (0 to expNorm.size).map{ i => sum(expNorm.slice(0, i)) }
+ e.slice(0, e.size - 1).zip(e.slice(1, e.size)) // boundaries for exp
categories
+ }
+
+ // amount of CPU 'work' to do = constant * expSum * numDraw
+ val numPart = scala.math.max(1, (expSum * numDraw /
numSamplesPerPart).ceil.toInt)
+
+ logInfo(
+ s"Starting MC simulation for chi2 digest with k=$k, numDraw=$numDraw, " +
+ s"exp.length=${exp.length}, exp=$exp, expSum=$expSum,
numPart=$numPart")
+
+ val drawRange = spark.range(0, numDraw, 1, numPartitions =
numPart).toDF().rdd
+
+ // Probably LOTS of room for optimization building d, but this approach
works...
+ val tic = DateTime.now(DateTimeZone.UTC)
+ implicit val enc: Encoder[MergingDigest] = Encoders.kryo[MergingDigest]
+
+ val d: TDigest = drawRange.mapPartitions { part =>
+ val d: MergingDigest = new MergingDigest(k)
+ logDebug(s"building digest")
+
+ part.foreach { drawId => // do a draw
+ logDebug(s"drawId=$drawId adding to digest")
+
+ val obs = BDV.zeros[Double](exp.size)
Review comment:
Re-initializing will probably be more efficient than reallocating the memory for
each record.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]