WweiL commented on code in PR #41558:
URL: https://github.com/apache/spark/pull/41558#discussion_r1227307156


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -460,6 +461,159 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] 
() extends Serializable
     cogroupSorted(other)(thisSortExprs: _*)(otherSortExprs: _*)(
       UdfUtils.coGroupFunctionToScalaFunc(f))(encoder)
   }
+
+  /**
+   * (Scala-specific) Applies the given function to each group of data, while 
maintaining a
+   * user-defined per-group state. The result Dataset will represent the 
objects returned by the
+   * function. For a static batch Dataset, the function will be invoked once 
per group. For a
+   * streaming Dataset, the function will be invoked for each group repeatedly 
in every trigger,
+   * and updates to each group's state will be saved across invocations. See
+   * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+   *
+   * @tparam S
+   *   The type of the user-defined state. Must be encodable to Spark SQL 
types.
+   * @tparam U
+   *   The type of the output objects. Must be encodable to Spark SQL types.
+   * @param func
+   *   Function to be called on every group.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   * @since 3.5.0
+   */
+  def mapGroupsWithState[S: Encoder, U: Encoder](
+      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+    mapGroupsWithState(GroupStateTimeout.NoTimeout)(func)
+  }
+
+  /**
+   * (Scala-specific) Applies the given function to each group of data, while 
maintaining a
+   * user-defined per-group state. The result Dataset will represent the 
objects returned by the
+   * function. For a static batch Dataset, the function will be invoked once 
per group. For a
+   * streaming Dataset, the function will be invoked for each group repeatedly 
in every trigger,
+   * and updates to each group's state will be saved across invocations. See
+   * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+   *
+   * @tparam S
+   *   The type of the user-defined state. Must be encodable to Spark SQL 
types.
+   * @tparam U
+   *   The type of the output objects. Must be encodable to Spark SQL types.
+   * @param func
+   *   Function to be called on every group.
+   * @param timeoutConf
+   *   Timeout configuration for groups that do not receive data for a while.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   * @since 3.5.0
+   */
+  def mapGroupsWithState[S: Encoder, U: Encoder](timeoutConf: 
GroupStateTimeout)(
+    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {

Review Comment:
   nvm see your comments here 
https://github.com/apache/spark/pull/40959#discussion_r1206059114



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to