Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/6849#discussion_r33084955
--- Diff: python/pyspark/mllib/classification.py ---
@@ -580,6 +583,102 @@ def train(cls, data, lambda_=1.0):
return NaiveBayesModel(labels.toArray(), pi.toArray(),
numpy.array(theta))
+class StreamingLinearAlgorithm(object):
+ """
+ Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+ Prevents reimplementation of methods predictOn and predictOnValues.
+ """
+ def __init__(self, model):
+ self._model = model
+
+ def latestModel(self):
+ """
+ Returns the latest model.
+ """
+ return self._model
+
+ def _validate(self, dstream):
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "dstream should be a DStream object, got %s" % type(dstream))
+ if not self._model:
+ raise ValueError(
+ "Model must be intialized using setInitialWeights")
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+
+ Returns a transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+
+ Returns a transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
+ """
+ Run LogisticRegression with SGD on a stream of data.
+
+ The weights obtained at the end of training a stream are used as initial
+ weights for the next stream.
+ :param stepSize: Step size for each iteration of gradient descent.
+ :param numIterations: Number of iterations run for each batch of data.
+ :param miniBatchFraction: Fraction of data on which SGD is run for each
+ iteration.
+ :param regParam: L2 Regularization parameter.
+ """
+ def __init__(self, stepSize=0.1, numIterations=50,
+ miniBatchFraction=1.0, regParam=0.01):
+ self.stepSize = stepSize
+ self.numIterations = numIterations
+ self.regParam = regParam
+ self.miniBatchFraction = miniBatchFraction
+ self._model = None
+ super(StreamingLogisticRegressionWithSGD, self).__init__(
+ model=self._model)
+
+ def setInitialWeights(self, initialWeights):
+ """
+ Set the initial value of weights.
+
+ This must be set before running trainOn and predictOn.
+ """
+ initialWeights = _convert_to_vector(initialWeights)
+
+ # LogisticRegressionWithSGD does only binary classification.
+ self._model = LogisticRegressionModel(
+ initialWeights, 0, initialWeights.size, 2)
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ # Check for empty RDD Streams. If this try except clause is
+ # removed, then calling train raises an error.
+ try:
--- End diff --
We shouldn't use `try ... except` to test for an empty RDD. Use
`if rdd.isEmpty():` instead.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]