Github user feynmanliang commented on a diff in the pull request: https://github.com/apache/spark/pull/11374#discussion_r85647905 --- Diff: python/pyspark/mllib/stat/test.py --- @@ -80,3 +85,118 @@ class KolmogorovSmirnovTestResult(TestResult): """ Contains test results for the Kolmogorov-Smirnov test. """ + + +class BinarySample(namedtuple("BinarySample", ["isExperiment", "value"])): + """ + Represents a (isExperiment, value) tuple. + + .. versionadded:: 2.0.0 + """ + + def __reduce__(self): + return BinarySample, (bool(self.isExperiment), float(self.value)) + + +class StreamingTestResult(namedtuple("StreamingTestResult", + ["pValue", "degreesOfFreedom", "statistic", "method", + "nullHypothesis"])): + """ + Contains test results for StreamingTest. + + .. versionadded:: 2.0.0 + """ + + def __reduce__(self): + return StreamingTestResult, (float(self.pValue), + float(self.degreesOfFreedom), float(self.statistic), + str(self.method), str(self.nullHypothesis)) + + +class StreamingTest(object): + """ + .. note:: Experimental + + Online 2-sample significance testing for a stream of (Boolean, Double) pairs. The Boolean + identifies which sample each observation comes from, and the Double is the numeric value of the + observation. + + To address novelty affects, the `peacePeriod` specifies a set number of initial RDD batches of + the DStream to be dropped from significance testing. + + The `windowSize` sets the number of batches each significance test is to be performed over. The + window is sliding with a stride length of 1 batch. Setting windowSize to 0 will perform + cumulative processing, using all batches seen so far. + + Different tests may be used for assessing statistical significance depending on assumptions + satisfied by data. For more details, see StreamingTestMethod. The `testMethod` specifies + which test will be used. + + .. 
versionadded:: 2.0.0 + """ + + def __init__(self): + self._peacePeriod = 0 + self._windowSize = 0 + self._testMethod = "welch" + + @since('2.0.0') + def setPeacePeriod(self, peacePeriod): + """ + Update peacePeriod + :param peacePeriod: + Set number of initial RDD batches of the DStream to be dropped from significance testing. + """ + self._peacePeriod = peacePeriod + + @since('2.0.0') + def setWindowSize(self, windowSize): + """ + Update windowSize + :param windowSize: + Set the number of batches each significance test is to be performed over. + """ + self._windowSize = windowSize + + @since('2.0.0') + def setTestMethod(self, testMethod): + """ + Update test method + :param testMethod: + Currently supported tests: `welch`, `student`. + """ + assert(testMethod in ("welch", "student"), + "Currently supported tests: \"welch\", \"student\"") + self._testMethod = testMethod + + @since('2.0.0') + def registerStream(self, data): + """ + Register a data stream to get its test result. + + :param data: + The input data stream, each element is a BinarySample instance. + """ + self._validate(data) + sc = SparkContext._active_spark_context + + streamingTest = sc._jvm.org.apache.spark.mllib.stat.test.StreamingTest() + streamingTest.setPeacePeriod(self._peacePeriod) + streamingTest.setWindowSize(self._windowSize) + streamingTest.setTestMethod(self._testMethod) + + javaDStream = sc._jvm.SerDe.pythonToJava(data._jdstream, True) + testResult = streamingTest.registerStream(javaDStream) --- End diff -- Why do we need `pythonToJava` and `javaToPython`? They aren't used for streaming k-means: https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/mllib/clustering.py#L773
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org