HyukjinKwon opened a new pull request, #37285:
URL: https://github.com/apache/spark/pull/37285
### What changes were proposed in this pull request?
This PR adds `DataFrame.groupby(...).applyInPandasWithState(...)` to PySpark,
the Python counterpart of `Dataset.groupByKey(...).flatMapGroupsWithState(...)`.
TBD
Note that documentation will be added in a separate PR because this PR is
already large.
### Why are the changes needed?
TBD
### Does this PR introduce _any_ user-facing change?
Yes, this PR adds a new API
`DataFrame.groupby(...).applyInPandasWithState(...)` in PySpark.
```python
import typing

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql.streaming.state import GroupStateTimeout, GroupStateImpl

spark = SparkSession.builder.getOrCreate()

output_type = StructType([
    StructField("key", LongType()),
    StructField("countAsString", StringType())])
state_type = StructType([StructField("count", LongType())])

# Type hints are optional in `func`.
def func(key: typing.Tuple, pdf: pd.DataFrame, state: GroupStateImpl) -> pd.DataFrame:
    # `getOption` is None when no state exists yet for this key;
    # otherwise it holds the previously stored tuple, e.g. (count,).
    count = state.getOption
    if count is None:
        count = 0
    else:
        count = count[0]
    count += len(pdf)
    # Persist the new count so the next micro-batch for this key sees it.
    state.update((count,))
    return pd.DataFrame({'key': [key[0]], 'countAsString': [str(count)]})

# The rate source emits an increasing `value`; `value % 3` yields three keys.
df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .selectExpr("value % 3 as v")
)
df.groupBy(df["v"]).applyInPandasWithState(
    func, output_type, state_type, "Update", GroupStateTimeout.NoTimeout
).writeStream.format("console").queryName("test").start()
```
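To make the per-key state flow of this example concrete, the following sketch replays the same counting logic in plain Python, without Spark or pandas. It assumes each micro-batch contains exactly ten consecutive `value`s from the rate source (real batch boundaries depend on trigger timing), that `func` is called once per key that has rows in the batch, and that whatever was passed to `update` is returned by `getOption` in later batches. `ToyGroupState`, `run_micro_batches` and `count_func` are hypothetical names; timeouts, state removal and output modes are not modeled, and this is not how Spark implements the operator.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Key = Tuple[Any, ...]


class ToyGroupState:
    """Hypothetical per-key state handle (a stand-in, not the PySpark class)."""

    def __init__(self, value: Optional[Tuple[Any, ...]] = None) -> None:
        self._value = value

    @property
    def getOption(self) -> Optional[Tuple[Any, ...]]:
        # None when the key has no stored state yet, like in the example above.
        return self._value

    def update(self, new_value: Tuple[Any, ...]) -> None:
        self._value = new_value


def run_micro_batches(
    batches: Iterable[List[Tuple[Key, Any]]],
    func: Callable[[Key, List[Any], ToyGroupState], List[Any]],
) -> List[List[Any]]:
    """Group each batch by key, call `func` per key, and carry state forward."""
    store: Dict[Key, Tuple[Any, ...]] = {}  # hypothetical in-memory state store
    outputs: List[List[Any]] = []
    for batch in batches:
        groups: Dict[Key, List[Any]] = defaultdict(list)
        for key, row in batch:
            groups[key].append(row)
        batch_out: List[Any] = []
        for key, rows in groups.items():
            state = ToyGroupState(store.get(key))
            batch_out.extend(func(key, rows, state))
            if state.getOption is not None:
                store[key] = state.getOption  # visible to the next micro-batch
        outputs.append(batch_out)
    return outputs


def count_func(key: Key, rows: List[Any], state: ToyGroupState) -> List[Any]:
    # Same logic as `func` above, with lists instead of pandas DataFrames.
    count = state.getOption
    count = 0 if count is None else count[0]
    count += len(rows)
    state.update((count,))
    return [(key[0], str(count))]


# Two assumed micro-batches: values 0..9, then 10..19, keyed by value % 3.
batches = [
    [((v % 3,), v) for v in range(0, 10)],
    [((v % 3,), v) for v in range(10, 20)],
]
result = run_micro_batches(batches, count_func)
print(result[0])  # [(0, '4'), (1, '3'), (2, '3')]
print(result[1])  # [(1, '7'), (2, '6'), (0, '7')]
```

In this model a key produces output only in batches where it has rows, and the emitted count is cumulative across batches because it is read back from state rather than recomputed from the current batch alone.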
### How was this patch tested?
Manually tested. Unit tests and end-to-end tests were added on both the Python
and Scala sides.
--
This is an automated message from the Apache Git Service.