Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10224#discussion_r47173727
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---
    @@ -430,42 +430,37 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     
       /**
        * :: Experimental ::
    -   * Return a new [[JavaDStream]] of data generated by combining the key-value data in `this` stream
    -   * with a continuously updated per-key state. The user-provided state tracking function is
    -   * applied on each keyed data item along with its corresponding state. The function can choose to
    -   * update/remove the state and return a transformed data, which forms the
    -   * [[JavaTrackStateDStream]].
    +   * Return a [[JavaMapWithStateDStream]] by applying a function to every key-value element of
    +   * `this` stream, while maintaining some state data for each unique key. The mapping function
    +   * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
    +   * transformation can be specified using [[StateSpec]] class. The state data is accessible in
    +   * as a parameter of type [[State]] in the mapping function.
        *
    -   * The specifications of this transformation is made through the
    -   * [[org.apache.spark.streaming.StateSpec StateSpec]] class. Besides the tracking function, there
    -   * are a number of optional parameters - initial state data, number of partitions, timeouts, etc.
    -   * See the [[org.apache.spark.streaming.StateSpec StateSpec]] for more details.
    -   *
    -   * Example of using `trackStateByKey`:
    +   * Example of using `mapWithState`:
        * {{{
    -   *   // A tracking function that maintains an integer state and return a String
    -   *   Function2<Optional<Integer>, State<Integer>, Optional<String>> trackStateFunc =
    -   *       new Function2<Optional<Integer>, State<Integer>, Optional<String>>() {
    -   *
    -   *         @Override
    -   *         public Optional<String> call(Optional<Integer> one, State<Integer> state) {
    -   *           // Check if state exists, accordingly update/remove state and return transformed data
    -   *         }
    +   *   // A mapping function that maintains an integer state and return a string
    +   *   Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
    +   *       new Function3<String, Optional<Integer>, State<Integer>, String>() {
    +   *           @Override
    +   *           public Optional<String> call(Optional<Integer> value, State<Integer> state) {
    +   *               // Use state.exists(), state.get(), state.update() and state.remove()
    +   *               // to manage state, and return the necessary string
    +   *           }
        *       };
        *
    -   *    JavaTrackStateDStream<Integer, Integer, Integer, String> trackStateDStream =
    -   *        keyValueDStream.<Integer, String>trackStateByKey(
    -   *                 StateSpec.function(trackStateFunc).numPartitions(10));
    +   *    JavaMapWithStateDStream<Integer, Integer, Integer, String> mapWithStateDStream =
    --- End diff --
    
    nit: the key type is `String` now.
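To make the per-key semantics described in the new doc comment concrete, here is a minimal plain-Java sketch that models `mapWithState` locally without Spark. `SimpleState`, `MappingFunction`, `processBatch` and `RUNNING_COUNT` are hypothetical names, not Spark APIs. `java.util.Optional` stands in for the `Optional` type used by the Java API. Partitioning, timeouts, initial state and checkpointing are not modeled. The mapping function takes the key as its first argument, mirroring the `Function3<String, Optional<Integer>, State<Integer>, String>` shape in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical local model of mapWithState; NOT the Spark API.
class MapWithStateSketch {

    /** Hypothetical stand-in for Spark's State<S>: holds at most one value per key. */
    public static final class SimpleState<S> {
        private S value;
        private boolean defined;

        public boolean exists() { return defined; }

        public S get() {
            if (!defined) throw new IllegalStateException("state not defined");
            return value;
        }

        public void update(S newValue) { value = newValue; defined = true; }

        public void remove() { value = null; defined = false; }
    }

    /** Same argument shape as Function3<K, Optional<V>, State<S>, R>: key first. */
    @FunctionalInterface
    public interface MappingFunction<K, V, S, R> {
        R call(K key, Optional<V> value, SimpleState<S> state);
    }

    /**
     * Applies the mapping function to every (key, value) pair of one batch, in order,
     * reading and writing the per-key state kept in `states` across batches.
     */
    public static <K, V, S, R> List<R> processBatch(
            List<Map.Entry<K, V>> batch,
            Map<K, SimpleState<S>> states,
            MappingFunction<K, V, S, R> fn) {
        List<R> mapped = new ArrayList<>();
        for (Map.Entry<K, V> record : batch) {
            SimpleState<S> state =
                    states.computeIfAbsent(record.getKey(), k -> new SimpleState<>());
            mapped.add(fn.call(record.getKey(), Optional.ofNullable(record.getValue()), state));
            if (!state.exists()) {
                states.remove(record.getKey()); // removed or never-set state is dropped
            }
        }
        return mapped;
    }

    /** Example mapping function: running count per word, emitted as "word:count". */
    public static final MappingFunction<String, Integer, Integer, String> RUNNING_COUNT =
            (word, one, state) -> {
                int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                state.update(sum);
                return word + ":" + sum;
            };
}
```

Feeding the batch `("a", 1), ("b", 1), ("a", 1)` and then `("b", 1)` through `RUNNING_COUNT` with a shared `states` map yields `a:1, b:1, a:2` and then `b:2`. The state for a key outlives the batch that created it. Because the key is part of the function's input, the first type parameter of the resulting stream follows the key type of the input pairs.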
