Repository: spark
Updated Branches:
  refs/heads/master 515910e9b -> 720c94fe7


[SPARK-21027][ML][PYTHON] Added tunable parallelism to one vs. rest in both Scala MLlib and PySpark

## What changes were proposed in this pull request?

Added tunable parallelism to the PySpark implementation of one vs. rest classification. Also added a `parallelism` parameter to the Scala implementation of one vs. rest, along with the functionality to use that parameter to tune the level of parallelism.
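
As a quick illustration, a minimal usage sketch of the new parameter from the Scala API (the `training` DataFrame with "label" and "features" columns is assumed):

```scala
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

// Base binary classifier; OneVsRest fits one copy of it per class.
val classifier = new LogisticRegression().setMaxIter(10)

// parallelism = 1 (the default) trains the per-class models serially;
// a larger value fits up to that many models concurrently.
val ovr = new OneVsRest()
  .setClassifier(classifier)
  .setParallelism(4)

val ovrModel = ovr.fit(training)
```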

I am taking over PR #18281 because the original author is busy, but we need to merge this PR soon.
Once this is merged, we can close #18281.

## How was this patch tested?

Test suites were added for both the Scala and Python implementations.

Author: Ajay Saini <ajays...@gmail.com>
Author: WeichenXu <weichen...@databricks.com>

Closes #19110 from WeichenXu123/spark-21027.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/720c94fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/720c94fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/720c94fe

Branch: refs/heads/master
Commit: 720c94fe774431a8a40215757ded3dae9f267c7f
Parents: 515910e
Author: Ajay Saini <ajays...@gmail.com>
Authored: Tue Sep 12 10:02:27 2017 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Sep 12 10:02:27 2017 -0700

----------------------------------------------------------------------
 docs/ml-guide.md                                | 18 ++++++++
 .../spark/ml/classification/OneVsRest.scala     | 45 ++++++++++++++------
 .../ml/classification/OneVsRestSuite.scala      | 42 +++++++++++++++++-
 python/pyspark/ml/classification.py             | 25 ++++++-----
 .../pyspark/ml/param/_shared_params_code_gen.py |  4 +-
 python/pyspark/ml/param/shared.py               | 24 +++++++++++
 python/pyspark/ml/tests.py                      | 16 ++++++-
 7 files changed, 146 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/docs/ml-guide.md
----------------------------------------------------------------------
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 7aec6a4..f6288e7 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -105,6 +105,24 @@ MLlib is under active development.
 The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
 and the migration guide below will explain all changes between releases.
 
+## From 2.2 to 2.3
+
+### Breaking changes
+
+There are no breaking changes.
+
+### Deprecations and changes of behavior
+
+**Deprecations**
+
+There are no deprecations.
+
+**Changes of behavior**
+
+* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
+ We now set the default parallelism used in `OneVsRest` to 1 (i.e. serial). In 2.2 and earlier versions,
+ the `OneVsRest` parallelism was that of Scala's default thread pool.
+
 ## From 2.1 to 2.2
 
 ### Breaking changes

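A sketch of what the migration entry above means for user code: in 2.3, concurrent training is opt-in (`ovr` is an assumed existing `OneVsRest` instance):

```scala
// 2.3 default: per-class models are trained serially (parallelism = 1).
// To approximate the implicit concurrency of 2.2 and earlier, set the
// level explicitly, e.g. to the number of available cores:
ovr.setParallelism(Runtime.getRuntime.availableProcessors())
```
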
http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 05b8c3a..99bb234 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.ml.classification
 
-import java.util.{List => JList}
 import java.util.UUID
 
-import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 import scala.language.existentials
 
 import org.apache.hadoop.fs.Path
@@ -34,12 +34,13 @@ import org.apache.spark.ml._
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
-import org.apache.spark.ml.param.shared.HasWeightCol
+import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.ThreadUtils
 
 private[ml] trait ClassifierTypeTrait {
   // scalastyle:off structural.type
@@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
 @Since("1.4.0")
 final class OneVsRest @Since("1.4.0") (
     @Since("1.4.0") override val uid: String)
-  extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
+  extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("oneVsRest"))
@@ -297,6 +298,16 @@ final class OneVsRest @Since("1.4.0") (
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
   /**
+   * The implementation of parallel one vs. rest runs the classification for
+   * each class in a separate thread.
+   *
+   * @group expertSetParam
+   */
+  def setParallelism(value: Int): this.type = {
+    set(parallelism, value)
+  }
+
+  /**
    * Sets the value of param [[weightCol]].
    *
    * This is ignored if weight is not supported by [[classifier]].
@@ -318,7 +329,7 @@ final class OneVsRest @Since("1.4.0") (
     transformSchema(dataset.schema)
 
     val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol)
+    instr.logParams(labelCol, featuresCol, predictionCol, parallelism)
     instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)
 
     // determine number of classes either from metadata if provided, or via computation.
@@ -352,8 +363,10 @@ final class OneVsRest @Since("1.4.0") (
       multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
+    val executionContext = getExecutionContext
+
     // create k columns, one for each binary classifier.
-    val models = Range(0, numClasses).par.map { index =>
+    val modelFutures = Range(0, numClasses).map { index =>
       // generate new label metadata for the binary problem.
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
@@ -364,14 +377,18 @@ final class OneVsRest @Since("1.4.0") (
       paramMap.put(classifier.labelCol -> labelColName)
       paramMap.put(classifier.featuresCol -> getFeaturesCol)
       paramMap.put(classifier.predictionCol -> getPredictionCol)
-      if (weightColIsUsed) {
-        val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
-        paramMap.put(classifier_.weightCol -> getWeightCol)
-        classifier_.fit(trainingDataset, paramMap)
-      } else {
-        classifier.fit(trainingDataset, paramMap)
-      }
-    }.toArray[ClassificationModel[_, _]]
+      Future {
+        if (weightColIsUsed) {
+          val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
+          paramMap.put(classifier_.weightCol -> getWeightCol)
+          classifier_.fit(trainingDataset, paramMap)
+        } else {
+          classifier.fit(trainingDataset, paramMap)
+        }
+      }(executionContext)
+    }
+    val models = modelFutures
+      .map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]]
     instr.logNumFeatures(models.head.numFeatures)
 
     if (handlePersistence) {

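The hunk above replaces the implicit `.par` collection parallelism with explicitly scheduled futures: one `Future` per class on an execution context sized by the `parallelism` param (obtained via the `HasParallelism` trait), then a blocking `ThreadUtils.awaitResult` per future. A self-contained sketch of that pattern, using a hypothetical `fitAll` helper rather than the actual Spark code:

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Run the given tasks with bounded parallelism: submit one Future per
// task on a fixed-size pool, then await the results in order.
def fitAll[T](parallelism: Int, tasks: Seq[() => T]): Seq[T] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  val ec = ExecutionContext.fromExecutorService(pool)
  try {
    // Submitting everything up front lets `parallelism` tasks run at once.
    val futures = tasks.map(task => Future(task())(ec))
    futures.map(f => Await.result(f, Duration.Inf))
  } finally {
    pool.shutdown()
  }
}
```

With `parallelism = 1` this degenerates to serial training, matching the new default noted in the migration guide.
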
http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 17f8282..25bad59 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -25,12 +25,12 @@ import org.apache.spark.ml.feature.StringIndexer
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.evaluation.MulticlassMetrics
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.functions._
@@ -98,7 +98,45 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     // bound how much error we allow compared to multinomial logistic regression.
     val expectedMetrics = new MulticlassMetrics(results)
     val ovaMetrics = new MulticlassMetrics(ovaResults)
-    assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
+    assert(expectedMetrics.confusionMatrix.asML ~== ovaMetrics.confusionMatrix.asML absTol 400)
+  }
+
+  test("one-vs-rest: tuning parallelism does not change output") {
+    val ovaPar1 = new OneVsRest()
+      .setClassifier(new LogisticRegression)
+
+    val ovaModelPar1 = ovaPar1.fit(dataset)
+
+    val transformedDatasetPar1 = ovaModelPar1.transform(dataset)
+
+    val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map {
+      row => (row.getDouble(0), row.getDouble(1))
+    }
+
+    val ovaPar2 = new OneVsRest()
+      .setClassifier(new LogisticRegression)
+      .setParallelism(2)
+
+    val ovaModelPar2 = ovaPar2.fit(dataset)
+
+    val transformedDatasetPar2 = ovaModelPar2.transform(dataset)
+
+    val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
+      row => (row.getDouble(0), row.getDouble(1))
+    }
+
+    val metricsPar1 = new MulticlassMetrics(ovaResultsPar1)
+    val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
+    assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)
+
+    ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
+      case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
+        assert(lrModel1.coefficients ~== lrModel2.coefficients relTol 1E-3)
+        assert(lrModel1.intercept ~== lrModel2.intercept relTol 1E-3)
+      case other =>
+        throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
+          s" LogisticRegressionModel but found ${other.getClass.getName}")
+    }
   }
 
   test("one-vs-rest: pass label metadata correctly during train") {

http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index aa747f3..fbb9e7f 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -16,6 +16,7 @@
 #
 
 import operator
+from multiprocessing.pool import ThreadPool
 
 from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
@@ -1567,7 +1568,7 @@ class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasWeightCol, HasPredictionCo
 
 
 @inherit_doc
-class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
+class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
     """
     .. note:: Experimental
 
@@ -1612,22 +1613,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
 
     @keyword_only
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
-                 classifier=None, weightCol=None):
+                 classifier=None, weightCol=None, parallelism=1):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
-                 classifier=None, weightCol=None)
+                 classifier=None, weightCol=None, parallelism=1):
         """
         super(OneVsRest, self).__init__()
+        self._setDefault(parallelism=1)
         kwargs = self._input_kwargs
         self._set(**kwargs)
 
     @keyword_only
     @since("2.0.0")
-    def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
-                  classifier=None, weightCol=None):
+    def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
+                  classifier=None, weightCol=None, parallelism=1):
         """
-        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
-                  classifier=None, weightCol=None):
+        setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
+                  classifier=None, weightCol=None, parallelism=1):
         Sets params for OneVsRest.
         """
         kwargs = self._input_kwargs
@@ -1674,8 +1676,9 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
                 paramMap[classifier.weightCol] = weightCol
             return classifier.fit(trainingDataset, paramMap)
 
-        # TODO: Parallel training for all classes.
-        models = [trainSingleClass(i) for i in range(numClasses)]
+        pool = ThreadPool(processes=min(self.getParallelism(), numClasses))
+
+        models = pool.map(trainSingleClass, range(numClasses))
 
         if handlePersistence:
             multiclassLabeled.unpersist()
@@ -1709,8 +1712,9 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
         labelCol = java_stage.getLabelCol()
         predictionCol = java_stage.getPredictionCol()
         classifier = JavaParams._from_java(java_stage.getClassifier())
+        parallelism = java_stage.getParallelism()
         py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
-                       classifier=classifier)
+                       classifier=classifier, parallelism=parallelism)
         py_stage._resetUid(java_stage.uid())
         return py_stage
 
@@ -1723,6 +1727,7 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
         _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
                                              self.uid)
         _java_obj.setClassifier(self.getClassifier()._to_java())
+        _java_obj.setParallelism(self.getParallelism())
         _java_obj.setFeaturesCol(self.getFeaturesCol())
         _java_obj.setLabelCol(self.getLabelCol())
         _java_obj.setPredictionCol(self.getPredictionCol())

http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/param/_shared_params_code_gen.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index 51d49b5..130d1a0 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -152,7 +152,9 @@ if __name__ == "__main__":
        ("varianceCol", "column name for the biased sample variance of prediction.",
          None, "TypeConverters.toString"),
         ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
-         "TypeConverters.toInt")]
+         "TypeConverters.toInt"),
+        ("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
+         "1", "TypeConverters.toInt")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:

http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/param/shared.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 163a0e2..4041d9c 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -608,6 +608,30 @@ class HasAggregationDepth(Params):
         return self.getOrDefault(self.aggregationDepth)
 
 
+class HasParallelism(Params):
+    """
+    Mixin for param parallelism: the number of threads to use when running parallel algorithms (>= 1).
+    """
+
+    parallelism = Param(Params._dummy(), "parallelism", "the number of threads to use when running parallel algorithms (>= 1).", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasParallelism, self).__init__()
+        self._setDefault(parallelism=1)
+
+    def setParallelism(self, value):
+        """
+        Sets the value of :py:attr:`parallelism`.
+        """
+        return self._set(parallelism=value)
+
+    def getParallelism(self):
+        """
+        Gets the value of parallelism or its default value.
+        """
+        return self.getOrDefault(self.parallelism)
+
+
 class DecisionTreeParams(Params):
     """
     Mixin for Decision Tree parameters.

http://git-wip-us.apache.org/repos/asf/spark/blob/720c94fe/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 15d6c76..c66cd76 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1548,11 +1548,25 @@ class OneVsRestTests(SparkSessionTestCase):
                                          (2.0, Vectors.dense(0.5, 0.5))],
                                         ["label", "features"])
         lr = LogisticRegression(maxIter=5, regParam=0.01)
-        ovr = OneVsRest(classifier=lr)
+        ovr = OneVsRest(classifier=lr, parallelism=1)
         model = ovr.fit(df)
         output = model.transform(df)
         self.assertEqual(output.columns, ["label", "features", "prediction"])
 
+    def test_parallelism_doesnt_change_output(self):
+        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+                                         (1.0, Vectors.sparse(2, [], [])),
+                                         (2.0, Vectors.dense(0.5, 0.5))],
+                                        ["label", "features"])
+        ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
+        modelPar1 = ovrPar1.fit(df)
+        ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
+        modelPar2 = ovrPar2.fit(df)
+        for i, model in enumerate(modelPar1.models):
+            self.assertTrue(np.allclose(model.coefficients.toArray(),
+                                        modelPar2.models[i].coefficients.toArray(), atol=1E-4))
+            self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))
+
     def test_support_for_weightCol(self):
         df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
                                          (1.0, Vectors.sparse(2, [], []), 1.0),

