zhengruifeng commented on code in PR #49547:
URL: https://github.com/apache/spark/pull/49547#discussion_r1919749377


##########
python/pyspark/ml/tests/connect/test_parity_evaluation.py:
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import unittest
+
+from pyspark.ml.tests.test_evaluation import EvaluatorTestsMixin
+from pyspark.sql import SparkSession
+
+
+class EvaluatorParityTests(EvaluatorTestsMixin, unittest.TestCase):
+    def setUp(self) -> None:
+        self.spark = SparkSession.builder.remote(
+            os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[2]")
+        ).getOrCreate()
+
+    def test_assert_remote_mode(self):
+        from pyspark.sql import is_remote
+
+        self.assertTrue(is_remote())
+
+    def tearDown(self) -> None:
+        self.spark.stop()
+
+
+if __name__ == "__main__":
+    from pyspark.ml.tests.connect.test_parity_evaluation import *  # noqa: F401

Review Comment:
   we should add it to `modules.py`



##########
python/pyspark/ml/wrapper.py:
##########
@@ -353,7 +353,7 @@ def copy(self: "JP", extra: Optional["ParamMap"] = None) -> 
"JP":
         if extra is None:
             extra = dict()
         that = super(JavaParams, self).copy(extra)
-        if self._java_obj is not None:
+        if self._java_obj is not None and not isinstance(self._java_obj, str):

Review Comment:
   why need this change?



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")

Review Comment:
   can we assert `str(evaluator) == str(evaluator2)`



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"

