GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/10224#discussion_r47173727
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
---
@@ -430,42 +430,37 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* :: Experimental ::
- * Return a new [[JavaDStream]] of data generated by combining the key-value data in `this` stream
- * with a continuously updated per-key state. The user-provided state tracking function is
- * applied on each keyed data item along with its corresponding state. The function can choose to
- * update/remove the state and return a transformed data, which forms the
- * [[JavaTrackStateDStream]].
+ * Return a [[JavaMapWithStateDStream]] by applying a function to every key-value element of
+ * `this` stream, while maintaining some state data for each unique key. The mapping function
+ * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
+ * transformation can be specified using [[StateSpec]] class. The state data is accessible in
+ * as a parameter of type [[State]] in the mapping function.
*
- * The specifications of this transformation is made through the
- * [[org.apache.spark.streaming.StateSpec StateSpec]] class. Besides the tracking function, there
- * are a number of optional parameters - initial state data, number of partitions, timeouts, etc.
- * See the [[org.apache.spark.streaming.StateSpec StateSpec]] for more details.
- *
- * Example of using `trackStateByKey`:
+ * Example of using `mapWithState`:
* {{{
- * // A tracking function that maintains an integer state and return a String
- * Function2<Optional<Integer>, State<Integer>, Optional<String>> trackStateFunc =
- * new Function2<Optional<Integer>, State<Integer>, Optional<String>>() {
- *
- * @Override
- * public Optional<String> call(Optional<Integer> one, State<Integer> state) {
- * // Check if state exists, accordingly update/remove state and return transformed data
- * }
+ * // A mapping function that maintains an integer state and return a string
+ * Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
+ * new Function3<String, Optional<Integer>, State<Integer>, String>() {
+ * @Override
+ * public Optional<String> call(Optional<Integer> value, State<Integer> state) {
+ * // Use state.exists(), state.get(), state.update() and state.remove()
+ * // to manage state, and return the necessary string
+ * }
* };
*
- * JavaTrackStateDStream<Integer, Integer, Integer, String> trackStateDStream =
- * keyValueDStream.<Integer, String>trackStateByKey(
- * StateSpec.function(trackStateFunc).numPartitions(10));
+ * JavaMapWithStateDStream<Integer, Integer, Integer, String> mapWithStateDStream =
--- End diff --
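nit: the key type is `String` now, so the first type parameter of `JavaMapWithStateDStream` here should be `String` rather than `Integer`.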
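
To make the nit concrete, here is a minimal sketch that uses stand-in types rather than Spark's real classes. `Function3`, `State`, `MapWithStateResult` and the static `mapWithState` helper below are hypothetical simplifications written only for this illustration, and `java.util.Optional` stands in for the `Optional` type named in the Scaladoc. The point it shows is that the first type parameter of the result has to match the key type of the mapping function, which is `String` in the new example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a three-argument function type (not Spark's class).
interface Function3<A, B, C, R> {
    R call(A a, B b, C c);
}

// Hypothetical stand-in for a per-key state holder with exists/get/update/remove.
final class State<S> {
    private S value;
    boolean exists() { return value != null; }
    S get() { return value; }
    void update(S newValue) { value = newValue; }
    void remove() { value = null; }
}

// Hypothetical stand-in for the result stream; parameters are <Key, Value, State, Mapped>.
final class MapWithStateResult<K, V, S, M> {
    final List<M> mapped = new ArrayList<>();
}

final class MapWithStateSketch {
    // Applies the mapping function to each record while keeping one State per key.
    static <K, V, S, M> MapWithStateResult<K, V, S, M> mapWithState(
            List<Map.Entry<K, V>> records,
            Function3<K, Optional<V>, State<S>, M> mappingFunction) {
        Map<K, State<S>> states = new HashMap<>();
        MapWithStateResult<K, V, S, M> result = new MapWithStateResult<>();
        for (Map.Entry<K, V> record : records) {
            State<S> state = states.computeIfAbsent(record.getKey(), k -> new State<>());
            result.mapped.add(
                mappingFunction.call(record.getKey(), Optional.of(record.getValue()), state));
        }
        return result;
    }

    static List<String> demo() {
        // The key type (first parameter) is String, as in the new Scaladoc example.
        Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
            (key, value, state) -> {
                int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
                state.update(sum); // keep a running sum per key
                return key + "=" + sum;
            };

        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1));
        records.add(new AbstractMap.SimpleEntry<>("b", 2));
        records.add(new AbstractMap.SimpleEntry<>("a", 3));

        // First type parameter is String, not Integer. Declaring it as Integer
        // would not compile against a mapping function keyed by String.
        MapWithStateResult<String, Integer, Integer, String> result =
            mapWithState(records, mappingFunction);
        return result.mapped;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch, changing the declared result type to `MapWithStateResult<Integer, Integer, Integer, String>` is rejected by the compiler, which is the same mismatch the Scaladoc example has on paper.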