This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a21c4c  [SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
4a21c4c is described below

commit 4a21c4cc92805b034ade0593eea3c4a9b6122083
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Sun Apr 26 21:04:14 2020 -0700

    [SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model
    
    ### What changes were proposed in this pull request?
    Fix the bug where a PySpark CrossValidator/TrainValidationSplit with a pipeline estimator cannot save and load its model.
    
    Most PySpark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure-Python nested estimators/transformers):
    * Pipeline
    * OneVsRest
    * CrossValidator
    * TrainValidationSplit
    
    Note, however, that the model readers/writers of the estimators listed above currently do NOT support pure-Python nested estimators/transformers, because they use the Java reader/writer wrappers as the Python-side reader/writer.
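    
    For context, the Python-side dispatch looks roughly like this; this is a paraphrased sketch rather than the exact pipeline.py source, and the all-stages-are-Java check is an assumption about how the wrapper is chosen:
    
    ```
    from pyspark.ml.util import JavaMLWritable, JavaMLWriter

    def write(self):
        # Hedged sketch: a "special" estimator such as Pipeline delegates to the
        # JVM writer only when every stage is Java-backed; otherwise it falls
        # back to a pure-Python writer. Exact helper names may differ.
        if all(isinstance(stage, JavaMLWritable) for stage in self.getStages()):
            return JavaMLWriter(self)   # Java reader/writer wrapper path
        return PipelineWriter(self)     # pure-Python path for Python-only stages
    ```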
    
    The PySpark CrossValidator/TrainValidationSplit model reader/writer requires that every estimator define `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (both used during model read/write).
    
    The OneVsRest class already defines these two methods, but Pipeline does not, which leads to this bug.
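    
    To see where that requirement bites, the save path converts each Python ParamMap in the grid via the estimator, roughly like this (a paraphrased sketch of the tuning writer's behavior, not the exact source):
    
    ```
    # Hedged sketch: during CrossValidator/TrainValidationSplit save, every
    # ParamMap is translated into a Java ParamMap by the *estimator* itself.
    # If the estimator (e.g. Pipeline) lacks _transfer_param_map_to_java,
    # save() fails with an AttributeError.
    estimator = self.getEstimator()
    java_epms = [estimator._transfer_param_map_to_java(epm)
                 for epm in self.getEstimatorParamMaps()]
    ```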
    
    In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` to the Pipeline class.
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Unit test.
    
    Manually tested in the PySpark shell:
    1) CrossValidator with a simple Pipeline estimator
    ```
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
    
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
    ], ["id", "text", "label"])
    
    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    
    paramGrid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .build()
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=BinaryClassificationEvaluator(),
                              numFolds=2)  # use 3+ folds in practice
    
    # Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(training)
    
    cvModel.save('/tmp/cv_model001')
    CrossValidatorModel.load('/tmp/cv_model001')
    ```
    
    2) CrossValidator with a Pipeline estimator that includes a OneVsRest stage, where the OneVsRest estimator nests a LogisticRegression estimator.
    
    ```
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression, OneVsRest
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
    
    dataset = spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])
    
    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(100)
    lr2 = LogisticRegression().setMaxIter(150)
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
    evaluator = MulticlassClassificationEvaluator()
    
    pipeline = Pipeline(stages=[ova])
    
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(dataset)
    cvModel.save('/tmp/model002')
    
    cvModel2 = CrossValidatorModel.load('/tmp/model002')
    ```
    
    The TrainValidationSplit testing code is similar, so I do not paste it here; a minimal sketch follows below.
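    
    For reference, a minimal sketch of the analogous TrainValidationSplit check, reusing `pipeline`, `paramGrid`, and `training` from example 1 (the `trainRatio` value is illustrative):
    
    ```
    from pyspark.ml.tuning import TrainValidationSplit, TrainValidationSplitModel

    # Same save/load round-trip as the CrossValidator example, but with a
    # single train/validation split instead of k-fold cross-validation.
    tvs = TrainValidationSplit(estimator=pipeline,
                               estimatorParamMaps=paramGrid,
                               evaluator=BinaryClassificationEvaluator(),
                               trainRatio=0.8)
    tvsModel = tvs.fit(training)

    tvsModel.save('/tmp/tvs_model001')
    TrainValidationSplitModel.load('/tmp/tvs_model001')
    ```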
    
    Closes #28279 from WeichenXu123/fix_pipeline_tuning.
    
    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 python/pyspark/ml/pipeline.py          |  53 ++++++++++++-
 python/pyspark/ml/tests/test_tuning.py | 139 ++++++++++++++++++++++++++++++++-
 2 files changed, 189 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 09e0748..53d07ec 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext
 from pyspark.ml.base import Estimator, Model, Transformer
 from pyspark.ml.param import Param, Params
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaParams
-from pyspark.ml.common import inherit_doc
+from pyspark.ml.wrapper import JavaParams, JavaWrapper
+from pyspark.ml.common import inherit_doc, _java2py, _py2java
 
 
 @inherit_doc
@@ -174,6 +174,55 @@ class Pipeline(Estimator, MLReadable, MLWritable):
 
         return _java_obj
 
+    def _make_java_param_pair(self, param, value):
+        """
+        Makes a Java param pair.
+        """
+        sc = SparkContext._active_spark_context
+        param = self._resolveParam(param)
+        java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, param.name, param.doc)
+        if isinstance(value, Params) and hasattr(value, "_to_java"):
+            # Convert JavaEstimator/JavaTransformer object or Estimator/Transformer object which
+            # implements `_to_java` method (such as OneVsRest, Pipeline object) to java object.
+            # used in the case of an estimator having another estimator as a parameter
+            # the reason why this is not in _py2java in common.py is that importing
+            # Estimator and Model in common.py results in a circular import with inherit_doc
+            java_value = value._to_java()
+        else:
+            java_value = _py2java(sc, value)
+        return java_param.w(java_value)
+
+    def _transfer_param_map_to_java(self, pyParamMap):
+        """
+        Transforms a Python ParamMap into a Java ParamMap.
+        """
+        paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap")
+        for param in self.params:
+            if param in pyParamMap:
+                pair = self._make_java_param_pair(param, pyParamMap[param])
+                paramMap.put([pair])
+        return paramMap
+
+    def _transfer_param_map_from_java(self, javaParamMap):
+        """
+        Transforms a Java ParamMap into a Python ParamMap.
+        """
+        sc = SparkContext._active_spark_context
+        paramMap = dict()
+        for pair in javaParamMap.toList():
+            param = pair.param()
+            if self.hasParam(str(param.name())):
+                java_obj = pair.value()
+                if sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj):
+                    # Note: JavaParams._from_java supports both JavaEstimator/JavaTransformer class
+                    # and Estimator/Transformer class which implements `_from_java` static method
+                    # (such as OneVsRest, Pipeline class).
+                    py_obj = JavaParams._from_java(java_obj)
+                else:
+                    py_obj = _java2py(sc, java_obj)
+                paramMap[self.getParam(param.name())] = py_obj
+        return paramMap
+
 
 @inherit_doc
 class PipelineWriter(MLWriter):
diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py
index 9d8ba37..6bcc3f9 100644
--- a/python/pyspark/ml/tests/test_tuning.py
+++ b/python/pyspark/ml/tests/test_tuning.py
@@ -18,7 +18,8 @@
 import tempfile
 import unittest
 
-from pyspark.ml import Estimator, Model
+from pyspark.ml.feature import HashingTF, Tokenizer
+from pyspark.ml import Estimator, Pipeline, Model
 from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
 from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
     MulticlassClassificationEvaluator, RegressionEvaluator
@@ -310,6 +311,75 @@ class CrossValidatorTests(SparkSessionTestCase):
         loadedModel = CrossValidatorModel.load(cvModelPath)
         self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
 
+    def test_save_load_pipeline_estimator(self):
+        temp_path = tempfile.mkdtemp()
+        training = self.spark.createDataFrame([
+            (0, "a b c d e spark", 1.0),
+            (1, "b d", 0.0),
+            (2, "spark f g h", 1.0),
+            (3, "hadoop mapreduce", 0.0),
+            (4, "b spark who", 1.0),
+            (5, "g d a y", 0.0),
+            (6, "spark fly", 1.0),
+            (7, "was mapreduce", 0.0),
+        ], ["id", "text", "label"])
+
+        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and ova.
+        tokenizer = Tokenizer(inputCol="text", outputCol="words")
+        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+
+        ova = OneVsRest(classifier=LogisticRegression())
+        lr1 = LogisticRegression().setMaxIter(5)
+        lr2 = LogisticRegression().setMaxIter(10)
+
+        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+        paramGrid = ParamGridBuilder() \
+            .addGrid(hashingTF.numFeatures, [10, 100]) \
+            .addGrid(ova.classifier, [lr1, lr2]) \
+            .build()
+
+        crossval = CrossValidator(estimator=pipeline,
+                                  estimatorParamMaps=paramGrid,
+                                  evaluator=MulticlassClassificationEvaluator(),
+                                  numFolds=2)  # use 3+ folds in practice
+
+        # Run cross-validation, and choose the best set of parameters.
+        cvModel = crossval.fit(training)
+
+        # test save/load of CrossValidatorModel
+        cvModelPath = temp_path + "/cvModel"
+        cvModel.save(cvModelPath)
+        loadedModel = CrossValidatorModel.load(cvModelPath)
+        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
+        self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
+        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+                                              cvModel.bestModel.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
+        # Test nested pipeline
+        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
+        crossval2 = CrossValidator(estimator=nested_pipeline,
+                                   estimatorParamMaps=paramGrid,
+                                   evaluator=MulticlassClassificationEvaluator(),
+                                   numFolds=2)  # use 3+ folds in practice
+
+        # Run cross-validation, and choose the best set of parameters.
+        cvModel2 = crossval2.fit(training)
+        # test save/load of CrossValidatorModel
+        cvModelPath2 = temp_path + "/cvModel2"
+        cvModel2.save(cvModelPath2)
+        loadedModel2 = CrossValidatorModel.load(cvModelPath2)
+        self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
+        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+        original_nested_pipeline_model = cvModel2.bestModel.stages[1]
+        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
+        self.assertEqual(len(loaded_nested_pipeline_model.stages),
+                         len(original_nested_pipeline_model.stages))
+        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
+                                              original_nested_pipeline_model.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
 
 class TrainValidationSplitTests(SparkSessionTestCase):
 
@@ -511,6 +581,73 @@ class TrainValidationSplitTests(SparkSessionTestCase):
         loadedModel = TrainValidationSplitModel.load(tvsModelPath)
         self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
 
+    def test_save_load_pipeline_estimator(self):
+        temp_path = tempfile.mkdtemp()
+        training = self.spark.createDataFrame([
+            (0, "a b c d e spark", 1.0),
+            (1, "b d", 0.0),
+            (2, "spark f g h", 1.0),
+            (3, "hadoop mapreduce", 0.0),
+            (4, "b spark who", 1.0),
+            (5, "g d a y", 0.0),
+            (6, "spark fly", 1.0),
+            (7, "was mapreduce", 0.0),
+        ], ["id", "text", "label"])
+
+        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and ova.
+        tokenizer = Tokenizer(inputCol="text", outputCol="words")
+        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+
+        ova = OneVsRest(classifier=LogisticRegression())
+        lr1 = LogisticRegression().setMaxIter(5)
+        lr2 = LogisticRegression().setMaxIter(10)
+
+        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])
+
+        paramGrid = ParamGridBuilder() \
+            .addGrid(hashingTF.numFeatures, [10, 100]) \
+            .addGrid(ova.classifier, [lr1, lr2]) \
+            .build()
+
+        tvs = TrainValidationSplit(estimator=pipeline,
+                                   estimatorParamMaps=paramGrid,
+                                   evaluator=MulticlassClassificationEvaluator())
+
+        # Run train validation split, and choose the best set of parameters.
+        tvsModel = tvs.fit(training)
+
+        # test save/load of TrainValidationSplitModel
+        tvsModelPath = temp_path + "/tvsModel"
+        tvsModel.save(tvsModelPath)
+        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
+        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
+        self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
+        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
+                                              tvsModel.bestModel.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
+        # Test nested pipeline
+        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
+        tvs2 = TrainValidationSplit(estimator=nested_pipeline,
+                                    estimatorParamMaps=paramGrid,
+                                    evaluator=MulticlassClassificationEvaluator())
+
+        # Run train validation split, and choose the best set of parameters.
+        tvsModel2 = tvs2.fit(training)
+        # test save/load of TrainValidationSplitModel
+        tvsModelPath2 = temp_path + "/tvsModel2"
+        tvsModel2.save(tvsModelPath2)
+        loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
+        self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
+        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
+        original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
+        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
+        self.assertEqual(len(loaded_nested_pipeline_model.stages),
+                         len(original_nested_pipeline_model.stages))
+        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
+                                              original_nested_pipeline_model.stages):
+            self.assertEqual(loadedStage.uid, originalStage.uid)
+
     def test_copy(self):
         dataset = self.spark.createDataFrame([
             (10, 10.0),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
