[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2023-01-12 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1068515229


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +152,597 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        df = data
+    elif isinstance(data, pd.Series):
+        df = pd.concat((data,), axis=1)
+    else:  # isinstance(data, Tuple[pd.Series]):
+        df = pd.concat(data, axis=1)
+
+    index = 0
+    data_size = len(df)
+    while index < data_size:
+        yield df.iloc[index : index + batch_size]
+        index += batch_size
+
+
+def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool:
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:
+        raise ValueError(
+            "Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data))
+        )
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input Series/DataFrame/Tuple contains any tensor-valued columns."""
+    if isinstance(data, (pd.Series, pd.DataFrame)):
+        return _is_tensor_col(data)
+    else:  # isinstance(data, Tuple):
+        return any(_is_tensor_col(elem) for elem in data)
+
+
+def _validate_and_transform_multiple_inputs(
+    batch: pd.DataFrame, input_shapes: List[List[int] | None], num_input_cols: int
+) -> List[np.ndarray]:
+    multi_inputs = [batch[col].to_numpy() for col in batch.columns]
+    if input_shapes:
+        if len(input_shapes) == num_input_cols:
+            multi_inputs = [
+                np.vstack(v).reshape([-1] + input_shapes[i])  # type: ignore
+                if input_shapes[i]
+                else v
+                for i, v in enumerate(multi_inputs)
+            ]
+            if not all([len(x) == len(batch) for x in multi_inputs]):
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError("input_tensor_shapes must match columns")
+
+    return multi_inputs
+
+
+def _validate_and_transform_single_input(
+    batch: pd.DataFrame,
+    input_shapes: List[List[int] | None],
+    has_tensors: bool,
+    has_tuple: bool,
+) -> np.ndarray:
+    # multiple input columns for single expected input
+    if has_tensors:
+        # tensor columns
+        if len(batch.columns) == 1:
+            # one tensor column and one expected input, vstack rows
+            single_input = np.vstack(batch.iloc[:, 0])
+        else:
+            raise ValueError(
+                "Multiple input columns found, but model expected a single "
+                "input, use `struct` or `array` to combine columns into tensors."
+            )
+    else:
+        # scalar columns
+        if len(batch.columns) == 1:
+            # single scalar column, remove extra dim
+            single_input = np.squeeze(batch.to_numpy())
+            if input_shapes and input_shapes[0] not in [None, [], [1]]:
+                raise ValueError("Invalid input_tensor_shape for scalar column.")
+        elif not has_tuple:
+            # columns grouped via struct/array, convert to single tensor
+            single_input = batch.to_numpy()
+            if input_shapes and input_shapes[0] != [len(batch.columns)]:
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError(
+                "Multiple input columns found, but model expected a single "
+                "input, use `struct` or `array` to combine columns into tensors."
+            )
+
+    # if input_tensor_shapes provided, try to reshape input
+    if input_shapes:
+        if len(input_shapes) == 1:
+            single_input = single_input.reshape([-1] + input_shapes[0])  # type: ignore
+            if len(single_input) != len(batch):
+                raise ValueError("Input data does not match expected shape.")
+        else:
+            raise ValueError("Multiple input_tensor_shapes found, but model expected one input")
+
+    return single_input
+
+
+def _validate_and_transform_prediction_result(
+    preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]],
+    num_input_rows: int,
+    return_type: DataType,
+) -> pd.DataFrame | pd.Series:
+    """Validate numpy-based model predictions against the expected pandas_udf return_type and
+    transforms the predictions into an equivalent pandas DataFrame or Series."""
+    if 
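
For context, a minimal standalone sketch (not part of the PR) of the slicing pattern `_batched` uses above:

~~~
import pandas as pd

def batched(df: pd.DataFrame, batch_size: int):
    """Illustrative copy of the `_batched` slicing loop shown in the diff."""
    index = 0
    while index < len(df):
        yield df.iloc[index : index + batch_size]
        index += batch_size

df = pd.DataFrame({"x": range(10)})
assert [len(b) for b in batched(df, 4)] == [4, 4, 2]  # remainder stays in the last batch
~~~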


[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-12-29 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1059094105


##
python/pyspark/ml/model_cache.py:
##
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import OrderedDict
+from threading import Lock
+from typing import Callable, Optional
+from uuid import UUID
+
+
+class ModelCache:
+    """Cache for model prediction functions on executors."""
+
+    _models: OrderedDict[UUID, Callable] = OrderedDict()
+    _capacity: int = 8

Review Comment:
   From offline discussion:
   
   * Set the default capacity to 3. Later we might make it configurable via Spark conf if needed.
   * Document the behavior / trade-offs clearly. Link to https://github.com/apache/spark/blob/77339dc6a49d1d9d2a7a3aae966610acbe1a5d6e/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L395
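
For illustration, a minimal sketch (an assumption, not the PR's final code) of the LRU eviction such an `OrderedDict`-based cache typically implements, using the default capacity of 3 discussed above:

~~~
from collections import OrderedDict
from threading import Lock
from typing import Callable, Optional
from uuid import UUID

class LRUModelCache:
    """Hypothetical sketch: evict the least-recently-used predict function
    once more than `_capacity` models are cached on an executor."""

    _models: "OrderedDict[UUID, Callable]" = OrderedDict()
    _capacity: int = 3  # default per the discussion; could later become a Spark conf
    _lock = Lock()

    @classmethod
    def add(cls, uuid: UUID, predict_fn: Callable) -> None:
        with cls._lock:
            cls._models[uuid] = predict_fn
            cls._models.move_to_end(uuid)        # mark as most recently used
            if len(cls._models) > cls._capacity:
                cls._models.popitem(last=False)  # evict least recently used

    @classmethod
    def get(cls, uuid: UUID) -> Optional[Callable]:
        with cls._lock:
            predict_fn = cls._models.get(uuid)
            if predict_fn:
                cls._models.move_to_end(uuid)    # refresh recency on a hit
            return predict_fn
~~~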



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-12-28 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1003749352


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        for _, batch in data.groupby(np.arange(len(data)) // batch_size):
+            yield batch
+    else:
+        # convert (tuple of) pd.Series into pd.DataFrame
+        if isinstance(data, pd.Series):
+            df = pd.concat((data,), axis=1)
+        else:  # isinstance(data, Tuple[pd.Series]):
+            df = pd.concat(data, axis=1)
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:  # isinstance(data, Tuple):
+        return any([d.dtype == np.object_ for d in data]) and any(
+            [isinstance(d.iloc[0], (np.ndarray, list)) for d in data]
+        )
+
+
+def predict_batch_udf(
+    predict_batch_fn: Callable[
+        [],
+        Callable[
+            [np.ndarray | List[np.ndarray]],
+            np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, np.dtype]],
+        ],
+    ],
+    *,
+    return_type: DataType,
+    batch_size: int,
+    input_tensor_shapes: list[list[int] | None] | Mapping[int, list[int]] | None = None,
+) -> UserDefinedFunctionLike:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.

Review Comment:
   * `loads a model` -> `loads a model and makes batch predictions`
   * Pandas UDF
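
For context, a hypothetical `predict_batch_fn` of the shape this signature expects: a zero-argument loader that returns a batch-prediction function (the linear "model" here is a stand-in, not from the PR):

~~~
import numpy as np

def make_predict_fn():
    weights = np.array([1.0, 2.0, 3.0])  # stand-in for an expensive model load

    def predict(inputs: np.ndarray) -> np.ndarray:
        # inputs: [batch_size, 3] -> one prediction per row
        return inputs @ weights

    return predict

predict = make_predict_fn()
print(predict(np.ones((4, 3))))  # [6. 6. 6. 6.]
~~~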



##
python/pyspark/ml/functions.py:
##
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))


+def predict_batch_udf(
+    predict_batch_fn: Callable[
+        [],
+        Callable[
+            [np.ndarray | List[np.ndarray]],

Review Comment:
   What is the scenario for `List[np.ndarray]`? If this is a single column of np arrays, we should expect a single np array after batch. If this is a single column of non-array typed values, we should expect `List[Any]`. If this is multiple columns, we should use `[np.ndarray, np.ndarray]`. We won't be able to enumerate all possible combinations. We can consider the following:
   
   ~~~
   [ndarray] | [List[Any]] | [ndarray, ndarray] | Any
   ~~~
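
To make the distinction concrete, a hypothetical pair of predict functions (illustrative, not from the PR) for the single-tensor and multi-column cases:

~~~
import numpy as np

# single input: the UDF hands the model one batched ndarray
def predict_single(inputs: np.ndarray) -> np.ndarray:
    return inputs.sum(axis=1)

# multiple input columns: one ndarray per column, i.e. [ndarray, ndarray]
def predict_multi(x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
    return x1 + x2
~~~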




[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-19 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r974747883


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if batch_size <= 0 or batch_size >= len(df):
+        yield df
+    else:
+        # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size):
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def has_tensor_cols(df: pd.DataFrame) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if any(df.dtypes == np.object_):
+        # pd.DataFrame object types can contain different types, e.g. string, dates, etc.
+        # so inspect a row and check for array/list type
+        sample = df.iloc[0]
+        return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample])
+    else:
+        return False
+
+
+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |
+    | :- | :--- | :-- |
+    | single-col scalar  | 1            | N/A             |
+    | single-col tensor  | 1,2          | N/A             |
+    | multi-col scalar   | 3            | 4               |

Review Comment:
   Discussed with @leewyang offline. We will drop this scenario.
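
For readers following the table, a hypothetical sketch (column names are illustrative) of the grouping the docstring requires: scalar columns are combined with `array` or `struct` before being passed to a single-input model, rather than relying on the dropped multi-col-scalar scenario:

~~~
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, struct

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["x1", "x2"])

df.select(array("x1", "x2").alias("tensor_input")).show()   # one 2-element input per row
df.select(struct("x1", "x2").alias("named_inputs")).show()  # named fields per row
~~~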






[GitHub] [spark] mengxr commented on a diff in pull request #37734: [SPARK-40264][ML] add batch_infer_udf function to pyspark.ml.functions

2022-09-13 Thread GitBox


mengxr commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r969862754


##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]:

Review Comment:
   Should it be a private method?



##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))


+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),

Review Comment:
   * Please use `*` to force keyword-only. I think only the first param is trivial to guess.
   * Maybe enforce return type instead of having a default.
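
A minimal sketch of the suggestion: `*` makes everything after it keyword-only, and dropping the default makes `return_type` required:

~~~
def batch_infer_udf(predict_batch_fn, *, return_type, batch_size=-1):
    ...

# batch_infer_udf(fn, ArrayType(FloatType()))              # TypeError: positional
# batch_infer_udf(fn, return_type=ArrayType(FloatType()))  # OK
~~~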
   



##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))


+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:

Review Comment:
   It returns a Pandas UDF.



##
python/pyspark/ml/functions.py:
##
@@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))


+def batch_infer_udf(
+    predict_batch_fn: Callable,
+    return_type: DataType = ArrayType(FloatType()),
+    batch_size: int = -1,
+    input_names: list[str] = [],
+    input_tensor_shapes: list[list[int]] = [],
+    **kwargs: Any,
+) -> Callable:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the