zhengruifeng commented on code in PR #43199:
URL: https://github.com/apache/spark/pull/43199#discussion_r1348225386
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
+ handleInvalid: Optional[str] = "error",
+ ) -> None:
+ """
+ __init__(self, \\*, inputCols=None, outputCol=None,
inputFeatureSizeList=None, handleInvalid="error")
+ """
+ super().__init__()
+ kwargs = self._input_kwargs
+ self._set(**kwargs)
+ self._setDefault(handleInvalid="error")
+
+ def _input_columns(self) -> List[str]:
+ return self.getInputCols()
+
+ def _output_columns(self) -> List[Tuple[str, str]]:
+ return [(self.getOutputCol(), "array<double>")]
+
+ def _get_transform_fn(self) -> Callable[..., Any]:
+ feature_size_list = self.getInputFeatureSizeList()
+ if feature_size_list is None or len(feature_size_list) !=
len(self.getInputCols()):
+ raise ValueError(
+ "'feature_size_list' param must be set with an array of
integer, and"
+ "its length must be equal to number of input columns."
+ )
+ assembled_feature_size = sum(feature_size_list)
+ handler_invalid = self.getHandleInvalid()
+
+ if handler_invalid not in ["error", "keep"]:
+ raise ValueError("'handler_invalid' param must be set with 'error'
or 'keep' value.")
+
+ keep_invalid = handler_invalid == "keep"
+
+ def assemble_features(*feature_list: Any):
+ assembled_array = np.empty(assembled_feature_size,
dtype=np.float64)
+ pos = 0
+ for index, feature in enumerate(feature_list):
+ feature_size = feature_size_list[index]
+
+ if keep_invalid:
+ if feature is None:
+ assembled_array[pos : pos + feature_size] = np.nan
+ else:
+ assembled_array[pos : pos + feature_size] = feature
+ else:
+ if feature is None or np.isnan(feature).any():
+ raise ValueError(
+ f"The input features contains invalid value:
{str(feature)}"
+ )
+ else:
+ assembled_array[pos : pos + feature_size] = feature
+
+ pos += feature_size
+
+ return assembled_array
+
+ def transform_fn(*series_list: Any) -> Any:
+ return pd.Series(assemble_features(*feature_list) for feature_list
in zip(*series_list))
+
+ return transform_fn
+
+
+# Override doc of VectorAssembler.handleInvalid param.
+VectorAssembler.handleInvalid.doc = (
+ "how to handle invalid entries. Options are 'error' (throw an error), "
+ "or 'keep' (return relevant number of NaN in the output). Default value "
+ "is 'error'"
+)
Review Comment:
why need this?
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
Review Comment:
this doc example is not complete?
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
Review Comment:
the name seems a bit long, what about renaming it `featureSizes`? but not
feel strong about it
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
+ handleInvalid: Optional[str] = "error",
+ ) -> None:
+ """
+ __init__(self, \\*, inputCols=None, outputCol=None,
inputFeatureSizeList=None, handleInvalid="error")
+ """
+ super().__init__()
+ kwargs = self._input_kwargs
+ self._set(**kwargs)
+ self._setDefault(handleInvalid="error")
+
+ def _input_columns(self) -> List[str]:
+ return self.getInputCols()
+
+ def _output_columns(self) -> List[Tuple[str, str]]:
+ return [(self.getOutputCol(), "array<double>")]
+
+ def _get_transform_fn(self) -> Callable[..., Any]:
+ feature_size_list = self.getInputFeatureSizeList()
+ if feature_size_list is None or len(feature_size_list) !=
len(self.getInputCols()):
+ raise ValueError(
+ "'feature_size_list' param must be set with an array of
integer, and"
+ "its length must be equal to number of input columns."
+ )
+ assembled_feature_size = sum(feature_size_list)
+ handler_invalid = self.getHandleInvalid()
+
+ if handler_invalid not in ["error", "keep"]:
+ raise ValueError("'handler_invalid' param must be set with 'error'
or 'keep' value.")
+
+ keep_invalid = handler_invalid == "keep"
+
+ def assemble_features(*feature_list: Any):
+ assembled_array = np.empty(assembled_feature_size,
dtype=np.float64)
+ pos = 0
+ for index, feature in enumerate(feature_list):
+ feature_size = feature_size_list[index]
+
+ if keep_invalid:
+ if feature is None:
+ assembled_array[pos : pos + feature_size] = np.nan
+ else:
+ assembled_array[pos : pos + feature_size] = feature
+ else:
+ if feature is None or np.isnan(feature).any():
Review Comment:
Not related to this PR, it seems existing impl of `VectorAssembler` doesn't
validate NaN values in Vectors.
Let me double check and fix it
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
+ handleInvalid: Optional[str] = "error",
Review Comment:
`inputFeatureSizeList` and `handleInvalid` not optional?
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
Review Comment:
add it to `pyspark.ml.connect.rst`
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
+ handleInvalid: Optional[str] = "error",
+ ) -> None:
+ """
+ __init__(self, \\*, inputCols=None, outputCol=None,
inputFeatureSizeList=None, handleInvalid="error")
+ """
+ super().__init__()
+ kwargs = self._input_kwargs
+ self._set(**kwargs)
+ self._setDefault(handleInvalid="error")
+
+ def _input_columns(self) -> List[str]:
+ return self.getInputCols()
+
+ def _output_columns(self) -> List[Tuple[str, str]]:
+ return [(self.getOutputCol(), "array<double>")]
+
+ def _get_transform_fn(self) -> Callable[..., Any]:
+ feature_size_list = self.getInputFeatureSizeList()
+ if feature_size_list is None or len(feature_size_list) !=
len(self.getInputCols()):
Review Comment:
since `ndarray` and `List` supports negative indices, e.g.
`assembled_array[-30: -20]`
I think we may need to check all feature size > 0
##########
python/pyspark/ml/connect/feature.py:
##########
@@ -256,3 +262,127 @@ def _load_core_model(self, path: str) -> None:
self.scale_values = sk_model.scale_
self.mean_values = sk_model.mean_
self.n_samples_seen = sk_model.n_samples_seen_
+
+
+class ArrayAssembler(
+ Transformer,
+ HasInputCols,
+ HasOutputCol,
+ HasInputFeatureSizeList,
+ HasHandleInvalid,
+ ParamsReadWrite,
+):
+ """
+ A feature transformer that merges multiple input columns into an array
type column.
+
+ Parameters
+ ----------
+ You need to set param `inputCols` for specifying input column names,
+ and set param `inputFeatureSizeList` for specifying corresponding input
column
+ feature size, for scalar type input column, corresponding feature size
must be set to 1,
+ otherwise, set corresponding feature size to feature array length.
+ Output column is "array<double"> type and contains array of assembled
features.
+ All elements in input feature columns must be convertible to double type.
+
+ You can set 'handler_invalid' param to specify how to handle invalid input
value
+ (None or NaN), if it is set to 'error', error is thrown for invalid input
value,
+ if it is set to 'keep', it returns relevant number of NaN in the output.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> from pyspark.ml.connect.feature import VectorAssembler
+ >>> import numpy as np
+ >>>
+ >>> spark_df = spark.createDataFrame(
+ ... [
+ ... ([2.0, 3.5, 1.5], 3.0, True, 1),
+ ... ([-3.0, np.nan, -2.5], 4.0, False, 2),
+ ... ],
+ ... schema=["f1", "f2", "f3", "f4"],
+ ... )
+ >>> assembler = VectorAssembler(
+ ... inputCols=["f1", "f2", "f3", "f4"],
+ ... outputCol="out",
+ ... inputFeatureSizeList=[3, 1, 1, 1],
+ ... handleInvalid="keep",
+ ... )
+ >>> assembler.transform(spark_df).select("out").show(truncate=False)
+ """
+
+ _input_kwargs: Dict[str, Any]
+
+ @keyword_only
+ def __init__(
+ self,
+ *,
+ inputCols: Optional[List[str]] = None,
+ outputCol: Optional[str] = None,
+ inputFeatureSizeList: Optional[List[int]] = None,
+ handleInvalid: Optional[str] = "error",
+ ) -> None:
+ """
+ __init__(self, \\*, inputCols=None, outputCol=None,
inputFeatureSizeList=None, handleInvalid="error")
+ """
+ super().__init__()
+ kwargs = self._input_kwargs
+ self._set(**kwargs)
+ self._setDefault(handleInvalid="error")
+
+ def _input_columns(self) -> List[str]:
+ return self.getInputCols()
+
+ def _output_columns(self) -> List[Tuple[str, str]]:
+ return [(self.getOutputCol(), "array<double>")]
+
+ def _get_transform_fn(self) -> Callable[..., Any]:
+ feature_size_list = self.getInputFeatureSizeList()
+ if feature_size_list is None or len(feature_size_list) !=
len(self.getInputCols()):
+ raise ValueError(
+ "'feature_size_list' param must be set with an array of
integer, and"
+ "its length must be equal to number of input columns."
+ )
+ assembled_feature_size = sum(feature_size_list)
+ handler_invalid = self.getHandleInvalid()
+
+ if handler_invalid not in ["error", "keep"]:
+ raise ValueError("'handler_invalid' param must be set with 'error'
or 'keep' value.")
+
+ keep_invalid = handler_invalid == "keep"
+
+ def assemble_features(*feature_list: Any):
+ assembled_array = np.empty(assembled_feature_size,
dtype=np.float64)
+ pos = 0
+ for index, feature in enumerate(feature_list):
+ feature_size = feature_size_list[index]
+
+ if keep_invalid:
+ if feature is None:
+ assembled_array[pos : pos + feature_size] = np.nan
+ else:
+ assembled_array[pos : pos + feature_size] = feature
Review Comment:
do we need to check the array size here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]