Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16758#discussion_r98776628
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
    @@ -219,6 +219,160 @@ class KeyValueGroupedDataset[K, V] private[sql](
       }
     
       /**
    +   * ::Experimental::
    +   * (Scala-specific)
    +   * Applies the given function to each group of data, while using an additional keyed state.
    +   * For each unique group, the function will be passed the group key and an iterator that contains
    +   * all of the elements in the group. The function can return an object of arbitrary type, and
    +   * optionally update or remove the corresponding state. The returned object will form a new
    +   * [[Dataset]].
    +   *
    +   * This function can be applied on both batch and streaming Datasets. With a streaming dataset,
    +   * this function will be once for each in every trigger. For each key, the updated state from the
    +   * function call in a trigger will be the state available in the function call in the next
    +   * trigger. However, for batch, `mapGroupsWithState` behaves exactly as `mapGroups` and the
    +   * function is called only once per key without any prior state.
    +   *
    +   * There is no guaranteed ordering of values in the iterator in the function.
    +   *
    +   * This function does not support partial aggregation, and as a result requires shuffling all
    +   * the data in the [[Dataset]].
    +   *
    +   * Internally, the implementation will spill to disk if any given group is too large to fit into
    +   * memory.  However, users must take care to avoid materializing the whole iterator for a group
    +   * (for example, by calling `toList`) unless they are sure that this is possible given the memory
    +   * constraints of their cluster.
    +   *
    +   * @see [[State]] for more details of how to update/remove state in the function.
    +   * @since 2.1.1
    +   */
    +  @Experimental
    +  @InterfaceStability.Evolving
    +  def mapGroupsWithState[STATE: Encoder, OUT: Encoder](
    --- End diff --
    
    I think it would be useful to come up with an example use case since this is pretty complicated.
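
    One possible use case is a running per-user event count that survives across triggers. The sketch below is a plain-Scala simulation with no Spark dependency, so it only illustrates the semantics the Scaladoc describes; it is not the API in this pull request. `KeyedStateSim`, `countEvents`, and `runTrigger` are hypothetical names, and the methods on `KeyedStateSim` are assumptions standing in for whatever `State` actually exposes.

```scala
// Plain-Scala simulation of keyed state carried across triggers.
// All names here are hypothetical; none of this is Spark's API.
object RunningCountSketch {

  // Stand-in for a per-key state handle: an optional value that the
  // user function may read, update, or remove.
  final class KeyedStateSim[S](private var value: Option[S]) {
    def getOption: Option[S] = value
    def update(s: S): Unit = { value = Some(s) }
    def remove(): Unit = { value = None }
  }

  // User function in the shape the Scaladoc describes:
  // (group key, iterator of values, state) => output object.
  // It walks the iterator once and never materializes the whole group.
  def countEvents(
      user: String,
      events: Iterator[String],
      state: KeyedStateSim[Long]): (String, Long) = {
    val seenNow = events.foldLeft(0L)((n, _) => n + 1)
    val total = state.getOption.getOrElse(0L) + seenNow
    state.update(total)
    (user, total)
  }

  // Simulates one trigger: group the batch by key, call the function once
  // per key with that key's prior state, and return the outputs together
  // with the state that the next trigger will see.
  def runTrigger(
      batch: Seq[(String, String)],
      prior: Map[String, Long]): (Seq[(String, Long)], Map[String, Long]) = {
    val results = batch.groupBy(_._1).toSeq.map { case (user, rows) =>
      val state = new KeyedStateSim[Long](prior.get(user))
      val out = countEvents(user, rows.iterator.map(_._2), state)
      (out, user -> state.getOption)
    }
    // Keys absent from this batch keep their old state;
    // keys whose state was removed drop out of the map.
    val next = results.foldLeft(prior) {
      case (acc, (_, (user, Some(s)))) => acc.updated(user, s)
      case (acc, (_, (user, None)))    => acc - user
    }
    (results.map(_._1).sortBy(_._1), next)
  }

  def main(args: Array[String]): Unit = {
    val first = Seq("alice" -> "click", "bob" -> "view", "alice" -> "view")
    val (out1, state1) = runTrigger(first, Map.empty)
    val (out2, _) = runTrigger(Seq("alice" -> "click"), state1)
    println(out1) // alice has 2 events, bob has 1
    println(out2) // alice continues from the first trigger and reaches 3
  }
}
```

    Calling `runTrigger` once with an empty prior map corresponds to the batch case, where the function runs once per key without any prior state.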

