[2/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState 
and change tracking function signature

SPARK-12244:

Based on feedback from early users and personal experience attempting to 
explain it, the name trackStateByKey had two problem.
"trackState" is a completely new term which really does not give any intuition 
on what the operation is
the resultant data stream of objects returned by the function is called in docs 
as the "emitted" data for the lack of a better.
"mapWithState" makes sense because the API is like a mapping function like 
(Key, Value) => T with State as an additional parameter. The resultant data 
stream is "mapped data". So both problems are solved.

SPARK-12245:

>From initial experiences, not having the key in the function makes it hard to 
>return mapped stuff, as the whole information of the records is not there. 
>Basically the user is restricted to doing something like mapValue() instead of 
>map(). So adding the key as a parameter.

Author: Tathagata Das 

Closes #10224 from tdas/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d86617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d86617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d86617

Branch: refs/heads/branch-1.6
Commit: f6d8661738b5a4b139c4800d5c4e9f0094068451
Parents: 699f497
Author: Tathagata Das 
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 9 20:59:21 2015 -0800

--
 .../streaming/JavaStatefulNetworkWordCount.java |  16 +-
 .../streaming/StatefulNetworkWordCount.scala|  12 +-
 .../apache/spark/streaming/Java8APISuite.java   |  18 +-
 .../org/apache/spark/streaming/State.scala  |  20 +-
 .../org/apache/spark/streaming/StateSpec.scala  | 160 ++---
 .../api/java/JavaMapWithStateDStream.scala  |  44 ++
 .../streaming/api/java/JavaPairDStream.scala|  50 +-
 .../api/java/JavaTrackStateDStream.scala|  44 --
 .../streaming/dstream/MapWithStateDStream.scala | 170 ++
 .../dstream/PairDStreamFunctions.scala  |  41 +-
 .../streaming/dstream/TrackStateDStream.scala   | 171 --
 .../spark/streaming/rdd/MapWithStateRDD.scala   | 223 +++
 .../spark/streaming/rdd/TrackStateRDD.scala | 228 
 .../spark/streaming/JavaMapWithStateSuite.java  | 210 +++
 .../streaming/JavaTrackStateByKeySuite.java | 210 ---
 .../spark/streaming/MapWithStateSuite.scala | 581 +++
 .../spark/streaming/TrackStateByKeySuite.scala  | 581 ---
 .../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
 .../streaming/rdd/TrackStateRDDSuite.scala  | 389 -
 19 files changed, 1782 insertions(+), 1775 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d86617/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));
 ssc.checkpoint(".");
 
-// Initial RDD input to trackStateByKey
+// Initial state RDD input to mapWithState
 @SuppressWarnings("unchecked")
 List> tuples = Arrays.asList(new Tuple2("hello", 1),
 new Tuple2("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
 });
 
 // Update the cumulative count function
-final Function4>> trackStateFunc =
-new Function4>>() {
+final Function3> mappingFunc =
+new Function3>() {
 
   @Override
-  public Optional> call(Time time, String 
word, Optional one, State state) {
+  public Tuple2 call(String word, Optional 
one, State state) {
 int sum = one.or(0) + (state.exists() ? state.get() : 0);
 Tuple2 output = new Tuple2(word, 

[2/2] spark git commit: [SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState and change tracking function signature

2015-12-09 Thread zsxwing
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState 
and change tracking function signature

SPARK-12244:

Based on feedback from early users and personal experience attempting to 
explain it, the name trackStateByKey had two problem.
"trackState" is a completely new term which really does not give any intuition 
on what the operation is
the resultant data stream of objects returned by the function is called in docs 
as the "emitted" data for the lack of a better.
"mapWithState" makes sense because the API is like a mapping function like 
(Key, Value) => T with State as an additional parameter. The resultant data 
stream is "mapped data". So both problems are solved.

SPARK-12245:

>From initial experiences, not having the key in the function makes it hard to 
>return mapped stuff, as the whole information of the records is not there. 
>Basically the user is restricted to doing something like mapValue() instead of 
>map(). So adding the key as a parameter.

Author: Tathagata Das 

Closes #10224 from tdas/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2cd4f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2cd4f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2cd4f5

Branch: refs/heads/master
Commit: bd2cd4f53d1ca10f4896bd39b0e180d4929867a2
Parents: 2166c2a
Author: Tathagata Das 
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu 
Committed: Wed Dec 9 20:47:15 2015 -0800

--
 .../streaming/JavaStatefulNetworkWordCount.java |  16 +-
 .../streaming/StatefulNetworkWordCount.scala|  12 +-
 .../apache/spark/streaming/Java8APISuite.java   |  18 +-
 .../org/apache/spark/streaming/State.scala  |  20 +-
 .../org/apache/spark/streaming/StateSpec.scala  | 160 ++---
 .../api/java/JavaMapWithStateDStream.scala  |  44 ++
 .../streaming/api/java/JavaPairDStream.scala|  50 +-
 .../api/java/JavaTrackStateDStream.scala|  44 --
 .../streaming/dstream/MapWithStateDStream.scala | 170 ++
 .../dstream/PairDStreamFunctions.scala  |  41 +-
 .../streaming/dstream/TrackStateDStream.scala   | 171 --
 .../spark/streaming/rdd/MapWithStateRDD.scala   | 223 +++
 .../spark/streaming/rdd/TrackStateRDD.scala | 228 
 .../spark/streaming/JavaMapWithStateSuite.java  | 210 +++
 .../streaming/JavaTrackStateByKeySuite.java | 210 ---
 .../spark/streaming/MapWithStateSuite.scala | 581 +++
 .../spark/streaming/TrackStateByKeySuite.scala  | 581 ---
 .../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
 .../streaming/rdd/TrackStateRDDSuite.scala  | 389 -
 19 files changed, 1782 insertions(+), 1775 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2cd4f5/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
Durations.seconds(1));
 ssc.checkpoint(".");
 
-// Initial RDD input to trackStateByKey
+// Initial state RDD input to mapWithState
 @SuppressWarnings("unchecked")
 List> tuples = Arrays.asList(new Tuple2("hello", 1),
 new Tuple2("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
 });
 
 // Update the cumulative count function
-final Function4>> trackStateFunc =
-new Function4>>() {
+final Function3> mappingFunc =
+new Function3>() {
 
   @Override
-  public Optional> call(Time time, String 
word, Optional one, State state) {
+  public Tuple2 call(String word, Optional 
one, State state) {
 int sum = one.or(0) + (state.exists() ? state.get() : 0);
 Tuple2 output = new Tuple2(word,