mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r969862754


##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:

Review Comment:
   Should it be a private method?



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),

Review Comment:
   * Please use `*` to force keyword-only. I think only the first param is trivial to guess.
   * Maybe enforce return type instead of having a default.
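
   For concreteness, a rough sketch of what that could look like (parameter names copied from the PR; the mutable list defaults are also swapped for `None` here, and whether each param survives is a separate discussion):

   ```python
   from typing import Any, Callable, List, Optional, Union

   from pyspark.sql.types import DataType


   def batch_infer_udf(
       predict_batch_fn: Callable,
       *,  # everything below is keyword-only
       return_type: Union[DataType, str],  # required, no implicit ArrayType(FloatType()) default
       batch_size: int = -1,
       input_names: Optional[List[str]] = None,
       input_tensor_shapes: Optional[List[List[int]]] = None,
       **kwargs: Any,
   ) -> Callable:
       ...
   ```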
   



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:

Review Comment:
   It returns a Pandas UDF.



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |

Review Comment:
   I feel we don't need to support this scenario. Users can use SQL function `array` to do the same.
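
   A rough sketch of that alternative (runnable on its own; names are illustrative): the caller packs the scalar columns into a single array column up front, so the UDF never needs a dedicated multi-col-scalar path.

   ```python
   from pyspark.sql import SparkSession, functions as F

   spark = SparkSession.builder.getOrCreate()
   df = spark.createDataFrame([(1.0, 2.0, 3.0)], ["c1", "c2", "c3"])

   # Pack the scalar columns into one array<double> column; downstream, an inference
   # UDF only ever sees a single (tensor-valued) input per row.
   df_packed = df.withColumn("features", F.array("c1", "c2", "c3"))
   # df_packed now has features = [1.0, 2.0, 3.0] alongside c1, c2, c3
   ```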



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.

Review Comment:
   I guess this is the main design discussion.
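
   To make the discussion concrete, the `struct` requirement looks roughly like this at the call site (`df`, `mnist_udf`, and the column names are placeholders, not part of the PR):

   ```python
   from pyspark.sql.functions import struct

   # a single input column still gets wrapped in a struct under the current design
   preds = df.withColumn("preds", mnist_udf(struct("data")))

   # multiple input columns are likewise bundled into one struct argument
   preds = df.withColumn("preds", mnist_udf(struct("c1", "c2")))
   ```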



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |
+    | multi-col tensor   | N/A          | 4,2             |
+
+    Notes:
+    1. pass thru dataframe column => model input as single numpy array.
+    2. reshape flattened tensors into expected tensor shapes.
+    3. convert entire dataframe into single numpy array via df.to_numpy(), or user can use
+       `pyspark.sql.functions.array()` to transform the input into a single-col tensor first.
+    4. pass thru dataframe column => model input as an (ordered) dictionary of numpy arrays.
+
+    Parameters
+    ----------
+    predict_batch_fn : Callable

Review Comment:
   Should provide accepted signatures and examples in the API doc.
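
   For example, the doc could show one accepted shape along these lines (a sketch assuming the contract is "no-arg loader returning a predict function over numpy batches"; the actual accepted signatures are for the PR to pin down):

   ```python
   import numpy as np


   def make_predict_fn():
       """Load the model once per executor and return a batch-wise predict function."""
       # real code would load a model here (e.g. from a checkpoint on shared storage);
       # this toy stand-in just scales its input
       def predict(inputs: np.ndarray) -> np.ndarray:
           return inputs * 2.0

       return predict
   ```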



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for 
inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :----------------- | :----------- | :-------------- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |
+    | multi-col tensor   | N/A          | 4,2             |
+
+    Notes:
+    1. pass thru dataframe column => model input as single numpy array.
+    2. reshape flattened tensors into expected tensor shapes.
+    3. convert entire dataframe into single numpy array via df.to_numpy(), or user can use
+       `pyspark.sql.functions.array()` to transform the input into a single-col tensor first.
+    4. pass thru dataframe column => model input as an (ordered) dictionary of numpy arrays.
+
+    Parameters
+    ----------
+    predict_batch_fn : Callable
+        Function which is responsible for loading a model and returning a `predict` function.
+    return_type : :class:`pspark.sql.types.DataType` or str.
+        Spark SQL datatype for the expected output.
+        Default: ArrayType(FloatType())
+    batch_size : int
+        Batch size to use for inference, note that this is typically a limitation of the model
+        and/or the hardware resources and is usually smaller than the Spark partition size.
+        Default: -1, which sends the entire Spark partition to the model.

Review Comment:
   Instead of making the default the entire partition, I would leave it as "auto" (`None`). I think in the future we can automatically estimate a good batch size based on memory and CPU/GPU utilization. Defaulting to the entire partition is error-prone. In the first version, we can be conservative about the default batch size or simply make it a required param.
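
   A minimal sketch of that suggestion, assuming `None` means "auto" and using an arbitrary conservative fallback (the helper name and the 1024 are illustrative only, not a recommendation):

   ```python
   from typing import Iterator, Optional

   import numpy as np
   import pandas as pd


   def _batched(df: pd.DataFrame, batch_size: Optional[int] = None) -> Iterator[pd.DataFrame]:
       """Split a pandas DataFrame into batches; batch_size=None picks an automatic size."""
       # placeholder heuristic; a later version could derive this from memory/CPU/GPU stats
       effective = batch_size if batch_size is not None and batch_size > 0 else 1024
       for _, batch in df.groupby(np.arange(len(df)) // effective):
           yield batch
   ```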



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(

Review Comment:
   I think we can call it "predict_batch_udf" to match "predict_batch_fn" param name.



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],

Review Comment:
   * Shouldn't use mutable `[]` as default value. Use `None` instead.
   * Not sure if we need this param in the first version. We can restrict the input to either:
     * One value column (no name): `model_udf(c1)`
     * A struct column (the caller can rename the sub-columns if needed): `model_udf(struct(c1, c2))`. Another option is to accept a list of columns, `model_udf(c1, c2)`; the `predict_batch_fn` and `input_tensor_shapes` signatures should match accordingly. I slightly prefer the latter, which doesn't need to handle name conflicts and lets us use a list for tensor shapes. Both call sites are sketched below.
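
   Hypothetical call sites for the two options (here `df`, `model_udf`, and the column names are placeholders, not part of the PR):

   ```python
   from pyspark.sql.functions import struct

   # option A: one struct column; the sub-column names double as input names
   preds_a = df.withColumn("preds", model_udf(struct("c1", "c2")))

   # option B: a plain list of columns, matched to the model inputs by position,
   # so there are no names (and no name conflicts) to worry about
   preds_b = df.withColumn("preds", model_udf("c1", "c2"))
   ```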



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,

Review Comment:
   Any type hints we can specify here?
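
   One possibility, assuming the contract is a no-argument loader that returns a predict function over numpy batches (alias names here are illustrative):

   ```python
   from typing import Callable

   import numpy as np

   PredictBatchFunction = Callable[..., np.ndarray]          # predict(*batches) -> predictions
   MakePredictBatchFn = Callable[[], PredictBatchFunction]   # loads the model, returns predict
   ```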



##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:

Review Comment:
   Ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

