Github user jkbradley commented on a diff in the pull request:
https://github.com/apache/spark/pull/12124#discussion_r58811068
--- Diff: python/pyspark/ml/classification.py ---
@@ -1134,6 +1139,216 @@ def weights(self):
return self._call_java("weights")
+@inherit_doc
+class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol):
+ """
+ Reduction of Multiclass Classification to Binary Classification.
+ Performs reduction using one against all strategy.
+ For a multiclass classification with k classes, train k models (one
per class).
+ Each example is scored against all k models and the model with highest
score
+ is picked to label the example.
+
+ >>> from pyspark.sql import Row
+ >>> from pyspark.mllib.linalg import Vectors
+ >>> df = sc.parallelize([
+ ... Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
+ ... Row(label=1.0, features=Vectors.sparse(2, [], [])),
+ ... Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
+ >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+ >>> ovr = OneVsRest(classifier=lr).setPredictionCol("indexed")
+ >>> model = ovr.fit(df)
+ >>> [x.coefficients for x in model.models]
+ [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]),
DenseVector([-4.5855, 6.1785])]
+ >>> [x.intercept for x in model.models]
+ [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115]
+ >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0,
0.0))]).toDF()
+ >>> model.transform(test0).head().indexed
+ 1.0
+ >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0],
[1.0]))]).toDF()
+ >>> model.transform(test1).head().indexed
+ 0.0
+ >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5,
0.4))]).toDF()
+ >>> model.transform(test2).head().indexed
+ 2.0
+
+ .. versionadded:: 2.0.0
+ """
+
+ # a placeholder to make it appear in the generated doc
+ classifier = Param(Params._dummy(), "classifier", "base binary
classifier")
+
+ @keyword_only
+ def __init__(self, featuresCol="features", labelCol="label",
predictionCol="prediction",
+ classifier=None):
+ """
+ __init__(self, featuresCol="features", labelCol="label",
predictionCol="prediction", \
+ classifier=None)
+ """
+ super(OneVsRest, self).__init__()
+ kwargs = self.__init__._input_kwargs
+ self._set(**kwargs)
+
+ @keyword_only
+ @since("2.0.0")
+ def setParams(self, featuresCol=None, labelCol=None,
predictionCol=None, classifier=None):
+ """
+ setParams(self, featuresCol=None, labelCol=None,
predictionCol=None, classifier=None):
+ Sets params for OneVsRest.
+ """
+ kwargs = self.setParams._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.0.0")
+ def setClassifier(self, value):
+ """
+ Sets the value of :py:attr:`classifier`.
+ """
+ self._paramMap[self.classifier] = value
+ return self
+
+ @since("2.0.0")
+ def getClassifier(self):
+ """
+ Gets the value of classifier or its default value.
+ """
+ return self.getOrDefault(self.classifier)
+
+ def _fit(self, dataset):
+ labelCol = self.getLabelCol()
+ featuresCol = self.getFeaturesCol()
+ predictionCol = self.getPredictionCol()
+ classifier = self.getClassifier()
+
+ numClasses = int(dataset.agg({labelCol:
"max"}).head()["max("+labelCol+")"]) + 1
+
+ multiclassLabeled = dataset.select(labelCol, featuresCol)
+
+ # persist if underlying dataset is not persistent.
+ handlePersistence = \
+ dataset.rdd.getStorageLevel() == StorageLevel(False, False,
False, False)
+ if handlePersistence:
+ multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
+
+ def trainSingleClass(index):
+ binaryLabelCol = "mc2b$" + str(index)
+ trainingDataset = multiclassLabeled.withColumn(
+ binaryLabelCol,
+ when(multiclassLabeled[labelCol] == float(index),
1.0).otherwise(0.0))
+ paramMap = dict([(classifier.labelCol, binaryLabelCol),
+ (classifier.featuresCol, featuresCol),
+ (classifier.predictionCol, predictionCol)])
+ duplicatedClassifier = classifier.__class__()
+ duplicatedClassifier._resetUid(classifier.uid)
+ classifier._copyValues(duplicatedClassifier)
+ return duplicatedClassifier.fit(trainingDataset, paramMap)
--- End diff --
Thanks for doing this. But... I just talked with Josh, who strongly
recommended against using multiprocessing because of possible side effects.
Would you mind reverting that change and training the models one at a time?
My apologies for the switch!
I'd like us to train multiple models concurrently in the future, but that
needs more careful prototyping and testing than we have time for before
Spark 2.0. I'll create a new JIRA and link it to this one.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]