leewyang commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r974646917
########## python/pyspark/ml/functions.py: ########## @@ -106,6 +111,167 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def batched(df: pd.DataFrame, batch_size: int = -1) -> Iterator[pd.DataFrame]: + """Generator that splits a pandas dataframe/series into batches.""" + if batch_size <= 0 or batch_size >= len(df): + yield df + else: + # for batch in np.array_split(df, (len(df.index) + batch_size - 1) // batch_size): + for _, batch in df.groupby(np.arange(len(df)) // batch_size): + yield batch + + +def has_tensor_cols(df: pd.DataFrame) -> bool: + """Check if input DataFrame contains any tensor-valued columns""" + if any(df.dtypes == np.object_): + # pd.DataFrame object types can contain different types, e.g. string, dates, etc. + # so inspect a row and check for array/list type + sample = df.iloc[0] + return any([isinstance(x, np.ndarray) or isinstance(x, list) for x in sample]) + else: + return False + + +def batch_infer_udf( + predict_batch_fn: Callable, + return_type: DataType = ArrayType(FloatType()), + batch_size: int = -1, + input_names: list[str] = [], + input_tensor_shapes: list[list[int]] = [], + **kwargs: Any, +) -> Callable: + """Given a function which loads a model, returns a pandas_udf for inferencing over that model. + + This will handle: + - conversion of the Spark DataFrame to numpy arrays. + - batching of the inputs sent to the model predict() function. + - caching of the model and prediction function on the executors. + + This assumes that the `predict_batch_fn` encapsulates all of the necessary dependencies for + running the model or the Spark executor environment already satisfies all runtime requirements. + + When selecting columns in pyspark SQL, users are required to always use `struct` for simplicity. + + For the conversion of Spark DataFrame to numpy, the following table describes the behavior, + where tensor columns in the Spark DataFrame must be represented as a flattened 1-D array/list. + + | dataframe \\ model | single input | multiple inputs | + | :----------------- | :----------- | :-------------- | + | single-col scalar | 1 | N/A | + | single-col tensor | 1,2 | N/A | + | multi-col scalar | 3 | 4 | Review Comment: I think it's fine to leave this for now... Otherwise, we would just add an error response in this case to tell folks to use `array`. So it seems it'd be better to just support it vs. raising an error (unless this causes issues elsewhere). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org