WeichenXu123 opened a new pull request #28279:
URL: https://github.com/apache/spark/pull/28279


   ### What changes were proposed in this pull request?
   Fix a bug where a PySpark CrossValidator/TrainValidationSplit with a
   Pipeline estimator cannot save and load its model.
   
   Most PySpark estimators/transformers inherit from `JavaParams`, but a few
   estimators are special, in order to support nested estimators/transformers
   implemented in pure Python:
   * Pipeline
   * OneVsRest
   * CrossValidator
   * TrainValidationSplit
   
   Note, however, that the model readers/writers of the estimators listed
   above currently do NOT support nested estimators/transformers implemented
   in pure Python, because they use the Java reader/writer wrappers as the
   Python-side readers/writers.
   
   The PySpark CrossValidator/TrainValidationSplit model readers/writers
   require every estimator to define `_transfer_param_map_to_java` and
   `_transfer_param_map_from_java` (both are used in model read/write).
   
   The OneVsRest class already defines these two methods, but Pipeline does
   not, which causes this bug.
   
   This PR adds `_transfer_param_map_to_java` and
   `_transfer_param_map_from_java` to the Pipeline class.
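
The failure mode described above can be illustrated with a toy model in plain Python. This is a sketch, not the PySpark source: `ParamMapHooks`, `ToyOneVsRest`, `ToyPipelineBefore`, `ToyPipelineAfter`, `write_tuning_params` and the tagged-tuple "conversion" are all hypothetical names and behavior chosen for illustration. Only the two method names come from this description. The point is that the tuning writer calls the hook on whatever estimator it holds, so an estimator without the hook fails.

```python
class ParamMapHooks:
    """Toy versions of the two conversion hooks (hypothetical behavior)."""

    def _transfer_param_map_to_java(self, param_map):
        # Pretend conversion: tag every value as a "java" value.
        return {name: ("java", value) for name, value in param_map.items()}

    def _transfer_param_map_from_java(self, java_param_map):
        # Inverse of the pretend conversion above: drop the tag.
        return {name: value for name, (_, value) in java_param_map.items()}


class ToyOneVsRest(ParamMapHooks):
    """Stand-in for an estimator that already defines both hooks."""


class ToyPipelineBefore:
    """Stand-in for Pipeline before this PR: neither hook is defined."""


class ToyPipelineAfter(ParamMapHooks):
    """Stand-in for Pipeline after this PR: both hooks are defined."""


def supports_tuning_persistence(estimator):
    """Return True if the estimator defines both hooks."""
    return (hasattr(estimator, "_transfer_param_map_to_java")
            and hasattr(estimator, "_transfer_param_map_from_java"))


def write_tuning_params(estimator, param_maps):
    """Toy writer: converts every param map through the estimator's hook.

    Raises AttributeError if the estimator does not define the hook.
    """
    return [estimator._transfer_param_map_to_java(pm) for pm in param_maps]


grid = [{"regParam": 0.1}, {"regParam": 0.01}]
converted = write_tuning_params(ToyPipelineAfter(), grid)
restored = [ToyPipelineAfter()._transfer_param_map_from_java(pm)
            for pm in converted]
```

In this toy, `write_tuning_params(ToyPipelineBefore(), grid)` raises `AttributeError`, which mirrors the bug, while the `ToyPipelineAfter` round trip returns the original grid.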
   
   ### Why are the changes needed?
   Bug fix.
   
   ### Does this PR introduce any user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit test to be added.
   
   Manually tested in the pyspark shell:
   1) CrossValidator with a simple Pipeline estimator
   ```
   from pyspark.ml import Pipeline
   from pyspark.ml.classification import LogisticRegression
   from pyspark.ml.evaluation import BinaryClassificationEvaluator
   from pyspark.ml.feature import HashingTF, Tokenizer
   from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, \
       ParamGridBuilder
   
   training = spark.createDataFrame([
       (0, "a b c d e spark", 1.0),
       (1, "b d", 0.0),
       (2, "spark f g h", 1.0),
       (3, "hadoop mapreduce", 0.0),
       (4, "b spark who", 1.0),
       (5, "g d a y", 0.0),
       (6, "spark fly", 1.0),
       (7, "was mapreduce", 0.0),
       (8, "e spark program", 1.0),
       (9, "a e c l", 0.0),
       (10, "spark compile", 1.0),
       (11, "hadoop software", 0.0)
   ], ["id", "text", "label"])
   
   # Configure an ML pipeline, which consists of three stages: tokenizer,
   # hashingTF, and lr.
   tokenizer = Tokenizer(inputCol="text", outputCol="words")
   hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
                         outputCol="features")
   lr = LogisticRegression(maxIter=10)
   pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
   
   paramGrid = ParamGridBuilder() \
       .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
       .addGrid(lr.regParam, [0.1, 0.01]) \
       .build()
   crossval = CrossValidator(estimator=pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=2)  # use 3+ folds in practice
   
   # Run cross-validation, and choose the best set of parameters.
   cvModel = crossval.fit(training)
   
   cvModel.save('/tmp/cv_model001')
   CrossValidatorModel.load('/tmp/cv_model001')
   ```
   
   2) CrossValidator with a Pipeline estimator that includes a OneVsRest
   estimator stage, where the OneVsRest estimator nests a LogisticRegression
   estimator.
   
   ```
   from pyspark.ml import Pipeline
   from pyspark.ml.classification import LogisticRegression, OneVsRest
   from pyspark.ml.evaluation import MulticlassClassificationEvaluator
   from pyspark.ml.linalg import Vectors
   from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, \
       ParamGridBuilder
   
   dataset = spark.createDataFrame(
       [(Vectors.dense([0.0]), 0.0),
        (Vectors.dense([0.4]), 1.0),
        (Vectors.dense([0.5]), 0.0),
        (Vectors.dense([0.6]), 1.0),
        (Vectors.dense([1.0]), 1.0)] * 10,
       ["features", "label"])
   
   ova = OneVsRest(classifier=LogisticRegression())
   lr1 = LogisticRegression().setMaxIter(100)
   lr2 = LogisticRegression().setMaxIter(150)
   grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
   evaluator = MulticlassClassificationEvaluator()
   
   pipeline = Pipeline(stages=[ova])
   
   cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                       evaluator=evaluator)
   cvModel = cv.fit(dataset)
   cvModel.save('/tmp/model002')
   
   cvModel2 = CrossValidatorModel.load('/tmp/model002')
   ```
   
   The TrainValidationSplit test code is similar, so it is not pasted here.
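
For reference, a TrainValidationSplit variant of the second test might look like the sketch below. It is not the code that was actually run for this PR: the function name `tvs_pipeline_save_load` and its `path` argument are hypothetical, and the dataset and grid are copied from the CrossValidator example above. The imports sit inside the function so that defining it does not require Spark to be importable.

```python
def tvs_pipeline_save_load(spark, path):
    """Fit a TrainValidationSplit over a Pipeline, save it, and load it back.

    `spark` is an existing SparkSession. `path` is a hypothetical output
    directory; save() fails if it already exists.
    """
    # Local imports: Spark is only needed when the function is called.
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression, OneVsRest
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.tuning import (ParamGridBuilder, TrainValidationSplit,
                                   TrainValidationSplitModel)

    # Same toy dataset as in the CrossValidator example.
    dataset = spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    # The grid swaps the classifier nested inside OneVsRest.
    ova = OneVsRest(classifier=LogisticRegression())
    lr1 = LogisticRegression().setMaxIter(100)
    lr2 = LogisticRegression().setMaxIter(150)
    grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()

    # Pipeline estimator wrapped by TrainValidationSplit.
    pipeline = Pipeline(stages=[ova])
    tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=grid,
                               evaluator=MulticlassClassificationEvaluator())
    tvs_model = tvs.fit(dataset)

    # Round trip through the model writer and reader.
    tvs_model.save(path)
    return TrainValidationSplitModel.load(path)
```

In the pyspark shell this could be called as `tvs_pipeline_save_load(spark, '/tmp/tvs_model001')`, where the path is only an example.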
   
   




