zhengruifeng commented on code in PR #41176:
URL: https://github.com/apache/spark/pull/41176#discussion_r1198449915


##########
python/pyspark/mlv2/base.py:
##########
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+import copy
+import threading
+import pandas as pd
+
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Iterator,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+    overload,
+    TYPE_CHECKING,
+)
+
+from pyspark import since
+from pyspark.ml.param import P
+from pyspark.ml.common import inherit_doc
+from pyspark.ml.param.shared import (
+    HasInputCols,
+    HasOutputCols,
+    HasLabelCol,
+    HasFeaturesCol,
+    HasPredictionCol,
+)
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import udf
+from pyspark.sql.types import DataType, StructField, StructType
+from pyspark.ml.param import Param, Params, TypeConverters
+from pyspark.sql.functions import col, pandas_udf, struct
+import pickle
+
+from pyspark.mlv2.util import transform_dataframe_column
+
+if TYPE_CHECKING:
+    from pyspark.ml._typing import ParamMap
+
+T = TypeVar("T")
+M = TypeVar("M", bound="Transformer")
+
+
+@inherit_doc
+class Estimator(Params, Generic[M], metaclass=ABCMeta):
+    """
+    Abstract class for estimators that fit models to data.
+
+    .. versionadded:: 1.3.0

Review Comment:
   versions should be 3.5.0 in this PR?
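   
   A minimal sketch of the implied change (assuming the mlv2 module first ships in Spark 3.5.0):
   
   ```suggestion
       .. versionadded:: 3.5.0
   ```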



##########
python/pyspark/mlv2/feature.py:
##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+
+from pyspark.sql.functions import col, pandas_udf
+
+from pyspark.mlv2.base import Estimator, Model, Transformer
+from pyspark.mlv2.util import transform_dataframe_column
+from pyspark.mlv2.summarizer import summarize_dataframe
+from pyspark.ml.param.shared import HasInputCol, HasOutputCol
+from pyspark.ml.functions import vector_to_array
+
+
+class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol):
+    """
+    Rescale each feature individually to range [-1, 1] by dividing through the largest maximum
+    absolute value in each feature. It does not shift/center the data, and thus does not destroy
+    any sparsity.
+    """
+
+    def __init__(self, inputCol, outputCol):
+        super().__init__()
+        self.set(self.inputCol, inputCol)
+        self.set(self.outputCol, outputCol)
+
+    def _fit(self, dataset):
+        input_col = self.getInputCol()
+
+        min_max_res = summarize_dataframe(dataset, input_col, ["min", "max"])
+        min_values = min_max_res["min"]
+        max_values = min_max_res["max"]
+
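+        # Per-feature scaling factor: the larger of |min| and |max| for each column.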
+        max_abs_values = np.maximum(np.abs(min_values), np.abs(max_values))
+
+        model = MaxAbsScalerModel(max_abs_values)
+        model._resetUid(self.uid)
+        return self._copyValues(model)
+
+
+class MaxAbsScalerModel(Transformer, HasInputCol, HasOutputCol):
+
+    def __init__(self, max_abs_values):
+        super().__init__()
+        self.max_abs_values = max_abs_values
+
+    def _input_column_name(self):
+        return self.getInputCol()
+
+    def _output_columns(self):
+        return [(self.getOutputCol(), "array<double>")]
+
+    def _get_transform_fn(self):
+        max_abs_values = self.max_abs_values
+        max_abs_values_zero_cond = (max_abs_values == 0.0)
+
+        def transform_fn(series):
+            def map_value(x):
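+                # Constant zero columns (max-abs == 0) map to 0.0 directly,
+                # avoiding division by zero.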
+                return np.where(max_abs_values_zero_cond, 0.0, x / max_abs_values)
+
+            return series.apply(map_value)
+
+        return transform_fn
+
+
+class StandardScaler(Estimator, HasInputCol, HasOutputCol):
+    """
+    Standardizes features by removing the mean and scaling to unit variance using column summary
+    statistics on the samples in the training set.
+    """
+
+    def __init__(self, inputCol, outputCol):
+        super().__init__()
+        self.set(self.inputCol, inputCol)
+        self.set(self.outputCol, outputCol)
+
+    def _fit(self, dataset):
+        input_col = self.getInputCol()
+
+        mean_std_res = summarize_dataframe(dataset, input_col, ["mean", "std"])
+        mean_values = mean_std_res["mean"]
+        std_values = mean_std_res["std"]
+
+        model = StandardScalerModel(mean_values, std_values)
+        model._resetUid(self.uid)
+        return self._copyValues(model)
+
+
+class StandardScalerModel(Transformer, HasInputCol, HasOutputCol):

Review Comment:
   For simple computation logic like `MaxAbsScalerModel` / `StandardScaler`, I guess we can implement it in two ways:
   
   1. leverage torch utils functions (in this PR);
   2. leverage built-in SQL functions (we may add a few helper functions if needed), for example:
   
   ```
   In [19]: import pyspark.sql.functions as SF
   
   In [20]: import pyspark.ml.functions as MF
   
   In [21]: bdf.show()
   +-----+------+---------+
   |label|weight| features|
   +-----+------+---------+
   |  1.0|   1.0|[0.0,5.0]|
   |  0.0|   2.0|[1.0,2.0]|
   |  1.0|   3.0|[2.0,1.0]|
   |  0.0|   4.0|[3.0,3.0]|
   +-----+------+---------+
   
   
   In [22]: bdf.select(SF.posexplode(MF.vector_to_array(bdf.features)).alias("index", "value")).groupBy("index").agg(SF.avg("value"), SF.stddev_samp("value")).show()
   +-----+----------+------------------+
   |index|avg(value)|stddev_samp(value)|
   +-----+----------+------------------+
   |    1|      2.75|1.7078251276599332|
   |    0|       1.5|1.2909944487358056|
   +-----+----------+------------------+
   
   ```
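   
   To also illustrate option 2 for `MaxAbsScaler`, a hypothetical sketch (not code from this PR) that derives the per-feature max-absolute-value with the same `posexplode` approach; `bdf` is rebuilt here so the snippet is self-contained:
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.ml.linalg import Vectors
   import pyspark.sql.functions as SF
   import pyspark.ml.functions as MF
   
   spark = SparkSession.builder.getOrCreate()
   bdf = spark.createDataFrame(
       [(1.0, 1.0, Vectors.dense(0.0, 5.0)),
        (0.0, 2.0, Vectors.dense(1.0, 2.0)),
        (1.0, 3.0, Vectors.dense(2.0, 1.0)),
        (0.0, 4.0, Vectors.dense(3.0, 3.0))],
       ["label", "weight", "features"],
   )
   
   # Per-feature max(|value|): the statistic MaxAbsScalerModel stores.
   (bdf.select(SF.posexplode(MF.vector_to_array(bdf.features)).alias("index", "value"))
       .groupBy("index")
       .agg(SF.max(SF.abs(SF.col("value"))).alias("max_abs"))
       .show())
   ```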
   



##########
python/pyspark/mlv2/base.py:
##########
@@ -0,0 +1,258 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+import copy
+import threading
+import pandas as pd
+
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Iterator,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+    overload,
+    TYPE_CHECKING,
+)
+
+from pyspark import since
+from pyspark.ml.param import P
+from pyspark.ml.common import inherit_doc
+from pyspark.ml.param.shared import (
+    HasInputCols,
+    HasOutputCols,
+    HasLabelCol,
+    HasFeaturesCol,
+    HasPredictionCol,
+)
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import udf
+from pyspark.sql.types import DataType, StructField, StructType
+from pyspark.ml.param import Param, Params, TypeConverters
+from pyspark.sql.functions import col, pandas_udf, struct

Review Comment:
   ```suggestion
   from pyspark.sql.types import DataType, StructField, StructType
   from pyspark.ml.param import Param, Params, TypeConverters
   from pyspark.sql.functions import col, pandas_udf, struct, udf
   ```



##########
python/pyspark/mlv2/tests/test_feature.py:
##########
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import numpy as np
+
+from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
+from pyspark.sql import Row
+from pyspark.testing.utils import QuietTest
+from pyspark.testing.mlutils import check_params, SparkSessionTestCase
+
+from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
+
+
+class FeatureTests(SparkSessionTestCase):

Review Comment:
   does this work on Spark Connect?
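   
   A hypothetical sketch (assuming `pyspark.testing.connectutils.ReusedConnectTestCase` is available on this branch) of how the suite could cover both backends: move the test bodies into a shared mixin and bind it to each session type. `FeatureTestsMixin` and its test body are illustrative, not code from this PR.
   
   ```
   from pyspark.testing.connectutils import ReusedConnectTestCase
   from pyspark.testing.mlutils import SparkSessionTestCase
   
   
   class FeatureTestsMixin:
       # Shared assertions; `self.spark` comes from the concrete base class.
       def test_max_abs_scaler(self):
           ...
   
   
   class FeatureTests(FeatureTestsMixin, SparkSessionTestCase):
       pass  # classic JVM-backed SparkSession
   
   
   class FeatureParityTests(FeatureTestsMixin, ReusedConnectTestCase):
       pass  # same assertions against a Spark Connect session
   ```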



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

