HeartSaVioR commented on code in PR #37893: URL: https://github.com/apache/spark/pull/37893#discussion_r975910639
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,104 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by
+        the function.
+
+        For a streaming Dataset, the function will be invoked first for all input groups and
+        then for all timed-out states, where the input data is set to be empty. Updates to
+        each group's state will be saved across invocations.
+
+        The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
+        return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a
+        tuple of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will
+        be passed as :class:`pyspark.sql.streaming.state.GroupState`.
+
+        For each group, all columns are passed together as `pandas.DataFrame` to the
+        user-function, and the returned `pandas.DataFrame` across all invocations are
+        combined as a :class:`DataFrame`. Note that the user function should loop through
+        and process all elements in the iterator. The user function should not make a guess
+        of the number of elements in the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in the returned value, `pandas.DataFrame`. The column labels of all
+        elements in the returned `pandas.DataFrame` must either match the field names in the
+        defined schema if specified as strings, or match the field data types by position if
+        not strings, e.g. integer indices.
+
+        The `stateStructType` should be a :class:`StructType` describing the schema of the
+        user-defined state. The value of the state will be presented as a tuple, and updates
+        should also be performed with a tuple. The corresponding Python types for
+        :class:`DataType` are supported. Please refer to the page
+        https://spark.apache.org/docs/latest/sql-ref-datatypes.html (Python tab).
+
+        The size of each DataFrame in both the input and output can be arbitrary. The number
+        of DataFrames in both the input and output can also be arbitrary.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        func : function
+            a Python native function to be called on every group. It should take parameters
+            (key, Iterator[`pandas.DataFrame`], state) and return
+            Iterator[`pandas.DataFrame`]. Note that the type of the key is tuple and the
+            type of the state is :class:`pyspark.sql.streaming.state.GroupState`.
+        outputStructType : :class:`pyspark.sql.types.DataType` or str
+            the type of the output records. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+        stateStructType : :class:`pyspark.sql.types.DataType` or str
+            the type of the user-defined state. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+        outputMode : str
+            the output mode of the function.
+        timeoutConf : str
+            timeout configuration for groups that do not receive data for a while. Valid
+            values are defined in :class:`pyspark.sql.streaming.state.GroupStateTimeout`.
+
+        # TODO: Examples

Review Comment:
   I just added a simple example - let me come up with full example code in the examples directory. I'll file a new JIRA ticket for this.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
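The reviewer mentions adding full example code later; in the meantime, here is a rough sketch of the shape such a function takes, per the docstring above: a stateful per-key counter. The function name, schemas, and data are illustrative, not from the PR, and the `applyInPandasWithState` wiring is commented out so the snippet runs without a Spark cluster.

```python
import pandas as pd

# Illustrative schemas as DDL strings (a StructType would also work):
output_schema = "word STRING, total LONG"   # schema of each yielded pandas.DataFrame
state_schema = "total LONG"                 # schema of the per-group state tuple

def count_words(key, pdf_iter, state):
    # key: tuple of grouping values, e.g. ("spark",)
    # pdf_iter: iterator of pandas.DataFrames for this group; process ALL of them,
    #           without guessing how many there are
    # state: GroupState-like object; its value is read and written as a tuple
    total = state.get[0] if state.exists else 0
    for pdf in pdf_iter:
        total += len(pdf)
    state.update((total,))
    yield pd.DataFrame({"word": [key[0]], "total": [total]})

# On a streaming DataFrame `df` with a "word" column, this would be wired up as:
#
#   from pyspark.sql.streaming.state import GroupStateTimeout
#   result = df.groupBy("word").applyInPandasWithState(
#       count_words, output_schema, state_schema,
#       outputMode="update", timeoutConf=GroupStateTimeout.NoTimeout)
```

Note how the state round-trips as a tuple matching `state_schema`, and how the function yields DataFrames rather than returning one, matching the Iterator-in/Iterator-out contract described in the docstring.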
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org