jingz-db commented on code in PR #49488:
URL: https://github.com/apache/spark/pull/49488#discussion_r1924487703


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -140,28 +144,50 @@ class KeyValueGroupedDataset[K, V] private[sql] () 
extends api.KeyValueGroupedDa
       statefulProcessor: StatefulProcessor[K, V, U],
       timeMode: TimeMode,
       outputMode: OutputMode): Dataset[U] =
-    unsupported()
+    transformWithStateHelper(statefulProcessor, timeMode, outputMode)
 
   /** @inheritdoc */
   private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       timeMode: TimeMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S]): Dataset[U] =
-    unsupported()
+    transformWithStateHelper(
+      statefulProcessor.asInstanceOf[StatefulProcessor[K, V, U]],
+      timeMode,
+      outputMode,
+      Some(initialState))
 
   /** @inheritdoc */
   override private[sql] def transformWithState[U: Encoder](
       statefulProcessor: StatefulProcessor[K, V, U],
       eventTimeColumnName: String,
-      outputMode: OutputMode): Dataset[U] = unsupported()
+      outputMode: OutputMode): Dataset[U] =
+    transformWithStateHelper(
+      statefulProcessor,
+      TimeMode.EventTime(),
+      outputMode,
+      eventTimeColumnName = eventTimeColumnName)
 
   /** @inheritdoc */
   override private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       eventTimeColumnName: String,
       outputMode: OutputMode,
-      initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = unsupported()
+      initialState: KeyValueGroupedDataset[K, S]): Dataset[U] =
+    transformWithStateHelper(
+      statefulProcessor,
+      TimeMode.EventTime(),
+      outputMode,
+      Some(initialState),
+      eventTimeColumnName)
+
+  private[sql] def transformWithStateHelper[U: Encoder, S: Encoder](
+      statefulProcessor: StatefulProcessor[K, V, U],
+      timeMode: TimeMode,
+      outputMode: OutputMode,
+      initialState: Option[KeyValueGroupedDataset[K, S]] = None,
+      eventTimeColumnName: String = ""): Dataset[U] = unsupported()

Review Comment:
   I am following same behavior of FMGWS here: 
https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L86
   The `unsupported()` is the interface and the real implementation is in the 
following class, see comments 
[here](https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L378):
   ```
   /**
    * This class is the implementation of class [[KeyValueGroupedDataset]]. 
This class memorizes the
    * initial types of the grouping function so that the original function will 
be sent to the server
    * to perform the grouping first. Then any type modifications on the keys 
and the values will be
    * applied sequentially to ensure the final type of the result remains the 
same as how
    * [[KeyValueGroupedDataset]] behaves on the server.
    */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to