zhenlineo commented on code in PR #41558:
URL: https://github.com/apache/spark/pull/41558#discussion_r1231388771
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -572,6 +916,55 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
agg(aggregator)
}
+ override protected def flatMapGroupsWithStateHelper[S: Encoder, U: Encoder](
+ outputMode: OutputMode,
+ timeoutConf: GroupStateTimeout,
+ initialState: KeyValueGroupedDataset[K, S],
+ isMapGroupWithState: Boolean)(
+ func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {
+ if (outputMode != null && outputMode != OutputMode.Append && outputMode !=
OutputMode.Update) {
Review Comment:
nit: How about using `Option` for the nullable fields?
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala:
##########
@@ -95,6 +96,31 @@ private[sql] object UdfUtils extends Serializable {
}
}
+ def mapValuesAdaptor[K, V, S, U, IV](
+ f: (K, Iterator[V], GroupState[S]) => Iterator[U],
+ valueMapFunc: IV => V): (K, Iterator[IV], GroupState[S]) => Iterator[U]
= {
+ print("Used function: mapValuesAdaptor")
Review Comment:
Is this leftover debugging code? If so, please remove the `print` call.
##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -857,6 +857,21 @@ message GroupMap {
// (Optional) Expressions for sorting. Only used by Scala Sorted Group Map
API.
repeated Expression sorting_expressions = 4;
+
+ // (Optional) Input relation for initial State.
+ Relation initial_input = 5;
+
+ // (Optional) Expressions for grouping keys of the initial state input
relation.
+ repeated Expression initial_grouping_expressions = 6;
+
+ // (Optional) True if MapGroupsWithState, false if FlatMapGroupsWithState.
+ optional bool is_map_groups_with_state = 7;
+
+ // (Optional) The output mode of the function.
+ optional string output_mode = 8;
+
+ // (Optional) Timeout configuration for groups that do not receive data for
a while.
+ optional string timeout_conf = 9;
Review Comment:
cc @grundprinzip @xinrong-meng for the proto change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]