HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974870249


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -216,6 +218,105 @@ def applyInPandas(
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.session)
 
+    def applyInPandasWithState(
+        self,
+        func: "PandasGroupedMapFunctionWithState",
+        outputStructType: Union[StructType, str],
+        stateStructType: Union[StructType, str],
+        outputMode: str,
+        timeoutConf: str,
+    ) -> DataFrame:
+        """
+        Applies the given function to each group of data, while maintaining a user-defined
+        per-group state. The result Dataset will represent the flattened record returned by the
+        function.
+
+        For a streaming Dataset, the function will be invoked for each group repeatedly in every
+        trigger, and updates to each group's state will be saved across invocations. The function
+        will also be invoked repeatedly for each timed-out state. The order of invocation is
+        input data first, then state timeouts; when the function is invoked for a state timeout,
+        no data will be presented.
+
+        The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
+        return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+        of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+        :class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+        For each group, all columns are passed together as a `pandas.DataFrame` to the user
+        function, and the returned `pandas.DataFrame` instances across all invocations are
+        combined as a :class:`DataFrame`. Note that the user function should loop through and
+        process all elements in the iterator; it should not guess the number of elements in
+        the iterator.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of all
+        elements in the returned value, `pandas.DataFrame`. The column labels of all returned
+        `pandas.DataFrame` instances must either match the field names in the defined schema if
+        specified as strings, or match the field data types by position if not strings, e.g.
+        integer indices.
+
+        The `stateStructType` should be a :class:`StructType` describing the schema of the
+        user-defined state. The value of the state will be presented as a tuple, and the update
+        should likewise be performed with a tuple. User defined types, e.g. native Python class
+        types are not

Review Comment:
   I think we can just say that "the corresponding Python types for :class:`DataType` are
   supported". This is documented at
   https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click the Python tab).
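
   For what it's worth, here is a minimal sketch of the intended usage, based only
   on the signature and docstring quoted above. The function name `count_fn`, the
   rate-source setup, and the grouping column `id` are hypothetical, and I am
   assuming the state class and timeout constants end up exposed as `GroupState`
   and `GroupStateTimeout` under `pyspark.sql.streaming.state`:

   ```python
   from typing import Any, Iterator, Tuple

   import pandas as pd

   from pyspark.sql import SparkSession
   from pyspark.sql.functions import col
   from pyspark.sql.streaming.state import GroupState, GroupStateTimeout


   def count_fn(
       key: Tuple[Any, ...],
       pdf_iter: Iterator[pd.DataFrame],
       state: GroupState,
   ) -> Iterator[pd.DataFrame]:
       # The state value is a plain tuple matching `stateStructType`.
       count = state.get[0] if state.exists else 0
       # Loop through all elements; on a timeout invocation the iterator is empty.
       for pdf in pdf_iter:
           count += len(pdf)
       state.update((count,))
       # Each yielded pandas.DataFrame must conform to `outputStructType`.
       yield pd.DataFrame({"id": [key[0]], "count": [count]})


   spark = SparkSession.builder.getOrCreate()
   # A toy streaming source; `id` is derived just to have a grouping key.
   sdf = spark.readStream.format("rate").load().withColumn("id", col("value") % 10)

   result = sdf.groupBy("id").applyInPandasWithState(
       count_fn,
       outputStructType="id long, count long",
       stateStructType="count long",
       outputMode="Update",
       timeoutConf=GroupStateTimeout.NoTimeout,
   )
   ```

   Note how both the value passed to `state.update` and the value read back via
   `state.get` are plain tuples whose elements follow the DataType-to-Python
   mapping linked above.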



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

