Github user feynmanliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11374#discussion_r85647905
  
    --- Diff: python/pyspark/mllib/stat/test.py ---
    @@ -80,3 +85,118 @@ class KolmogorovSmirnovTestResult(TestResult):
         """
         Contains test results for the Kolmogorov-Smirnov test.
         """
    +
    +
    +class BinarySample(namedtuple("BinarySample", ["isExperiment", "value"])):
    +    """
    +    Represents a (isExperiment, value) tuple.
    +
    +    .. versionadded:: 2.0.0
    +    """
    +
    +    def __reduce__(self):
    +        return BinarySample, (bool(self.isExperiment), float(self.value))
    +
    +
    +class StreamingTestResult(namedtuple("StreamingTestResult",
    +                                     ["pValue", "degreesOfFreedom", "statistic", "method",
    +                                      "nullHypothesis"])):
    +    """
    +    Contains test results for StreamingTest.
    +
    +    .. versionadded:: 2.0.0
    +    """
    +
    +    def __reduce__(self):
    +        return StreamingTestResult, (float(self.pValue),
    +                                     float(self.degreesOfFreedom), float(self.statistic),
    +                                     str(self.method), str(self.nullHypothesis))
    +
    +
    +class StreamingTest(object):
    +    """
    +    .. note:: Experimental
    +
    +    Online 2-sample significance testing for a stream of (Boolean, Double) pairs. The Boolean
    +    identifies which sample each observation comes from, and the Double is the numeric value of the
    +    observation.
    +
    +    To address novelty effects, the `peacePeriod` specifies a set number of initial RDD batches of
    +    the DStream to be dropped from significance testing.
    +
    +    The `windowSize` sets the number of batches each significance test is to be performed over. The
    +    window is sliding with a stride length of 1 batch. Setting windowSize to 0 will perform
    +    cumulative processing, using all batches seen so far.
    +
    +    Different tests may be used for assessing statistical significance depending on assumptions
    +    satisfied by data. For more details, see StreamingTestMethod. The `testMethod` specifies
    +    which test will be used.
    +
    +    .. versionadded:: 2.0.0
    +    """
    +
    +    def __init__(self):
    +        self._peacePeriod = 0
    +        self._windowSize = 0
    +        self._testMethod = "welch"
    +
    +    @since('2.0.0')
    +    def setPeacePeriod(self, peacePeriod):
    +        """
    +        Update peacePeriod
    +        :param peacePeriod:
    +          Set number of initial RDD batches of the DStream to be dropped from significance testing.
    +        """
    +        self._peacePeriod = peacePeriod
    +
    +    @since('2.0.0')
    +    def setWindowSize(self, windowSize):
    +        """
    +        Update windowSize
    +        :param windowSize:
    +          Set the number of batches each significance test is to be performed over.
    +        """
    +        self._windowSize = windowSize
    +
    +    @since('2.0.0')
    +    def setTestMethod(self, testMethod):
    +        """
    +        Update test method
    +        :param testMethod:
    +          Currently supported tests: `welch`, `student`.
    +        """
    +        assert testMethod in ("welch", "student"), \
    +            "Currently supported tests: \"welch\", \"student\""
    +        self._testMethod = testMethod
    +
    +    @since('2.0.0')
    +    def registerStream(self, data):
    +        """
    +        Register a data stream to get its test result.
    +
    +        :param data:
    +          The input data stream; each element is a BinarySample instance.
    +        """
    +        self._validate(data)
    +        sc = SparkContext._active_spark_context
    +
    +        streamingTest = sc._jvm.org.apache.spark.mllib.stat.test.StreamingTest()
    +        streamingTest.setPeacePeriod(self._peacePeriod)
    +        streamingTest.setWindowSize(self._windowSize)
    +        streamingTest.setTestMethod(self._testMethod)
    +
    +        javaDStream = sc._jvm.SerDe.pythonToJava(data._jdstream, True)
    +        testResult = streamingTest.registerStream(javaDStream)
    --- End diff --
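
    For context, here is a rough end-to-end sketch of how the proposed API would be used, pieced together from the docstrings above. The `queueStream` input, the variable names, and the assumption that `registerStream` returns a DStream of `StreamingTestResult` are my own illustration, not something stated in this diff:

    ```python
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.stat.test import BinarySample, StreamingTest

    sc = SparkContext(appName="StreamingTestSketch")
    ssc = StreamingContext(sc, batchDuration=1)

    # Toy input: each element tags an observation with the group it came from
    # (experiment vs. control) and its numeric value.
    batches = [sc.parallelize([BinarySample(True, 1.0), BinarySample(False, 2.0)])
               for _ in range(5)]
    stream = ssc.queueStream(batches)

    test = StreamingTest()
    test.setPeacePeriod(1)      # drop the first batch before testing
    test.setWindowSize(0)       # 0 => cumulative test over all batches seen so far
    test.setTestMethod("welch")

    # Assumed: registerStream returns a DStream of StreamingTestResult.
    results = test.registerStream(stream)
    results.pprint()

    ssc.start()
    ssc.awaitTermination()
    ```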
    
    Why do we need `pythonToJava` and `javaToPython`? It's not used for streaming K-means:
    https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/mllib/clustering.py#L773
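
    For comparison, streaming K-means never touches `SerDe` in its Python wrapper: the DStream is consumed one RDD per batch, and `callMLlibFunc` does the Python-to-Java conversion of each argument internally. Roughly (a simplified sketch of the trainOn pattern, written as a hypothetical free function; argument details are approximate):

    ```python
    from pyspark.mllib.common import callMLlibFunc

    def train_on(dstream, centers, weights, decay_factor, time_unit):
        """Sketch of the StreamingKMeans.trainOn pattern: one JVM call per batch."""

        def update(rdd):
            # callMLlibFunc converts every argument (including the Python RDD)
            # to its Java counterpart via _py2java, so the wrapper never calls
            # SerDe.pythonToJava itself.
            callMLlibFunc("updateStreamingKMeansModel", centers, weights,
                          rdd, float(decay_factor), time_unit)

        dstream.foreachRDD(update)
    ```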

