[GitHub] [spark] WeichenXu123 commented on a diff in pull request #41176: [SPARK-43516] [ML] Base interfaces of sparkML for spark3.5: estimator/transformer/model/evaluator

2023-05-22 Thread via GitHub


WeichenXu123 commented on code in PR #41176:
URL: https://github.com/apache/spark/pull/41176#discussion_r1201332234


##
python/pyspark/mlv2/summarizer.py:
##
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+from pyspark.mlv2.util import aggregate_dataframe
+
+
+class SummarizerAggState:
+    def __init__(self, input_array):
+        self.min_values = input_array.copy()
+        self.max_values = input_array.copy()
+        self.count = 1
+        self.sum_values = np.array(input_array.copy())
+        self.square_sum_values = np.square(input_array.copy())
+
+    def update(self, input_array):
+        self.count += 1
+        self.sum_values += input_array
+        self.square_sum_values += np.square(input_array)
+        self.min_values = np.minimum(self.min_values, input_array)
+        self.max_values = np.maximum(self.max_values, input_array)
+
+    def merge(self, state):
+        self.count += state.count
+        self.sum_values += state.sum_values
+        self.square_sum_values += state.square_sum_values
+        self.min_values = np.minimum(self.min_values, state.min_values)
+        self.max_values = np.maximum(self.max_values, state.max_values)
+        return self
+
+    def to_result(self, metrics):
+        result = {}
+
+        for metric in metrics:
+            if metric == "min":
+                result["min"] = self.min_values.copy()
+            if metric == "max":
+                result["max"] = self.max_values.copy()
+            if metric == "sum":
+                result["sum"] = self.sum_values.copy()
+            if metric == "mean":
+                result["mean"] = self.sum_values / self.count
+            if metric == "std":
+                if self.count <= 1:
+                    raise ValueError(
+                        "Standard deviation evaluation requires more than one row of data."
+                    )
+                result["std"] = np.sqrt(

Review Comment:
   We need to evaluate std values over feature arrays; I think using a summarizer
   will be more efficient.
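
For context, the truncated line above is where the "std" metric is derived from the running sums. A standalone sketch of that formula, assuming the same `count` / `sum_values` / `square_sum_values` state as `SummarizerAggState` and a sample (n - 1) denominator (the truncated diff does not show which denominator the PR uses):

```python
import numpy as np

# Hypothetical running state after observing rows [0, 1], [1, 2], [2, 3].
count = 3
sum_values = np.array([3.0, 6.0])          # element-wise sum of the rows
square_sum_values = np.array([5.0, 14.0])  # element-wise sum of the squared rows

# Sum-of-squares identity: var = (sum(x^2) - sum(x)^2 / n) / (n - 1)
std = np.sqrt((square_sum_values - sum_values**2 / count) / (count - 1))
print(std)  # [1. 1.]
```

Computing std from these running sums needs only a single pass over the data, which is the efficiency argument being made here.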






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #41176: [SPARK-43516] [ML] Base interfaces of sparkML for spark3.5: estimator/transformer/model/evaluator

2023-05-22 Thread via GitHub


WeichenXu123 commented on code in PR #41176:
URL: https://github.com/apache/spark/pull/41176#discussion_r1200559825


##
python/pyspark/mlv2/evaluation.py:
##
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.mlv2.base import Evaluator
+
+from pyspark.ml.param import Param, Params, TypeConverters
+from pyspark.ml.param.shared import HasLabelCol, HasPredictionCol
+from pyspark.mlv2.util import aggregate_dataframe
+
+import torch
+import torcheval.metrics as torchmetrics

Review Comment:
   Yes. I talked with @mengxr about this and he said we can use it :)
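
For readers who have not used the dependency under discussion: torcheval metrics are stateful objects driven through `update()` / `compute()`, which is the API an `Evaluator` built on top of them would call. A minimal sketch (the metric class and the tensors are illustrative, not the PR's actual wiring):

```python
import torch
from torcheval.metrics import MeanSquaredError

# Stateful metric: feed prediction/label batches, then read the aggregate.
metric = MeanSquaredError()
metric.update(
    torch.tensor([2.5, 0.0, 2.0]),   # predictions
    torch.tensor([3.0, -0.5, 2.0]),  # labels
)
print(metric.compute())  # aggregated MSE over all updated batches
```

Incremental batching through `update()` is what lets such metrics compose naturally with a dataframe aggregation helper like `aggregate_dataframe`.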






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #41176: [SPARK-43516] [ML] Base interfaces of sparkML for spark3.5: estimator/transformer/model/evaluator

2023-05-19 Thread via GitHub


WeichenXu123 commented on code in PR #41176:
URL: https://github.com/apache/spark/pull/41176#discussion_r1198664831


##
python/pyspark/mlv2/feature.py:
##
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+
+from pyspark.sql.functions import col, pandas_udf
+
+from pyspark.mlv2.base import Estimator, Model, Transformer
+from pyspark.mlv2.util import transform_dataframe_column
+from pyspark.mlv2.summarizer import summarize_dataframe
+from pyspark.ml.param.shared import HasInputCol, HasOutputCol
+from pyspark.ml.functions import vector_to_array
+
+
+class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol):
+    """
+    Rescale each feature individually to range [-1, 1] by dividing through the largest maximum
+    absolute value in each feature. It does not shift/center the data, and thus does not destroy
+    any sparsity.
+    """
+
+    def __init__(self, inputCol, outputCol):
+        super().__init__()
+        self.set(self.inputCol, inputCol)
+        self.set(self.outputCol, outputCol)
+
+    def _fit(self, dataset):
+        input_col = self.getInputCol()
+
+        min_max_res = summarize_dataframe(dataset, input_col, ["min", "max"])
+        min_values = min_max_res["min"]
+        max_values = min_max_res["max"]
+
+        max_abs_values = np.maximum(np.abs(min_values), np.abs(max_values))
+
+        model = MaxAbsScalerModel(max_abs_values)
+        model._resetUid(self.uid)
+        return self._copyValues(model)
+
+
+class MaxAbsScalerModel(Transformer, HasInputCol, HasOutputCol):
+
+    def __init__(self, max_abs_values):
+        super().__init__()
+        self.max_abs_values = max_abs_values
+
+    def _input_column_name(self):
+        return self.getInputCol()
+
+    def _output_columns(self):
+        return [(self.getOutputCol(), "array")]
+
+    def _get_transform_fn(self):
+        max_abs_values = self.max_abs_values
+        max_abs_values_zero_cond = (max_abs_values == 0.0)
+
+        def transform_fn(series):
+            def map_value(x):
+                return np.where(max_abs_values_zero_cond, 0.0, x / max_abs_values)
+
+            return series.apply(map_value)
+
+        return transform_fn
+
+
+class StandardScaler(Estimator, HasInputCol, HasOutputCol):
+    """
+    Standardizes features by removing the mean and scaling to unit variance using column summary
+    statistics on the samples in the training set.
+    """
+
+    def __init__(self, inputCol, outputCol):
+        super().__init__()
+        self.set(self.inputCol, inputCol)
+        self.set(self.outputCol, outputCol)
+
+    def _fit(self, dataset):
+        input_col = self.getInputCol()
+
+        mean_std_res = summarize_dataframe(dataset, input_col, ["mean", "std"])
+        mean_values = mean_std_res["mean"]
+        std_values = mean_std_res["std"]
+
+        model = StandardScalerModel(mean_values, std_values)
+        model._resetUid(self.uid)
+        return self._copyValues(model)
+
+
+class StandardScalerModel(Transformer, HasInputCol, HasOutputCol):

Review Comment:
   Discussed with @zhengruifeng offline:
   
   Because the goal is to support fitting on both pandas DataFrames and Spark
   DataFrames (two cases we hope can share most of the fit implementation code),
   and we need to keep the array operations efficient, the current approach
   (implementing a summarizer via a Spark pandas UDF) should be the best one.
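
A minimal sketch of the pattern being described: each partition is reduced to a partial state with `mapInPandas`, and the partial states are merged on the driver. The schema and the count/sum statistics here are simplified stand-ins, not the PR's actual `aggregate_dataframe` implementation:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,), (5.0,)], ["feature"])

def partial_stats(batches):
    # Reduce all pandas batches of one partition to a single (count, sum) row.
    count, total = 0, 0.0
    for pdf in batches:
        count += len(pdf)
        total += float(pdf["feature"].sum())
    yield pd.DataFrame({"count": [count], "sum": [total]})

# One partial-state row per partition; the driver merges them,
# mirroring SummarizerAggState.merge in the summarizer diff above.
partials = df.mapInPandas(partial_stats, schema="count long, sum double").collect()
count = sum(r["count"] for r in partials)
mean = sum(r["sum"] for r in partials) / count
print(mean)  # ~2.667
```

The same reducer can be applied directly to a plain pandas DataFrame, which is what lets the pandas and Spark fit paths share most of their code.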






[GitHub] [spark] WeichenXu123 commented on a diff in pull request #41176: [SPARK-43516] [ML] Base interfaces of sparkML for spark3.5: estimator/transformer/model/evaluator

2023-05-18 Thread via GitHub


WeichenXu123 commented on code in PR #41176:
URL: https://github.com/apache/spark/pull/41176#discussion_r1198515956


##
python/pyspark/mlv2/tests/test_feature.py:
##
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import numpy as np
+
+from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
+from pyspark.sql import Row
+from pyspark.testing.utils import QuietTest
+from pyspark.testing.mlutils import check_params, SparkSessionTestCase
+
+from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
+
+
+class FeatureTests(SparkSessionTestCase):

Review Comment:
   I will add a Spark Connect-side test.
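
For illustration, a Spark Connect-side test would typically reuse the same assertions against a remote session. A minimal sketch, assuming `ReusedConnectTestCase` from `pyspark.testing.connectutils` and an array-typed input column; the PR's eventual test layout may differ:

```python
import unittest

import numpy as np

from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.mlv2.feature import MaxAbsScaler


class FeatureConnectTests(ReusedConnectTestCase):
    def test_max_abs_scaler(self):
        df = self.spark.createDataFrame([([2.0, -4.0],), ([1.0, 2.0],)], ["features"])
        scaler = MaxAbsScaler(inputCol="features", outputCol="scaled")
        result = scaler.fit(df).transform(df).toPandas()
        # Each feature is divided by its max absolute value (2.0 and 4.0).
        np.testing.assert_allclose(list(result["scaled"][0]), [1.0, -1.0])


if __name__ == "__main__":
    unittest.main()
```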