Review Comment:
   it seems we can directly save evaluator to `tmp_dir` ?



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multilabel_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                ([0.0, 1.0], [0.0, 2.0]),
+                ([0.0, 2.0], [0.0, 1.0]),
+                ([], [0.0]),
+                ([2.0], [2.0]),
+                ([2.0, 0.0], [2.0, 0.0]),
+                ([0.0, 1.0, 2.0], [0.0, 1.0]),
+                ([1.0], [1.0, 2.0]),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MultilabelClassificationEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (f1 measure by default)
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6380, atol=1e-4))
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.5476, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mlce_path = tmp_dir + "/mlce"
+            evaluator.write().overwrite().save(mlce_path)
+            # Load the saved evaluator
+            evaluator2 = MultilabelClassificationEvaluator.load(mlce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multiclass_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                (0.0, 0.0),
+                (0.0, 1.0),
+                (0.0, 0.0),
+                (1.0, 0.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (2.0, 2.0),
+                (2.0, 0.0),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MulticlassClassificationEvaluator().setPredictionCol("prediction")
+
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.6666, atol=1e-4))
+
+        # Evaluate the true positive rate for label 1.0
+        true_positive_rate_label_1 = evaluator.evaluate(
+            dataset, {evaluator.metricName: "truePositiveRateByLabel", 
evaluator.metricLabel: 1.0}
+        )
+        self.assertEqual(true_positive_rate_label_1, 0.75)
+
+        # Set the metric to Hamming loss
+        evaluator.setMetricName("hammingLoss")
+
+        # Evaluate the dataset using Hamming loss
+        hamming_loss = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(hamming_loss, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mce_path = tmp_dir + "/mce"
+            evaluator.write().overwrite().save(mce_path)
+            # Load the saved evaluator
+            evaluator2 = MulticlassClassificationEvaluator.load(mce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            [
+                (0.0, 0.0, 1.0),
+                (0.0, 1.0, 1.0),
+                (0.0, 0.0, 1.0),
+                (1.0, 0.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (2.0, 2.0, 1.0),
+                (2.0, 0.0, 1.0),
+            ],
+            ["prediction", "label", "weight"],
+        )
+
+        # Initialize MulticlassClassificationEvaluator with weight column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", weightCol="weight"
+        )
+
+        # Evaluate the dataset with weights using default metric (f1 score)
+        weighted_f1_score = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(weighted_f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset with weights using accuracy
+        weighted_accuracy = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "accuracy"}
+        )
+        self.assertTrue(np.allclose(weighted_accuracy, 0.6666, atol=1e-4))
+
+        # Create a DataFrame with probabilities
+        dataset_with_probabilities = self.spark.createDataFrame(

Review Comment:
   can we just reuse this dataset in this 
`test_multiclass_classification_evaluator`



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multilabel_classification_evaluator(self):

Review Comment:
   let's also test `isLargerBetter`, e.g.
   
   `MulticlassClassificationEvaluator`: logLoss vs f1
   `MultilabelClassificationEvaluator`: hammingLoss vs f1Measure
   



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multilabel_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                ([0.0, 1.0], [0.0, 2.0]),
+                ([0.0, 2.0], [0.0, 1.0]),
+                ([], [0.0]),
+                ([2.0], [2.0]),
+                ([2.0, 0.0], [2.0, 0.0]),
+                ([0.0, 1.0, 2.0], [0.0, 1.0]),
+                ([1.0], [1.0, 2.0]),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MultilabelClassificationEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (f1 measure by default)
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6380, atol=1e-4))
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.5476, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mlce_path = tmp_dir + "/mlce"
+            evaluator.write().overwrite().save(mlce_path)
+            # Load the saved evaluator
+            evaluator2 = MultilabelClassificationEvaluator.load(mlce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multiclass_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                (0.0, 0.0),
+                (0.0, 1.0),
+                (0.0, 0.0),
+                (1.0, 0.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (2.0, 2.0),
+                (2.0, 0.0),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MulticlassClassificationEvaluator().setPredictionCol("prediction")
+
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.6666, atol=1e-4))
+
+        # Evaluate the true positive rate for label 1.0
+        true_positive_rate_label_1 = evaluator.evaluate(
+            dataset, {evaluator.metricName: "truePositiveRateByLabel", 
evaluator.metricLabel: 1.0}
+        )
+        self.assertEqual(true_positive_rate_label_1, 0.75)
+
+        # Set the metric to Hamming loss
+        evaluator.setMetricName("hammingLoss")
+
+        # Evaluate the dataset using Hamming loss
+        hamming_loss = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(hamming_loss, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mce_path = tmp_dir + "/mce"
+            evaluator.write().overwrite().save(mce_path)
+            # Load the saved evaluator
+            evaluator2 = MulticlassClassificationEvaluator.load(mce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            [
+                (0.0, 0.0, 1.0),
+                (0.0, 1.0, 1.0),
+                (0.0, 0.0, 1.0),
+                (1.0, 0.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (2.0, 2.0, 1.0),
+                (2.0, 0.0, 1.0),
+            ],
+            ["prediction", "label", "weight"],
+        )
+
+        # Initialize MulticlassClassificationEvaluator with weight column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", weightCol="weight"
+        )
+
+        # Evaluate the dataset with weights using default metric (f1 score)
+        weighted_f1_score = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(weighted_f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset with weights using accuracy
+        weighted_accuracy = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "accuracy"}
+        )
+        self.assertTrue(np.allclose(weighted_accuracy, 0.6666, atol=1e-4))
+
+        # Create a DataFrame with probabilities
+        dataset_with_probabilities = self.spark.createDataFrame(
+            [
+                (1.0, 1.0, 1.0, [0.1, 0.8, 0.1]),
+                (0.0, 2.0, 1.0, [0.9, 0.05, 0.05]),
+                (0.0, 0.0, 1.0, [0.8, 0.2, 0.0]),
+                (1.0, 1.0, 1.0, [0.3, 0.65, 0.05]),
+            ],
+            ["prediction", "label", "weight", "probability"],
+        )
+        # Initialize MulticlassClassificationEvaluator with probability column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", probabilityCol="probability"
+        )
+        # Set the metric to log loss
+        evaluator.setMetricName("logLoss")
+        # Evaluate the dataset using log loss
+        log_loss = evaluator.evaluate(dataset_with_probabilities)
+        self.assertTrue(np.allclose(log_loss, 0.9682, atol=1e-4))
+
+    def test_binary_classification_evaluator(self):
+        # Define score and labels data
+        scoreAndLabels = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),
+            [(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 
1.0), (0.8, 1.0)],
+        )
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["raw", "label"])
+
+        evaluator = BinaryClassificationEvaluator().setRawPredictionCol("raw")
+        auc_roc = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(auc_roc, 0.7083, atol=1e-4))
+
+        # Evaluate the dataset using the areaUnderPR metric
+        auc_pr = evaluator.evaluate(dataset, {evaluator.metricName: 
"areaUnderPR"})
+        self.assertTrue(np.allclose(auc_pr, 0.8339, atol=1e-4))
 
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            bce_path = tmp_dir + "/bce"
+            evaluator.write().overwrite().save(bce_path)
+            # Load the saved evaluator
+            evaluator2 = BinaryClassificationEvaluator.load(bce_path)
+            self.assertEqual(evaluator2.getRawPredictionCol(), "raw")
+
+        # Define score, labels, and weights data
+        scoreAndLabelsAndWeight = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),
+            [
+                (0.1, 0.0, 1.0),
+                (0.1, 1.0, 0.9),
+                (0.4, 0.0, 0.7),
+                (0.6, 0.0, 0.9),
+                (0.6, 1.0, 1.0),
+                (0.6, 1.0, 0.3),
+                (0.8, 1.0, 1.0),
+            ],
+        )
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            scoreAndLabelsAndWeight, ["raw", "label", "weight"]
+        )
+
+        evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw", 
weightCol="weight")
+
+        # Evaluate the dataset with weights using the default metric 
(areaUnderROC)
+        auc_roc_weighted = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(auc_roc_weighted, 0.7025, atol=1e-4))
+
+        # Evaluate the dataset with weights using the areaUnderPR metric
+        auc_pr_weighted = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "areaUnderPR"}
+        )
+        self.assertTrue(np.allclose(auc_pr_weighted, 0.8221, atol=1e-4))
+
+        # Get the number of bins used to compute areaUnderROC
+        num_bins = evaluator.getNumBins()
+        self.assertTrue(num_bins, 0.1000)
+
+    def test_clustering_evaluator(self):
+        # Define feature and predictions data
+        featureAndPredictions = map(
+            lambda x: (Vectors.dense(x[0]), x[1]),
+            [
+                ([0.0, 0.5], 0.0),
+                ([0.5, 0.0], 0.0),
+                ([10.0, 11.0], 1.0),
+                ([10.5, 11.5], 1.0),
+                ([1.0, 1.0], 0.0),
+                ([8.0, 6.0], 1.0),
+            ],
+        )
+        dataset = self.spark.createDataFrame(featureAndPredictions, 
["features", "prediction"])
+
+        evaluator = ClusteringEvaluator().setPredictionCol("prediction")
+        score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(score, 0.9079, atol=1e-4))
+
+        # Define feature, predictions, and weight data
+        featureAndPredictionsWithWeight = map(
+            lambda x: (Vectors.dense(x[0]), x[1], x[2]),
+            [
+                ([0.0, 0.5], 0.0, 2.5),
+                ([0.5, 0.0], 0.0, 2.5),
+                ([10.0, 11.0], 1.0, 2.5),
+                ([10.5, 11.5], 1.0, 2.5),
+                ([1.0, 1.0], 0.0, 2.5),
+                ([8.0, 6.0], 1.0, 2.5),
+            ],
+        )
+
+        dataset_with_weight = self.spark.createDataFrame(
+            featureAndPredictionsWithWeight, ["features", "prediction", 
"weight"]
+        )
+        evaluator.setWeightCol("weight")
+
+        # Evaluate the dataset with weights
+        score_with_weight = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(score_with_weight, 0.9079, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ce_path = tmp_dir + "/ce"
+            evaluator.write().overwrite().save(ce_path)
+            # Load the saved evaluator
+            evaluator2 = ClusteringEvaluator.load(ce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_clustering_evaluator_with_cosine_distance(self):
+        featureAndPredictions = map(
+            lambda x: (Vectors.dense(x[0]), x[1]),
+            [
+                ([1.0, 1.0], 1.0),
+                ([10.0, 10.0], 1.0),
+                ([1.0, 0.5], 2.0),
+                ([10.0, 4.4], 2.0),
+                ([-1.0, 1.0], 3.0),
+                ([-100.0, 90.0], 3.0),
+            ],
+        )
+        dataset = self.spark.createDataFrame(featureAndPredictions, 
["features", "prediction"])
+        evaluator = ClusteringEvaluator(predictionCol="prediction", 
distanceMeasure="cosine")
+        self.assertEqual(evaluator.getDistanceMeasure(), "cosine")
+        self.assertTrue(np.isclose(evaluator.evaluate(dataset), 0.992671213, 
atol=1e-5))
+
+    def test_regression_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                (-28.98343821, -27.0),
+                (20.21491975, 21.5),
+                (-25.98418959, -22.0),
+                (30.69731842, 33.0),
+                (74.69283752, 71.0),
+            ],
+            ["raw", "label"],
+        )
+
+        evaluator = RegressionEvaluator()
+        evaluator.setPredictionCol("raw")
+
+        # Evaluate dataset with default metric (RMSE)
+        rmse = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(rmse, 2.8424, atol=1e-4))
+        # Evaluate dataset with R2 metric
+        r2 = evaluator.evaluate(dataset, {evaluator.metricName: "r2"})
+        self.assertTrue(np.allclose(r2, 0.9939, atol=1e-4))
+        # Evaluate dataset with MAE metric
+        mae = evaluator.evaluate(dataset, {evaluator.metricName: "mae"})
+        self.assertTrue(np.allclose(mae, 2.6496, atol=1e-4))
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            re_path = tmp_dir + "/re"
+            evaluator.write().overwrite().save(re_path)
+            # Load the saved evaluator
+            evaluator2 = RegressionEvaluator.load(re_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "raw")
+
+        dataset_with_weights = self.spark.createDataFrame(

Review Comment:
   Nit, I think for such cases, we can just build a dataset with weights, and 
then:
   1, test without weightCol;
   2, test with weightCol.
   
   Just to save some time (our CI is becoming slower and slower)



##########
python/pyspark/ml/tests/test_evaluation.py:
##########
@@ -14,18 +14,368 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import tempfile
 import unittest
 
 import numpy as np
 
-from pyspark.ml.evaluation import ClusteringEvaluator, RegressionEvaluator
+from pyspark.ml.evaluation import (
+    ClusteringEvaluator,
+    RegressionEvaluator,
+    BinaryClassificationEvaluator,
+    MulticlassClassificationEvaluator,
+    MultilabelClassificationEvaluator,
+    RankingEvaluator,
+)
 from pyspark.ml.linalg import Vectors
-from pyspark.sql import Row
-from pyspark.testing.mlutils import SparkSessionTestCase
+from pyspark.sql import Row, SparkSession
+
+
+class EvaluatorTestsMixin:
+    def test_ranking_evaluator(self):
+        scoreAndLabels = [
+            ([1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0], [1.0, 2.0, 
3.0, 4.0, 5.0]),
+            ([4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0], [1.0, 2.0, 
3.0]),
+            ([1.0, 2.0, 3.0, 4.0, 5.0], []),
+        ]
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["prediction", 
"label"])
+
+        # Initialize RankingEvaluator
+        evaluator = RankingEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (mean average 
precision)
+        mean_average_precision = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(mean_average_precision, 0.3550, atol=1e-4))
+
+        # Evaluate the dataset using precisionAtK for k=2
+        precision_at_k = evaluator.evaluate(
+            dataset, {evaluator.metricName: "precisionAtK", evaluator.k: 2}
+        )
+        self.assertTrue(np.allclose(precision_at_k, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            ranke_path = tmp_dir + "/ranke"
+            evaluator.write().overwrite().save(ranke_path)
+            # Load the saved evaluator
+            evaluator2 = RankingEvaluator.load(ranke_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multilabel_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                ([0.0, 1.0], [0.0, 2.0]),
+                ([0.0, 2.0], [0.0, 1.0]),
+                ([], [0.0]),
+                ([2.0], [2.0]),
+                ([2.0, 0.0], [2.0, 0.0]),
+                ([0.0, 1.0, 2.0], [0.0, 1.0]),
+                ([1.0], [1.0, 2.0]),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MultilabelClassificationEvaluator().setPredictionCol("prediction")
+
+        # Evaluate the dataset using the default metric (f1 measure by default)
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6380, atol=1e-4))
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.5476, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mlce_path = tmp_dir + "/mlce"
+            evaluator.write().overwrite().save(mlce_path)
+            # Load the saved evaluator
+            evaluator2 = MultilabelClassificationEvaluator.load(mlce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+    def test_multiclass_classification_evaluator(self):
+        dataset = self.spark.createDataFrame(
+            [
+                (0.0, 0.0),
+                (0.0, 1.0),
+                (0.0, 0.0),
+                (1.0, 0.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (1.0, 1.0),
+                (2.0, 2.0),
+                (2.0, 0.0),
+            ],
+            ["prediction", "label"],
+        )
+
+        evaluator = 
MulticlassClassificationEvaluator().setPredictionCol("prediction")
+
+        f1_score = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset using accuracy
+        accuracy = evaluator.evaluate(dataset, {evaluator.metricName: 
"accuracy"})
+        self.assertTrue(np.allclose(accuracy, 0.6666, atol=1e-4))
+
+        # Evaluate the true positive rate for label 1.0
+        true_positive_rate_label_1 = evaluator.evaluate(
+            dataset, {evaluator.metricName: "truePositiveRateByLabel", 
evaluator.metricLabel: 1.0}
+        )
+        self.assertEqual(true_positive_rate_label_1, 0.75)
+
+        # Set the metric to Hamming loss
+        evaluator.setMetricName("hammingLoss")
+
+        # Evaluate the dataset using Hamming loss
+        hamming_loss = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(hamming_loss, 0.3333, atol=1e-4))
+
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            mce_path = tmp_dir + "/mce"
+            evaluator.write().overwrite().save(mce_path)
+            # Load the saved evaluator
+            evaluator2 = MulticlassClassificationEvaluator.load(mce_path)
+            self.assertEqual(evaluator2.getPredictionCol(), "prediction")
+
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            [
+                (0.0, 0.0, 1.0),
+                (0.0, 1.0, 1.0),
+                (0.0, 0.0, 1.0),
+                (1.0, 0.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (1.0, 1.0, 1.0),
+                (2.0, 2.0, 1.0),
+                (2.0, 0.0, 1.0),
+            ],
+            ["prediction", "label", "weight"],
+        )
+
+        # Initialize MulticlassClassificationEvaluator with weight column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", weightCol="weight"
+        )
+
+        # Evaluate the dataset with weights using default metric (f1 score)
+        weighted_f1_score = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(weighted_f1_score, 0.6613, atol=1e-4))
+
+        # Evaluate the dataset with weights using accuracy
+        weighted_accuracy = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "accuracy"}
+        )
+        self.assertTrue(np.allclose(weighted_accuracy, 0.6666, atol=1e-4))
+
+        # Create a DataFrame with probabilities
+        dataset_with_probabilities = self.spark.createDataFrame(
+            [
+                (1.0, 1.0, 1.0, [0.1, 0.8, 0.1]),
+                (0.0, 2.0, 1.0, [0.9, 0.05, 0.05]),
+                (0.0, 0.0, 1.0, [0.8, 0.2, 0.0]),
+                (1.0, 1.0, 1.0, [0.3, 0.65, 0.05]),
+            ],
+            ["prediction", "label", "weight", "probability"],
+        )
+        # Initialize MulticlassClassificationEvaluator with probability column
+        evaluator = MulticlassClassificationEvaluator(
+            predictionCol="prediction", probabilityCol="probability"
+        )
+        # Set the metric to log loss
+        evaluator.setMetricName("logLoss")
+        # Evaluate the dataset using log loss
+        log_loss = evaluator.evaluate(dataset_with_probabilities)
+        self.assertTrue(np.allclose(log_loss, 0.9682, atol=1e-4))
+
+    def test_binary_classification_evaluator(self):
+        # Define score and labels data
+        scoreAndLabels = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1]),
+            [(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 
1.0), (0.8, 1.0)],
+        )
+        dataset = self.spark.createDataFrame(scoreAndLabels, ["raw", "label"])
+
+        evaluator = BinaryClassificationEvaluator().setRawPredictionCol("raw")
+        auc_roc = evaluator.evaluate(dataset)
+        self.assertTrue(np.allclose(auc_roc, 0.7083, atol=1e-4))
+
+        # Evaluate the dataset using the areaUnderPR metric
+        auc_pr = evaluator.evaluate(dataset, {evaluator.metricName: 
"areaUnderPR"})
+        self.assertTrue(np.allclose(auc_pr, 0.8339, atol=1e-4))
 
