Github user MechCoder commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6849#discussion_r33152107
  
    --- Diff: python/pyspark/mllib/classification.py ---
    @@ -580,6 +583,102 @@ def train(cls, data, lambda_=1.0):
             return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
     
     
    +class StreamingLinearAlgorithm(object):
    +    """
    +    Base class for streaming linear algorithms.
    +
    +    Prevents reimplementation of methods predictOn and predictOnValues.
    +    """
    +    def __init__(self, model):
    +        self._model = model
    +
    +    def latestModel(self):
    +        """
    +        Returns the latest model.
    +        """
    +        return self._model
    +
    +    def _validate(self, dstream):
    +        if not isinstance(dstream, DStream):
    +            raise TypeError(
    +                "dstream should be a DStream object, got %s" % 
type(dstream))
    +        if not self._model:
    +            raise ValueError(
    +                "Model must be intialized using setInitialWeights")
    +
    +    def predictOn(self, dstream):
    +        """
    +        Make predictions on a dstream.
    +
    +        Returns a transformed dstream object.
    +        """
    +        self._validate(dstream)
    +        return dstream.map(lambda x: self._model.predict(x))
    +
    +    def predictOnValues(self, dstream):
    +        """
    +        Make predictions on a keyed dstream.
    +
    +        Returns a transformed dstream object.
    +        """
    +        self._validate(dstream)
    +        return dstream.mapValues(lambda x: self._model.predict(x))
    +
    +
    +@inherit_doc
    +class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
    +    """
    +    Run LogisticRegression with SGD on a stream of data.
    +
    +    The weights obtained at the end of training a stream are used as
    +    initial weights for the next stream.
    +
    +    :param stepSize: Step size for each iteration of gradient descent.
    +    :param numIterations: Number of iterations run for each batch of data.
    +    :param miniBatchFraction: Fraction of data on which SGD is run for each
    +                              iteration.
    +    :param regParam: L2 regularization parameter.
    +    """
    +    def __init__(self, stepSize=0.1, numIterations=50,
    --- End diff --
    
    Isn't the line-length limit 79 characters? Or is the convention in PySpark 100?
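    
    For context, a minimal usage sketch of the new API (not part of the diff). It
    assumes a running StreamingContext, queue-backed streams for illustration, and
    a trainOn method mirroring the Scala streaming API; the weight and data values
    are placeholders, not a definitive example.
    
        from pyspark import SparkContext
        from pyspark.streaming import StreamingContext
        from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
        from pyspark.mllib.regression import LabeledPoint
    
        sc = SparkContext(appName="StreamingLRSketch")
        ssc = StreamingContext(sc, 1)  # 1-second batches
    
        # Illustrative queue-backed streams; a real job would read a live source.
        trainStream = ssc.queueStream(
            [sc.parallelize([LabeledPoint(1.0, [1.0]), LabeledPoint(0.0, [-1.0])])])
        testStream = ssc.queueStream([sc.parallelize([[0.5], [-0.5]])])
        keyedStream = ssc.queueStream([sc.parallelize([(0, [0.5]), (1, [-0.5])])])
    
        model = StreamingLogisticRegressionWithSGD(stepSize=0.1, numIterations=50)
        model.setInitialWeights([0.0])  # predictOn raises ValueError otherwise
    
        model.trainOn(trainStream)            # assumed here, as in the Scala API
        model.predictOn(testStream).pprint()  # one prediction per feature vector
        model.predictOnValues(keyedStream).pprint()  # (key, prediction) pairs
    
        ssc.start()
        ssc.awaitTermination(10)
        ssc.stop()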

