leewyang commented on code in PR #37734:
URL: https://github.com/apache/spark/pull/37734#discussion_r1061926045


##########
python/pyspark/ml/functions.py:
##########
@@ -106,6 +117,474 @@ def array_to_vector(col: Column) -> Column:
     return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
 
 
+def _batched(
+    data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int
+) -> Iterator[pd.DataFrame]:
+    """Generator that splits a pandas dataframe/series into batches."""
+    if isinstance(data, pd.DataFrame):
+        for _, batch in data.groupby(np.arange(len(data)) // batch_size):
+            yield batch
+    else:
+        # convert (tuple of) pd.Series into pd.DataFrame
+        if isinstance(data, pd.Series):
+            df = pd.concat((data,), axis=1)
+        else:  # isinstance(data, Tuple[pd.Series]):
+            df = pd.concat(data, axis=1)
+        for _, batch in df.groupby(np.arange(len(df)) // batch_size):
+            yield batch
+
+
+def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool:
+    """Check if input DataFrame contains any tensor-valued columns"""
+    if isinstance(data, pd.Series):
+        return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list))
+    elif isinstance(data, pd.DataFrame):
+        return any(data.dtypes == np.object_) and any(
+            [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]]
+        )
+    else:  # isinstance(data, Tuple):
+        return any([d.dtype == np.object_ for d in data]) and any(
+            [isinstance(d.iloc[0], (np.ndarray, list)) for d in data]
+        )
+
+
+def predict_batch_udf(
+    predict_batch_fn: Callable[
+        [],
+        Callable[
+            [np.ndarray | List[np.ndarray]],
+            np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, np.dtype]],
+        ],
+    ],
+    *,
+    return_type: DataType,
+    batch_size: int,
+    input_tensor_shapes: list[list[int] | None] | Mapping[int, list[int]] | None = None,
+) -> UserDefinedFunctionLike:
+    """Given a function which loads a model, returns a pandas_udf for inferencing over that model.
+
+    This will handle:
+    - conversion of the Spark DataFrame to numpy arrays.
+    - batching of the inputs sent to the model predict() function.
+    - caching of the model and prediction function on the executors.
+
+    This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for
+    running the model or the Spark executor environment already satisfies all runtime requirements.
+
+    For the conversion of Spark DataFrame to numpy, the following table describes the behavior,
+    where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list.
+
+    | dataframe \\ model | single input | multiple inputs |

Review Comment:
   Updated docs per comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to