+        # read/write
+        with tempfile.TemporaryDirectory(prefix="save") as tmp_dir:
+            # Save the evaluator
+            bce_path = tmp_dir + "/bce"
+            evaluator.write().overwrite().save(bce_path)
+            # Load the saved evaluator
+            evaluator2 = BinaryClassificationEvaluator.load(bce_path)
+            self.assertEqual(evaluator2.getRawPredictionCol(), "raw")
+
+        # Define score, labels, and weights data
+        scoreAndLabelsAndWeight = map(
+            lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]),
+            [
+                (0.1, 0.0, 1.0),
+                (0.1, 1.0, 0.9),
+                (0.4, 0.0, 0.7),
+                (0.6, 0.0, 0.9),
+                (0.6, 1.0, 1.0),
+                (0.6, 1.0, 0.3),
+                (0.8, 1.0, 1.0),
+            ],
+        )
+        # Create a DataFrame with weights
+        dataset_with_weight = self.spark.createDataFrame(
+            scoreAndLabelsAndWeight, ["raw", "label", "weight"]
+        )
+
+        evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw", 
weightCol="weight")
+
+        # Evaluate the dataset with weights using the default metric 
(areaUnderROC)
+        auc_roc_weighted = evaluator.evaluate(dataset_with_weight)
+        self.assertTrue(np.allclose(auc_roc_weighted, 0.7025, atol=1e-4))
+
+        # Evaluate the dataset with weights using the areaUnderPR metric
+        auc_pr_weighted = evaluator.evaluate(
+            dataset_with_weight, {evaluator.metricName: "areaUnderPR"}
+        )
+        self.assertTrue(np.allclose(auc_pr_weighted, 0.8221, atol=1e-4))
+
+        # Get the number of bins used to compute areaUnderROC
+        num_bins = evaluator.getNumBins()
+        self.assertTrue(num_bins, 0.1000)

Review Comment:
   Is `num_bins` meant to be a bool here? `assertTrue(num_bins, 0.1000)` treats the second argument as the failure message, so this assertion only checks truthiness — it should probably be `assertEqual` against the expected bin count.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to