This is an automated email from the ASF dual-hosted git repository. meng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4a21c4c [SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model 4a21c4c is described below commit 4a21c4cc92805b034ade0593eea3c4a9b6122083 Author: Weichen Xu <weichen...@databricks.com> AuthorDate: Sun Apr 26 21:04:14 2020 -0700 [SPARK-31497][ML][PYSPARK] Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model ### What changes were proposed in this pull request? Fix Pyspark CrossValidator/TrainValidationSplit with pipeline estimator cannot save and load model. Most pyspark estimators/transformers inherit `JavaParams`, but some estimators are special (in order to support pure python implemented nested estimators/transformers): * Pipeline * OneVsRest * CrossValidator * TrainValidationSplit But note that, currently, in pyspark, estimators listed above, their model reader/writer do NOT support pure python implemented nested estimators/transformers. Because they use java reader/writer wrapper as python side reader/writer. Pyspark CrossValidator/TrainValidationSplit model reader/writer require all estimators define the `_transfer_param_map_to_java` and `_transfer_param_map_from_java` (used in model read/write). OneVsRest class already defines the two methods, but Pipeline does not, so it led to this bug. In this PR I add `_transfer_param_map_to_java` and `_transfer_param_map_from_java` into Pipeline class. ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test. 
Manually test in pyspark shell: 1) CrossValidator with Simple Pipeline estimator ``` from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0), (4, "b spark who", 1.0), (5, "g d a y", 0.0), (6, "spark fly", 1.0), (7, "was mapreduce", 0.0), ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) paramGrid = ParamGridBuilder() \ .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ .addGrid(lr.regParam, [0.1, 0.01]) \ .build() crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=2) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(training) cvModel.save('/tmp/cv_model001') CrossValidatorModel.load('/tmp/cv_model001') ``` 2) CrossValidator with a Pipeline estimator which includes a OneVsRest estimator stage, where the OneVsRest estimator nests a LogisticRegression estimator. 
``` from pyspark.ml.linalg import Vectors from pyspark.ml import Estimator, Model from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.linalg import Vectors from pyspark.ml.param import Param, Params from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \ TrainValidationSplit, TrainValidationSplitModel from pyspark.sql.functions import rand from pyspark.testing.mlutils import SparkSessionTestCase dataset = spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), (Vectors.dense([0.6]), 1.0), (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) ova = OneVsRest(classifier=LogisticRegression()) lr1 = LogisticRegression().setMaxIter(100) lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() pipeline = Pipeline(stages=[ova]) cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) cvModel.save('/tmp/model002') cvModel2 = CrossValidatorModel.load('/tmp/model002') ``` The TrainValidationSplit testing code is similar, so I do not paste it. Closes #28279 from WeichenXu123/fix_pipeline_tuning. 
Authored-by: Weichen Xu <weichen...@databricks.com> Signed-off-by: Xiangrui Meng <m...@databricks.com> --- python/pyspark/ml/pipeline.py | 53 ++++++++++++- python/pyspark/ml/tests/test_tuning.py | 139 ++++++++++++++++++++++++++++++++- 2 files changed, 189 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index 09e0748..53d07ec 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -25,8 +25,8 @@ from pyspark import since, keyword_only, SparkContext from pyspark.ml.base import Estimator, Model, Transformer from pyspark.ml.param import Param, Params from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaParams -from pyspark.ml.common import inherit_doc +from pyspark.ml.wrapper import JavaParams, JavaWrapper +from pyspark.ml.common import inherit_doc, _java2py, _py2java @inherit_doc @@ -174,6 +174,55 @@ class Pipeline(Estimator, MLReadable, MLWritable): return _java_obj + def _make_java_param_pair(self, param, value): + """ + Makes a Java param pair. + """ + sc = SparkContext._active_spark_context + param = self._resolveParam(param) + java_param = sc._jvm.org.apache.spark.ml.param.Param(param.parent, param.name, param.doc) + if isinstance(value, Params) and hasattr(value, "_to_java"): + # Convert JavaEstimator/JavaTransformer object or Estimator/Transformer object which + # implements `_to_java` method (such as OneVsRest, Pipeline object) to java object. + # used in the case of an estimator having another estimator as a parameter + # the reason why this is not in _py2java in common.py is that importing + # Estimator and Model in common.py results in a circular import with inherit_doc + java_value = value._to_java() + else: + java_value = _py2java(sc, value) + return java_param.w(java_value) + + def _transfer_param_map_to_java(self, pyParamMap): + """ + Transforms a Python ParamMap into a Java ParamMap. 
+ """ + paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap") + for param in self.params: + if param in pyParamMap: + pair = self._make_java_param_pair(param, pyParamMap[param]) + paramMap.put([pair]) + return paramMap + + def _transfer_param_map_from_java(self, javaParamMap): + """ + Transforms a Java ParamMap into a Python ParamMap. + """ + sc = SparkContext._active_spark_context + paramMap = dict() + for pair in javaParamMap.toList(): + param = pair.param() + if self.hasParam(str(param.name())): + java_obj = pair.value() + if sc._jvm.Class.forName("org.apache.spark.ml.PipelineStage").isInstance(java_obj): + # Note: JavaParams._from_java support both JavaEstimator/JavaTransformer class + # and Estimator/Transformer class which implements `_from_java` static method + # (such as OneVsRest, Pipeline class). + py_obj = JavaParams._from_java(java_obj) + else: + py_obj = _java2py(sc, java_obj) + paramMap[self.getParam(param.name())] = py_obj + return paramMap + @inherit_doc class PipelineWriter(MLWriter): diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 9d8ba37..6bcc3f9 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -18,7 +18,8 @@ import tempfile import unittest -from pyspark.ml import Estimator, Model +from pyspark.ml.feature import HashingTF, Tokenizer +from pyspark.ml import Estimator, Pipeline, Model from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ MulticlassClassificationEvaluator, RegressionEvaluator @@ -310,6 +311,75 @@ class CrossValidatorTests(SparkSessionTestCase): loadedModel = CrossValidatorModel.load(cvModelPath) self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) + def test_save_load_pipeline_estimator(self): + temp_path = tempfile.mkdtemp() + training = self.spark.createDataFrame([ + (0, "a b c d e spark", 
1.0), + (1, "b d", 0.0), + (2, "spark f g h", 1.0), + (3, "hadoop mapreduce", 0.0), + (4, "b spark who", 1.0), + (5, "g d a y", 0.0), + (6, "spark fly", 1.0), + (7, "was mapreduce", 0.0), + ], ["id", "text", "label"]) + + # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. + tokenizer = Tokenizer(inputCol="text", outputCol="words") + hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") + + ova = OneVsRest(classifier=LogisticRegression()) + lr1 = LogisticRegression().setMaxIter(5) + lr2 = LogisticRegression().setMaxIter(10) + + pipeline = Pipeline(stages=[tokenizer, hashingTF, ova]) + + paramGrid = ParamGridBuilder() \ + .addGrid(hashingTF.numFeatures, [10, 100]) \ + .addGrid(ova.classifier, [lr1, lr2]) \ + .build() + + crossval = CrossValidator(estimator=pipeline, + estimatorParamMaps=paramGrid, + evaluator=MulticlassClassificationEvaluator(), + numFolds=2) # use 3+ folds in practice + + # Run cross-validation, and choose the best set of parameters. + cvModel = crossval.fit(training) + + # test save/load of CrossValidatorModel + cvModelPath = temp_path + "/cvModel" + cvModel.save(cvModelPath) + loadedModel = CrossValidatorModel.load(cvModelPath) + self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) + self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages)) + for loadedStage, originalStage in zip(loadedModel.bestModel.stages, + cvModel.bestModel.stages): + self.assertEqual(loadedStage.uid, originalStage.uid) + + # Test nested pipeline + nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])]) + crossval2 = CrossValidator(estimator=nested_pipeline, + estimatorParamMaps=paramGrid, + evaluator=MulticlassClassificationEvaluator(), + numFolds=2) # use 3+ folds in practice + + # Run cross-validation, and choose the best set of parameters. 
+ cvModel2 = crossval2.fit(training) + # test save/load of CrossValidatorModel + cvModelPath2 = temp_path + "/cvModel2" + cvModel2.save(cvModelPath2) + loadedModel2 = CrossValidatorModel.load(cvModelPath2) + self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid) + loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1] + original_nested_pipeline_model = cvModel2.bestModel.stages[1] + self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid) + self.assertEqual(len(loaded_nested_pipeline_model.stages), + len(original_nested_pipeline_model.stages)) + for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages, + original_nested_pipeline_model.stages): + self.assertEqual(loadedStage.uid, originalStage.uid) + class TrainValidationSplitTests(SparkSessionTestCase): @@ -511,6 +581,73 @@ class TrainValidationSplitTests(SparkSessionTestCase): loadedModel = TrainValidationSplitModel.load(tvsModelPath) self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + def test_save_load_pipeline_estimator(self): + temp_path = tempfile.mkdtemp() + training = self.spark.createDataFrame([ + (0, "a b c d e spark", 1.0), + (1, "b d", 0.0), + (2, "spark f g h", 1.0), + (3, "hadoop mapreduce", 0.0), + (4, "b spark who", 1.0), + (5, "g d a y", 0.0), + (6, "spark fly", 1.0), + (7, "was mapreduce", 0.0), + ], ["id", "text", "label"]) + + # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. 
+ tokenizer = Tokenizer(inputCol="text", outputCol="words") + hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") + + ova = OneVsRest(classifier=LogisticRegression()) + lr1 = LogisticRegression().setMaxIter(5) + lr2 = LogisticRegression().setMaxIter(10) + + pipeline = Pipeline(stages=[tokenizer, hashingTF, ova]) + + paramGrid = ParamGridBuilder() \ + .addGrid(hashingTF.numFeatures, [10, 100]) \ + .addGrid(ova.classifier, [lr1, lr2]) \ + .build() + + tvs = TrainValidationSplit(estimator=pipeline, + estimatorParamMaps=paramGrid, + evaluator=MulticlassClassificationEvaluator()) + + # Run train validation split, and choose the best set of parameters. + tvsModel = tvs.fit(training) + + # test save/load of CrossValidatorModel + tvsModelPath = temp_path + "/tvsModel" + tvsModel.save(tvsModelPath) + loadedModel = TrainValidationSplitModel.load(tvsModelPath) + self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages)) + for loadedStage, originalStage in zip(loadedModel.bestModel.stages, + tvsModel.bestModel.stages): + self.assertEqual(loadedStage.uid, originalStage.uid) + + # Test nested pipeline + nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])]) + tvs2 = TrainValidationSplit(estimator=nested_pipeline, + estimatorParamMaps=paramGrid, + evaluator=MulticlassClassificationEvaluator()) + + # Run train validation split, and choose the best set of parameters. 
+ tvsModel2 = tvs2.fit(training) + # test save/load of CrossValidatorModel + tvsModelPath2 = temp_path + "/tvsModel2" + tvsModel2.save(tvsModelPath2) + loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2) + self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid) + loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1] + original_nested_pipeline_model = tvsModel2.bestModel.stages[1] + self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid) + self.assertEqual(len(loaded_nested_pipeline_model.stages), + len(original_nested_pipeline_model.stages)) + for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages, + original_nested_pipeline_model.stages): + self.assertEqual(loadedStage.uid, originalStage.uid) + def test_copy(self): dataset = self.spark.createDataFrame([ (10, 10.0), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org