HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974870249


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened records returned
+        by the function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in
+        every trigger, and updates to each group's state will be saved across invocations.
+        The function will also be invoked repeatedly for each timed-out state. The sequence
+        of invocation is input data -> state timeout. When the function is invoked for a
+        state timeout, no input data will be presented.
+
+        The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
+        return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as
+        a tuple of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state
+        will be passed as :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as a `pandas.DataFrame` to the
+        user-defined function, and the returned `pandas.DataFrame`s across all invocations
+        are combined as a :class:`DataFrame`. Note that the user-defined function should
+        loop through and process all elements in the iterator, and should not assume the
+        number of elements in it.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of
+        all elements in the returned value, `pandas.DataFrame`. The column labels of all
+        elements in the returned value must either match the field names in the defined
+        schema if specified as strings, or match the field data types by position if not
+        strings, e.g., integer indices.
+
+        The `stateStructType` should be a :class:`StructType` describing the schema of the
+        user-defined state. The value of the state will be presented as a tuple, and
+        updates should be performed with a tuple as well. User-defined types, e.g., native
+        Python class types, are not supported.

Review Comment:
   I think we can just say that "the corresponding Python types for :class:`DataType`
   are supported". Documented here:
   https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click the Python tab).
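
For context, the calling pattern the docstring describes looks roughly like the sketch below. This is illustrative, not code from the PR: it assumes a streaming DataFrame `sdf` with columns ("id", "value"), a hypothetical user function `count_fn`, and the `GroupState` / `GroupStateTimeout` names under `pyspark.sql.streaming.state` (the draft docstring refers to `GroupStateImpl`; the exposed name may differ).

```python
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

output_schema = "id string, count long"  # outputStructType as a DDL string
state_schema = "count long"              # stateStructType as a DDL string


def count_fn(
    key: Tuple[str], pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # Timeout invocation: no input data is presented; emit and clean up.
        (count,) = state.get
        state.remove()
        yield pd.DataFrame({"id": [key[0]], "count": [count]})
    else:
        # The state value is read and written as a plain tuple.
        count = state.get[0] if state.exists else 0
        # Consume the whole iterator; never assume how many chunks it holds.
        for pdf in pdfs:
            count += len(pdf)
        state.update((count,))
        state.setTimeoutDuration(10000)  # milliseconds
        yield pd.DataFrame({"id": [key[0]], "count": [count]})


result = sdf.groupBy("id").applyInPandasWithState(
    count_fn,
    output_schema,
    state_schema,
    "Update",
    GroupStateTimeout.ProcessingTimeTimeout,
)
```

Since the output schema here is given with string field names, the column labels of each yielded `pandas.DataFrame` are matched to the schema by name, per the docstring's rule on column labels.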
########## python/pyspark/sql/pandas/group_ops.py: ########## @@ -216,6 +218,105 @@ def applyInPandas( jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr()) return DataFrame(jdf, self.session) + def applyInPandasWithState( + self, + func: "PandasGroupedMapFunctionWithState", + outputStructType: Union[StructType, str], + stateStructType: Union[StructType, str], + outputMode: str, + timeoutConf: str, + ) -> DataFrame: + """ + Applies the given function to each group of data, while maintaining a user-defined + per-group state. The result Dataset will represent the flattened record returned by the + function. + + For a streaming Dataset, the function will be invoked for each group repeatedly in every + trigger, and updates to each group's state will be saved across invocations. The function + will also be invoked for each timed-out state repeatedly. The sequence of the invocation + will be input data -> state timeout. When the function is invoked for state timeout, there + will be no data being presented. + + The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and + returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple + of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as + :class:`pyspark.sql.streaming.state.GroupStateImpl`. + + For each group, all columns are passed together as `pandas.DataFrame` to the user-function, + and the returned `pandas.DataFrame` across all invocations are combined as a + :class:`DataFrame`. Note that the user function should loop through and process all + elements in the iterator. The user function should not make a guess of the number of + elements in the iterator. + + The `outputStructType` should be a :class:`StructType` describing the schema of all + elements in returned value, `pandas.DataFrame`. The column labels of all elements in + returned value, `pandas.DataFrame` must either match the field names in the defined + schema if specified as strings, or match the field data types by position if not strings, + e.g. integer indices. + + The `stateStructType` should be :class:`StructType` describing the schema of user-defined + state. The value of state will be presented as a tuple, as well as the update should be + performed with the tuple. User defined types e.g. native Python class types are not Review Comment: I think we can just say that "the corresponding Python types for :class:`DataType` are supported". Documented here https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click python tab) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org