bogao007 commented on code in PR #41558:
URL: https://github.com/apache/spark/pull/41558#discussion_r1230203950
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -460,6 +461,159 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
cogroupSorted(other)(thisSortExprs: _*)(otherSortExprs: _*)(
UdfUtils.coGroupFunctionToScalaFunc(f))(encoder)
}
+
+ /**
+ * (Scala-specific) Applies the given function to each group of data, while maintaining a
+ * user-defined per-group state. The result Dataset will represent the objects returned by the
+ * function. For a static batch Dataset, the function will be invoked once per group. For a
+ * streaming Dataset, the function will be invoked for each group repeatedly in every trigger,
+ * and updates to each group's state will be saved across invocations. See
+ * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+ *
+ * @tparam S
+ * The type of the user-defined state. Must be encodable to Spark SQL types.
+ * @tparam U
+ * The type of the output objects. Must be encodable to Spark SQL types.
+ * @param func
+ * Function to be called on every group.
+ *
+ * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+ * @since 3.5.0
+ */
+ def mapGroupsWithState[S: Encoder, U: Encoder](
+ func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+ mapGroupsWithState(GroupStateTimeout.NoTimeout)(func)
+ }
+
+ /**
+ * (Scala-specific) Applies the given function to each group of data, while maintaining a
+ * user-defined per-group state. The result Dataset will represent the objects returned by the
+ * function. For a static batch Dataset, the function will be invoked once per group. For a
+ * streaming Dataset, the function will be invoked for each group repeatedly in every trigger,
+ * and updates to each group's state will be saved across invocations. See
+ * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+ *
+ * @tparam S
+ * The type of the user-defined state. Must be encodable to Spark SQL types.
+ * @tparam U
+ * The type of the output objects. Must be encodable to Spark SQL types.
+ * @param func
+ * Function to be called on every group.
+ * @param timeoutConf
+ * Timeout configuration for groups that do not receive data for a while.
+ *
+ * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+ * @since 3.5.0
+ */
+ def mapGroupsWithState[S: Encoder, U: Encoder](timeoutConf: GroupStateTimeout)(
+ func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
Review Comment:
Updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]