HyukjinKwon opened a new pull request, #37285:
URL: https://github.com/apache/spark/pull/37285

   ### What changes were proposed in this pull request?
   
   This PR adds `DataFrame.groupby(...).applyInPandasWithState(...)` to PySpark, the Python counterpart of `Dataset.groupByKey(...).flatMapGroupsWithState(...)`.
   
   TBD
   
   Documentation will be added in a separate PR because this PR is already large.
   
   ### Why are the changes needed?
   
   TBD
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this PR adds a new API, `DataFrame.groupby(...).applyInPandasWithState(...)`, to PySpark.
   
   ```python
   import typing

   import pandas as pd

   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, LongType, StringType
   from pyspark.sql.streaming.state import GroupStateTimeout, GroupStateImpl

   # In the PySpark shell `spark` already exists; getOrCreate() reuses it.
   spark = SparkSession.builder.getOrCreate()

   # Schema of the rows returned by `func`.
   output_type = StructType([
       StructField("key", LongType()),
       StructField("countAsString", StringType())])
   # Schema of the per-key state: a running count.
   state_type = StructType([StructField("count", LongType())])

   # Type hints are optional in `func`.
   def func(key: typing.Tuple, pdf: pd.DataFrame, state: GroupStateImpl) -> pd.DataFrame:
       # `getOption` gives the stored state for this key, or None if there is none yet.
       count = state.getOption
       if count is None:
           count = 0
       else:
           count = count[0]
       # Add the rows seen for this key in the current micro-batch and persist the total.
       count += len(pdf)
       state.update((count,))
       return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})


   df = (
       spark.readStream.format("rate")
       .option("rowsPerSecond", 10)
       .load()
       .selectExpr("value % 3 as v")
   )
   df.groupBy(df["v"]).applyInPandasWithState(
       func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
   ).writeStream.format("console").queryName("test").start()
   ```
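
The following is a minimal pure-pandas sketch that shows how the state in `func` evolves across micro-batches without starting a streaming query. `FakeGroupState` and `run_batch` are hypothetical helpers written only for this illustration. They are not PySpark APIs. The fake object mimics only the `getOption` property and the `update` method that `func` uses, and a plain dict stands in for Spark's per-key state store. The sketch does not model timeouts, state serialization, or output modes.

```python
import typing

import pandas as pd


class FakeGroupState:
    """Hypothetical stand-in for the per-key state object (not a PySpark class).

    It mimics only what `func` uses: a `getOption` property returning the
    stored tuple (or None) and an `update` method that replaces it.
    """

    def __init__(self) -> None:
        self._value: typing.Optional[tuple] = None

    @property
    def getOption(self) -> typing.Optional[tuple]:
        return self._value

    def update(self, value: tuple) -> None:
        self._value = value


def func(key: typing.Tuple, pdf: pd.DataFrame, state) -> pd.DataFrame:
    # Same counting logic as the PR example: read the previous count (if any),
    # add the number of rows for this key in the batch, and store it back.
    count = state.getOption
    count = 0 if count is None else count[0]
    count += len(pdf)
    state.update((count,))
    return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})


# Hypothetical state store: one FakeGroupState per grouping key, kept across batches.
states: typing.Dict[tuple, FakeGroupState] = {}


def run_batch(values: typing.List[int]) -> pd.DataFrame:
    """Group one simulated micro-batch by `v` and call `func` once per key."""
    batch = pd.DataFrame({'v': values})
    outputs = []
    for v, pdf in batch.groupby('v'):
        key = (v,)
        state = states.setdefault(key, FakeGroupState())
        outputs.append(func(key, pdf, state))
    return pd.concat(outputs, ignore_index=True)


first = run_batch([0, 1, 2, 0])  # key 0 appears twice in this batch
second = run_batch([0, 2])       # counts keep growing across batches
print(second)
```

After the two simulated batches, key `0` has been seen three times, so its `countAsString` is `"3"`. In a real query, Spark itself keeps and restores each key's state between micro-batches instead of the `states` dict.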
   
   ### How was this patch tested?
   
   Tested manually; unit tests and end-to-end tests were added on both the Python and Scala sides.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.