jingz-db commented on code in PR #48005:
URL: https://github.com/apache/spark/pull/48005#discussion_r1828424360
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -551,25 +550,103 @@ def transformWithStateUDF(
            # TODO(SPARK-49603) set the handle state in the lazily initialized iterator
            result = itertools.chain(*result_iter_list)
+            return result
+
+        def transformWithStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+        ) -> Iterator["PandasDataFrameLike"]:
+            handle = StatefulProcessorHandle(statefulProcessorApiClient)
+
+            if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:
+                statefulProcessor.init(handle)
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.INITIALIZED
+                )
+
+            result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows)
+            return result
+
+        def transformWithStateWithInitStateUDF(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            inputRows: Iterator["PandasDataFrameLike"],
+            initialStates: Optional[Iterator["PandasDataFrameLike"]] = None,
+        ) -> Iterator["PandasDataFrameLike"]:
+ """
+ UDF for TWS operator with non-empty initial states. Possible input
combinations
+ of inputRows and initialStates iterator:
+ - Both `inputRows` and `initialStates` are non-empty: for the
given key, both input rows
+ and initial states contains the grouping key, both input rows
and initial states contains data.
+ - `InitialStates` is non-empty, while `initialStates` is empty.
For the given key, only
+ initial states contains the grouping key and data, and it is
first batch.
+ - `initialStates` is empty, while `inputRows` is not empty. For
the given grouping key, only inputRows
+ contains the grouping key and data, and it is first batch.
+ - `initialStates` is None, while `inputRows` is not empty. This is
not first batch. `initialStates`
+ is initialized to the positional value as None.
+ """
+            handle = StatefulProcessorHandle(statefulProcessorApiClient)
+
+            if statefulProcessorApiClient.handle_state == StatefulProcessorHandleState.CREATED:
+                statefulProcessor.init(handle)
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.INITIALIZED
+                )
+
+            # only process initial state if first batch
+            is_first_batch = statefulProcessorApiClient.is_first_batch()
+            if is_first_batch and initialStates is not None:
+                for cur_initial_state in initialStates:
+                    statefulProcessorApiClient.set_implicit_key(key)
+                    # TODO(SPARK-50194) integration with new timer API & initial state timer register
+                    statefulProcessor.handleInitialState(key, cur_initial_state)
+
+            # if we don't have input rows for the given key but only have initial state
+            # for the grouping key, the inputRows iterator could be empty
+            input_rows_empty = False
+            try:
+                first = next(inputRows)
+            except StopIteration:
+                input_rows_empty = True
+            else:
+                inputRows = itertools.chain([first], inputRows)
+
+            if not input_rows_empty:
Review Comment:
You are also right about this; that is the scenario covered in this PR: https://github.com/apache/spark/pull/45780. I was planning to finish the integration together with the new timer API that Anish merged last week. I also left a TODO [a few lines above](https://github.com/apache/spark/pull/48005/files#diff-5862151bb5e9fe7a6b2d1978301c235504dcc6c1bbbd1f9745a204a3ba93146eR602). If that's OK with you, I will finish this portion in SPARK-50194.
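
A side note for readers of this thread: the `input_rows_empty` check in the diff is the standard peek-and-restore idiom for Python iterators, since an iterator cannot be tested for emptiness without consuming an element. A minimal standalone sketch of the pattern (the `peek_empty` helper name is illustrative, not part of the PR):

```python
import itertools
from typing import Iterator, Tuple, TypeVar

T = TypeVar("T")

def peek_empty(it: Iterator[T]) -> Tuple[bool, Iterator[T]]:
    """Report whether `it` is empty; return an equivalent iterator either way."""
    try:
        first = next(it)           # consume one element to test emptiness
    except StopIteration:
        return True, iter(())      # nothing was consumed; iterator was empty
    # stitch the peeked element back in front of the remaining stream
    return False, itertools.chain([first], it)

empty, rows = peek_empty(iter([1, 2, 3]))
assert not empty and list(rows) == [1, 2, 3]

empty, rows = peek_empty(iter([]))
assert empty and list(rows) == []
```

This also shows why the UDF distinguishes `initialStates is None` (not the first batch; the parameter kept its default) from a present-but-exhausted iterator (first batch with no initial state for this key): absence and emptiness are different signals here.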