mengxr commented on code in PR #37734: URL: https://github.com/apache/spark/pull/37734#discussion_r1068515229
########## python/pyspark/ml/functions.py: ########## @@ -106,6 +152,597 @@ def array_to_vector(col: Column) -> Column: return Column(sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col))) +def _batched( + data: pd.Series | pd.DataFrame | Tuple[pd.Series], batch_size: int +) -> Iterator[pd.DataFrame]: + """Generator that splits a pandas dataframe/series into batches.""" + if isinstance(data, pd.DataFrame): + df = data + elif isinstance(data, pd.Series): + df = pd.concat((data,), axis=1) + else: # isinstance(data, Tuple[pd.Series]): + df = pd.concat(data, axis=1) + + index = 0 + data_size = len(df) + while index < data_size: + yield df.iloc[index : index + batch_size] + index += batch_size + + +def _is_tensor_col(data: pd.Series | pd.DataFrame) -> bool: + if isinstance(data, pd.Series): + return data.dtype == np.object_ and isinstance(data.iloc[0], (np.ndarray, list)) + elif isinstance(data, pd.DataFrame): + return any(data.dtypes == np.object_) and any( + [isinstance(d, (np.ndarray, list)) for d in data.iloc[0]] + ) + else: + raise ValueError( + "Unexpected data type: {}, expected pd.Series or pd.DataFrame.".format(type(data)) + ) + + +def _has_tensor_cols(data: pd.Series | pd.DataFrame | Tuple[pd.Series]) -> bool: + """Check if input Series/DataFrame/Tuple contains any tensor-valued columns.""" + if isinstance(data, (pd.Series, pd.DataFrame)): + return _is_tensor_col(data) + else: # isinstance(data, Tuple): + return any(_is_tensor_col(elem) for elem in data) + + +def _validate_and_transform_multiple_inputs( + batch: pd.DataFrame, input_shapes: List[List[int] | None], num_input_cols: int +) -> List[np.ndarray]: + multi_inputs = [batch[col].to_numpy() for col in batch.columns] + if input_shapes: + if len(input_shapes) == num_input_cols: + multi_inputs = [ + np.vstack(v).reshape([-1] + input_shapes[i]) # type: ignore + if input_shapes[i] + else v + for i, v in enumerate(multi_inputs) + ] + if not all([len(x) == len(batch) for x in 
multi_inputs]): + raise ValueError("Input data does not match expected shape.") + else: + raise ValueError("input_tensor_shapes must match columns") + + return multi_inputs + + +def _validate_and_transform_single_input( + batch: pd.DataFrame, + input_shapes: List[List[int] | None], + has_tensors: bool, + has_tuple: bool, +) -> np.ndarray: + # multiple input columns for single expected input + if has_tensors: + # tensor columns + if len(batch.columns) == 1: + # one tensor column and one expected input, vstack rows + single_input = np.vstack(batch.iloc[:, 0]) + else: + raise ValueError( + "Multiple input columns found, but model expected a single " + "input, use `struct` or `array` to combine columns into tensors." + ) + else: + # scalar columns + if len(batch.columns) == 1: + # single scalar column, remove extra dim + single_input = np.squeeze(batch.to_numpy()) + if input_shapes and input_shapes[0] not in [None, [], [1]]: + raise ValueError("Invalid input_tensor_shape for scalar column.") + elif not has_tuple: + # columns grouped via struct/array, convert to single tensor + single_input = batch.to_numpy() + if input_shapes and input_shapes[0] != [len(batch.columns)]: + raise ValueError("Input data does not match expected shape.") + else: + raise ValueError( + "Multiple input columns found, but model expected a single " + "input, use `struct` or `array` to combine columns into tensors." 
+ ) + + # if input_tensor_shapes provided, try to reshape input + if input_shapes: + if len(input_shapes) == 1: + single_input = single_input.reshape([-1] + input_shapes[0]) # type: ignore + if len(single_input) != len(batch): + raise ValueError("Input data does not match expected shape.") + else: + raise ValueError("Multiple input_tensor_shapes found, but model expected one input") + + return single_input + + +def _validate_and_transform_prediction_result( + preds: np.ndarray | Mapping[str, np.ndarray] | List[Mapping[str, Any]], + num_input_rows: int, + return_type: DataType, +) -> pd.DataFrame | pd.Series: + """Validate numpy-based model predictions against the expected pandas_udf return_type and + transforms the predictions into an equivalent pandas DataFrame or Series.""" + if isinstance(return_type, StructType): + struct_rtype: StructType = return_type + fieldNames = struct_rtype.names + if isinstance(preds, dict): + # dictionary of columns + predNames = list(preds.keys()) + for field in struct_rtype.fields: + if isinstance(field.dataType, ArrayType): + if len(preds[field.name].shape) == 2: + preds[field.name] = list(preds[field.name]) + else: + raise ValueError( + "Prediction results for ArrayType must be two-dimensional." + ) + elif isinstance(field.dataType, supported_scalar_types): + if len(preds[field.name].shape) != 1: + raise ValueError( + "Prediction results for scalar types must be one-dimensional." 
+ ) + else: + raise ValueError("Unsupported field type in return struct type.") + + if len(preds[field.name]) != num_input_rows: + raise ValueError("Prediction results must have same length as input data") + + elif isinstance(preds, list) and isinstance(preds[0], dict): + # rows of dictionaries + predNames = list(preds[0].keys()) + if len(preds) != num_input_rows: + raise ValueError("Prediction results must have same length as input data.") + for field in struct_rtype.fields: + if isinstance(field.dataType, ArrayType): + if len(preds[0][field.name].shape) != 1: + raise ValueError( + "Prediction results for ArrayType must be one-dimensional." + ) + elif isinstance(field.dataType, supported_scalar_types): + if not np.isscalar(preds[0][field.name]): + raise ValueError("Invalid scalar prediction result.") + else: + raise ValueError("Unsupported field type in return struct type.") + else: + raise ValueError( + "Prediction results for StructType must be a dictionary or " + "a list of dictionary, got: {}".format(type(preds)) + ) + + # check column names + if set(predNames) != set(fieldNames): + raise ValueError( + "Prediction result columns did not match expected return_type " + "columns: expected {}, got: {}".format(fieldNames, predNames) + ) + + return pd.DataFrame(preds) + elif isinstance(return_type, ArrayType): + if isinstance(preds, np.ndarray): + if len(preds) != num_input_rows: + raise ValueError("Prediction results must have same length as input data.") + if len(preds.shape) != 2: + raise ValueError("Prediction results for ArrayType must be two-dimensional.") + else: + raise ValueError("Prediction results for ArrayType must be an ndarray.") + + return pd.Series(list(preds)) + elif isinstance(return_type, supported_scalar_types): + preds_array: np.ndarray = preds # type: ignore + if len(preds_array) != num_input_rows: + raise ValueError("Prediction results must have same length as input data.") + if not ( + (len(preds_array.shape) == 2 and preds_array.shape[1] == 
1) + or len(preds_array.shape) == 1 + ): + raise ValueError("Invalid shape for scalar prediction result.") + + return pd.Series(np.squeeze(preds)) # type: ignore + else: + raise ValueError("Unsupported return type") + + +def predict_batch_udf( + make_predict_fn: Callable[ + [], + PredictFunction, + ], + *, + return_type: DataType, + batch_size: int, + input_tensor_shapes: List[List[int] | None] | Mapping[int, List[int]] | None = None, +) -> UserDefinedFunctionLike: + """Given a function which loads a model and makes batch predictions, returns a Pandas UDF for + inferencing over that model. + + The returned UDF does the following on each DataFrame partition: + - calls `make_predict_fn` to load the model and cache its `predict_fn`. + - batches the input records as numpy arrays and invokes `predict_fn` on each batch. + + This assumes that the `make_predict_fn` encapsulates all of the necessary dependencies for + running the model or the Spark executor environment already satisfies all runtime requirements. + + For the conversion of Spark DataFrame to numpy arrays, there is a one-to-one mapping between the + input arguments of the `predict_fn` (returned by the `make_predict_fn`) and the input columns to + the UDF (returned by the `predict_batch_udf`) at runtime. Each input column will be converted + as follows: + - scalar column -> np.ndarray + - tensor column + tensor shape -> np.ndarray + + Note that tensor columns in the Spark DataFrame must be represented as a flattened 1-D array, + and multiple scalar columns can be combined into a single tensor column using standard PySpark + SQL functions like `array()` or `struct()`. + + Example (tensor column): + + Input DataFrame has a single column with a flattened tensor value, represented as an array of + float. 
+ ``` + from pyspark.ml.functions import predict_batch_udf + + def make_predict_fn(): + # load/init happens once per python worker + import tensorflow as tf + model = tf.keras.models.load_model('/path/to/mnist_model') + + # predict on batches of tasks/partitions, using cached model + def predict(inputs: np.ndarray) -> np.ndarray: + # inputs.shape = [batch_size, 784] + # outputs.shape = [batch_size, 10], return_type = ArrayType(FloatType()) + return model.predict(inputs) + + return predict + + mnist = predict_batch_udf(make_predict_fn, + return_type=ArrayType(FloatType()), + batch_size=100, + input_tensor_shapes=[[784]]) + + df = spark.read.parquet("/path/to/mnist_data") + df.show(5) + # +--------------------+ + # | data| + # +--------------------+ + # |[0.0, 0.0, 0.0, 0...| + # |[0.0, 0.0, 0.0, 0...| + # |[0.0, 0.0, 0.0, 0...| + # |[0.0, 0.0, 0.0, 0...| + # |[0.0, 0.0, 0.0, 0...| + # +--------------------+ + + df.withColumn("preds", mnist("data")).show(5) + # +--------------------+--------------------+ + # | data| preds| + # +--------------------+--------------------+ + # |[0.0, 0.0, 0.0, 0...|[-13.511008, 8.84...| + # |[0.0, 0.0, 0.0, 0...|[-5.3957458, -2.2...| + # |[0.0, 0.0, 0.0, 0...|[-7.2014456, -8.8...| + # |[0.0, 0.0, 0.0, 0...|[-19.466187, -13....| + # |[0.0, 0.0, 0.0, 0...|[-5.7757926, -7.8...| + # +--------------------+--------------------+ + ``` + + Example (scalar column): + + Input DataFrame has a single scalar column, which will be passed to the `predict` function as + a 1-D numpy array. 
+ ``` + import numpy as np + import pandas as pd + from pyspark.ml.functions import predict_batch_udf + from pyspark.sql.types import FloatType + + df = spark.createDataFrame(pd.DataFrame(np.arange(100))) + df.show(5) + # +---+ + # | 0| + # +---+ + # | 0| + # | 1| + # | 2| + # | 3| + # | 4| + # +---+ + + def make_predict_fn(): + def predict(inputs: np.ndarray) -> np.ndarray: + # inputs.shape = [batch_size] + # outputs.shape = [batch_size], return_type = FloatType() + return inputs * 2 + + return predict + + times_two = predict_batch_udf(make_predict_fn, + return_type=FloatType(), + batch_size=10) + + df = spark.createDataFrame(pd.DataFrame(np.arange(100))) + df.withColumn("x2", times_two("0")).show(5) + # +---+---+ + # | 0| x2| + # +---+---+ + # | 0|0.0| + # | 1|2.0| + # | 2|4.0| + # | 3|6.0| + # | 4|8.0| + # +---+---+ + ``` + + Example (multiple scalar columns): + + Input DataFrame has multiple columns of scalar values. If the user-provided `predict` function + expects a single input, then the user must combine the multiple columns into a single tensor + using `pyspark.sql.functions.struct` or `pyspark.sql.functions.array`. 
+ ``` + import numpy as np + import pandas as pd + from pyspark.ml.functions import predict_batch_udf + from pyspark.sql.functions import struct + + data = np.arange(0, 1000, dtype=np.float64).reshape(-1, 4) + pdf = pd.DataFrame(data, columns=['a','b','c','d']) + df = spark.createDataFrame(pdf) + # +----+----+----+----+ + # | a| b| c| d| + # +----+----+----+----+ + # | 0.0| 1.0| 2.0| 3.0| + # | 4.0| 5.0| 6.0| 7.0| + # | 8.0| 9.0|10.0|11.0| + # |12.0|13.0|14.0|15.0| + # |16.0|17.0|18.0|19.0| + # +----+----+----+----+ + + def make_predict_fn(): + def predict(inputs: np.ndarray) -> np.ndarray: + # inputs.shape = [batch_size, 4] + # outputs.shape = [batch_size], return_type = FloatType() + return np.sum(inputs, axis=1) + + return predict + + sum_rows = predict_batch_udf(make_predict_fn, + return_type=FloatType(), + batch_size=10, + input_tensor_shapes=[[4]]) + + df.withColumn("sum", sum_rows(struct("a", "b", "c", "d"))).show(5) Review Comment: What if the nested columns are not all numerical? `array` would throw an error at plan analysis time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org