Repository: spark
Updated Branches:
  refs/heads/branch-1.0 69ec3149f -> c7571d8c6


Fixed streaming examples docs to use run-example instead of spark-submit

Pretty self-explanatory

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #722 from tdas/example-fix and squashes the following commits:

7839979 [Tathagata Das] Minor changes.
0673441 [Tathagata Das] Fixed java docs of java streaming example
e687123 [Tathagata Das] Fixed scala style errors.
9b8d112 [Tathagata Das] Fixed streaming examples docs to use run-example 
instead of spark-submit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7571d8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7571d8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7571d8c

Branch: refs/heads/branch-1.0
Commit: c7571d8c6ba058b67cca2b910fd0efacc06642cd
Parents: 69ec314
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed May 14 04:17:32 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed May 14 04:24:48 2014 -0700

----------------------------------------------------------------------
 .../examples/streaming/JavaCustomReceiver.java  | 13 ++---
 .../examples/streaming/JavaFlumeEventCount.java |  6 +-
 .../examples/streaming/JavaKafkaWordCount.java  |  6 +-
 .../streaming/JavaNetworkWordCount.java         | 13 +++--
 .../examples/streaming/ActorWordCount.scala     |  6 +-
 .../examples/streaming/CustomReceiver.scala     | 19 +++----
 .../examples/streaming/FlumeEventCount.scala    |  9 ++-
 .../examples/streaming/HdfsWordCount.scala      |  5 +-
 .../examples/streaming/KafkaWordCount.scala     |  6 +-
 .../examples/streaming/MQTTWordCount.scala      | 10 ++--
 .../examples/streaming/NetworkWordCount.scala   | 14 +++--
 .../streaming/RecoverableNetworkWordCount.scala |  7 +--
 .../streaming/StatefulNetworkWordCount.scala    |  6 +-
 .../examples/streaming/TwitterPopularTags.scala | 22 +++++++-
 .../examples/streaming/ZeroMQWordCount.scala    |  8 +--
 .../clickstream/PageViewGenerator.scala         | 10 ++--
 .../streaming/clickstream/PageViewStream.scala  |  7 ++-
 .../streaming/twitter/TwitterInputDStream.scala | 58 ++++++++++++--------
 18 files changed, 130 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 7f558f3..5622df5 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.streaming;
 
 import com.google.common.collect.Lists;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
@@ -48,25 +49,23 @@ import java.util.regex.Pattern;
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] 
localhost 9999`
+ *    `$ bin/run-example 
org.apache.spark.examples.streaming.JavaCustomReceiver localhost 9999`
  */
 
 public class JavaCustomReceiver extends Receiver<String> {
   private static final Pattern SPACE = Pattern.compile(" ");
 
   public static void main(String[] args) {
-    if (args.length < 3) {
-      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> 
<port>\n" +
-          "In local mode, <master> should be 'local[n]' with n > 1");
+    if (args.length < 2) {
+      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
       System.exit(1);
     }
 
     StreamingExamples.setStreamingLogLevels();
 
     // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], 
"JavaNetworkWordCount",
-            new Duration(1000), System.getenv("SPARK_HOME"),
-            JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+    SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(1000));
 
     // Create a input stream with the custom receiver on target ip:port and 
count the
     // words in input stream of \n delimited text (eg. generated by 'nc')

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index 400b68c..da56637 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -33,10 +33,12 @@ import org.apache.spark.streaming.flume.SparkFlumeEvent;
  *  Your Flume AvroSink should be pointed to this address.
  *
  *  Usage: JavaFlumeEventCount <host> <port>
- *
  *    <host> is the host the Flume receiver will be started on - a receiver
  *           creates a server and listens for flume events.
  *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *     `$ bin/run-example 
org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
  */
 public final class JavaFlumeEventCount {
   private JavaFlumeEventCount() {
@@ -56,7 +58,7 @@ public final class JavaFlumeEventCount {
     Duration batchInterval = new Duration(2000);
     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
-    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createStream(ssc, "localhost", port);
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createStream(ssc, host, port);
 
     flumeStream.count();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 6a74cc5..16ae9a3 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -40,15 +40,15 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
+ *
  * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
  *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
  *   <group> is the name of kafka consumer group
  *   <topics> is a list of one or more kafka topics to consume from
  *   <numThreads> is the number of threads the kafka consumer should use
  *
- * Example:
- *    `./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.JavaKafkaWordCount 
zoo01,zoo02, \
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount 
zoo01,zoo02, \
  *    zoo03 my-consumer-group topic1,topic2 1`
  */
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index e5cbd39..45bcede 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -24,7 +24,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -41,8 +41,7 @@ import java.util.regex.Pattern;
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.JavaNetworkWordCount 
localhost 9999`
+ *    `$ bin/run-example 
org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
  */
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
@@ -54,13 +53,17 @@ public final class JavaNetworkWordCount {
     }
 
     StreamingExamples.setStreamingLogLevels();
-    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
+
     // Create the context with a 1 second batch size
+    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,  new 
Duration(1000));
 
     // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], 
Integer.parseInt(args[1]));
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+            args[0], Integer.parseInt(args[1]), 
StorageLevels.MEMORY_AND_DISK_SER);
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, 
String>() {
       @Override
       public Iterable<String> call(String x) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index e29e16a..b433082 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -130,11 +130,9 @@ object FeederActor {
  *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is 
running on.
  *
  * To run this example locally, you may run Feeder Actor as
- *    `./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 
127.0.1.1 9999`
  * and then run the example
- *    `./bin/spark-submit examples.jar --class 
org.apache.spark.examples.streaming.ActorWordCount \
- *     127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 
127.0.1.1 9999`
  */
 object ActorWordCount {
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index e317e2d..6bb659f 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.streaming
 import java.io.{InputStreamReader, BufferedReader, InputStream}
 import java.net.Socket
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
@@ -30,32 +30,27 @@ import org.apache.spark.streaming.receiver.Receiver
  * Custom Receiver that receives data over a socket. Received bytes is 
interpreted as
  * text and \n delimited lines are considered as records. They are then 
counted and printed.
  *
- * Usage: CustomReceiver <master> <hostname> <port>
- *   <master> is the Spark master URL. In local mode, <master> should be 
'local[n]' with n > 1.
- *   <hostname> and <port> of the TCP server that Spark Streaming would 
connect to receive data.
- *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] 
localhost 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver 
localhost 9999`
  */
 object CustomReceiver {
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCount <master> <hostname> 
<port>\n" +
-        "In local mode, <master> should be 'local[n]' with n > 1")
+    if (args.length < 2) {
+      System.err.println("Usage: CustomReceiver <hostname> <port>")
       System.exit(1)
     }
 
     StreamingExamples.setStreamingLogLevels()
 
     // Create the context with a 1 second batch size
-    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), 
StreamingContext.jarOfClass(this.getClass).toSeq)
+    val sparkConf = new SparkConf().setAppName("CustomReceiver")
+    val ssc = new StreamingContext(sparkConf, Seconds(1))
 
     // Create a input stream with the custom receiver on target ip:port and 
count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
index 38362ed..20e7df7 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -31,14 +31,16 @@ import org.apache.spark.util.IntParam
  *  Your Flume AvroSink should be pointed to this address.
  *
  *  Usage: FlumeEventCount <host> <port>
- *
  *    <host> is the host the Flume receiver will be started on - a receiver
  *           creates a server and listens for flume events.
  *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount 
<host> <port> `
  */
 object FlumeEventCount {
   def main(args: Array[String]) {
-    if (args.length != 3) {
+    if (args.length < 2) {
       System.err.println(
         "Usage: FlumeEventCount <host> <port>")
       System.exit(1)
@@ -49,8 +51,9 @@ object FlumeEventCount {
     val Array(host, IntParam(port)) = args
 
     val batchInterval = Milliseconds(2000)
-    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+
     // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
     val ssc = new StreamingContext(sparkConf, batchInterval)
 
     // Create a flume stream

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
index 55ac48c..6c24bc3 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -27,8 +27,9 @@ import org.apache.spark.streaming.StreamingContext._
  *   <directory> is the directory that Spark Streaming will use to find and 
read new text files.
  *
  * To run this on your local machine on directory `localdir`, run this example
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.HdfsWordCount localdir`
+ *    $ bin/run-example \
+ *       org.apache.spark.examples.streaming.HdfsWordCount localdir
+ *
  * Then create a text file in `localdir` and the words in the file will get 
counted.
  */
 object HdfsWordCount {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 3af8069..566ba6f 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -35,9 +35,9 @@ import org.apache.spark.SparkConf
  *   <numThreads> is the number of threads the kafka consumer should use
  *
  * Example:
- *    `./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.KafkaWordCount local[2] 
zoo01,zoo02,zoo03 \
- *    my-consumer-group topic1,topic2 1`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
  */
 object KafkaWordCount {
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 3a10daa..e4283e0 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -75,14 +75,14 @@ object MQTTPublisher {
  * Example Java code for Mqtt Publisher and Subscriber can be found here
  * https://bitbucket.org/mkjinesh/mqttclient
  * Usage: MQTTWordCount <MqttbrokerUrl> <topic>
-\ *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
  *
  * To run this example locally, you may run publisher as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.MQTTPublisher 
tcp://localhost:1883 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 
foo`
  * and run the example as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.MQTTWordCount 
tcp://localhost:1883 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 
foo`
  */
 object MQTTWordCount {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
index ad7a199..ae0a08c 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel
 
 /**
- * Counts words in text encoded with UTF8 received from the network every 
second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network 
every second.
  *
  * Usage: NetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Spark Streaming would 
connect to receive data.
@@ -31,8 +31,7 @@ import org.apache.spark.storage.StorageLevel
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.NetworkWordCount localhost 
9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount 
localhost 9999`
  */
 object NetworkWordCount {
   def main(args: Array[String]) {
@@ -42,13 +41,16 @@ object NetworkWordCount {
     }
 
     StreamingExamples.setStreamingLogLevels()
-    val sparkConf = new SparkConf().setAppName("NetworkWordCount");
+
     // Create the context with a 1 second batch size
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
     val ssc = new StreamingContext(sparkConf, Seconds(1))
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.socketTextStream(args(0), args(1).toInt, 
StorageLevel.MEMORY_ONLY_SER)
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    val lines = ssc.socketTextStream(args(0), args(1).toInt, 
StorageLevel.MEMORY_AND_DISK_SER)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index ace785d..6af3a0f 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -46,8 +46,7 @@ import org.apache.spark.util.IntParam
  *
  * and run the example as
  *
- *      `$ ./bin/spark-submit examples.jar \
- *      --class 
org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
+ *      `$ ./bin/run-example 
org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
  *              localhost 9999 ~/checkpoint/ ~/out`
  *
  * If the directory ~/checkpoint/ does not exist (e.g. running for the first 
time), it will create
@@ -57,7 +56,7 @@ import org.apache.spark.util.IntParam
  *
  * To run this example in a local standalone cluster with automatic driver 
recovery,
  *
- *      `$ ./spark-class org.apache.spark.deploy.Client -s launch 
<cluster-url> \
+ *      `$ bin/spark-class org.apache.spark.deploy.Client -s launch 
<cluster-url> \
  *              <path-to-examples-jar> \
  *              
org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
  *              localhost 9999 ~/checkpoint ~/out`
@@ -81,7 +80,7 @@ object RecoverableNetworkWordCount {
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 5e1415f..daa1ced 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -31,8 +31,8 @@ import org.apache.spark.streaming.StreamingContext._
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./bin/spark-submit examples.jar
- *    --class org.apache.spark.examples.streaming.StatefulNetworkWordCount 
localhost 9999`
+ *    `$ bin/run-example
+ *      org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 
9999`
  */
 object StatefulNetworkWordCount {
   def main(args: Array[String]) {
@@ -51,7 +51,7 @@ object StatefulNetworkWordCount {
       Some(currentCount + previousCount)
     }
 
-    val sparkConf = new 
SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey")
+    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
     ssc.checkpoint(".")

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
index 1ddff22..f55d23a 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -28,13 +28,29 @@ import org.apache.spark.SparkConf
  * stream. The stream is instantiated with credentials and optionally filters 
supplied by the
  * command line arguments.
  *
+ * Run this on your local machine as
+ *
  */
 object TwitterPopularTags {
   def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer 
secret> " +
+        "<access token> <access token secret> [<filters>]")
+      System.exit(1)
+    }
 
     StreamingExamples.setStreamingLogLevels()
 
-    val filters = args
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = 
args.take(4)
+    val filters = args.takeRight(args.length - 4)
+
+    // Set the system properties so that Twitter4j library used by twitter 
stream
+    // can use them to generat OAuth credentials
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
     val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
     val ssc = new StreamingContext(sparkConf, Seconds(2))
     val stream = TwitterUtils.createStream(ssc, None, filters)
@@ -52,13 +68,13 @@ object TwitterPopularTags {
 
     // Print popular hashtags
     topCounts60.foreachRDD(rdd => {
-      val topList = rdd.take(5)
+      val topList = rdd.take(10)
       println("\nPopular topics in last 60 seconds (%s 
total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s 
tweets)".format(tag, count))}
     })
 
     topCounts10.foreachRDD(rdd => {
-      val topList = rdd.take(5)
+      val topList = rdd.take(10)
       println("\nPopular topics in last 10 seconds (%s 
total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s 
tweets)".format(tag, count))}
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 7ade3f1..79905af 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -68,11 +68,11 @@ object SimpleZeroMQPublisher {
  *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
  *
  * To run this example locally, you may run publisher as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher 
tcp://127.0.1.1:1234 foo.bar`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher 
tcp://127.0.1.1:1234 foo.bar`
  * and run the example as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.ZeroMQWordCount 
tcp://127.0.1.1:1234 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.ZeroMQWordCount 
tcp://127.0.1.1:1234 foo`
  */
 // scalastyle:on
 object ZeroMQWordCount {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 97e0cb9..8402491 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -40,11 +40,13 @@ object PageView extends Serializable {
 /** Generates streaming events to simulate page views on a website.
   *
   * This should be used in tandem with PageViewStream.scala. Example:
-  * $ ./bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
-  * $ ./bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewStream 
errorRatePerZipCode localhost 44444
   *
-  * When running this, you may want to set the root logging level to ERROR in
-  * conf/log4j.properties to reduce the verbosity of the output.
+  * To run the generator
+  * `$ bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+  * To process the generated stream
+  * `$ bin/run-example \
+  *    org.apache.spark.examples.streaming.clickstream.PageViewStream 
errorRatePerZipCode localhost 44444`
+  *
   */
 // scalastyle:on
 object PageViewGenerator {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index d30ceff..d9b886e 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -26,8 +26,11 @@ import org.apache.spark.examples.streaming.StreamingExamples
   * operators available in Spark streaming.
   *
   * This should be used in tandem with PageViewStream.scala. Example:
-  * $ ./bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
-  * $ ./bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewStream 
errorRatePerZipCode localhost 44444
+  * To run the generator
+  * `$ bin/run-example 
org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+  * To process the generated stream
+  * `$ bin/run-example \
+  *    org.apache.spark.examples.streaming.clickstream.PageViewStream 
errorRatePerZipCode localhost 44444`
   */
 // scalastyle:on
 object PageViewStream {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7571d8c/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
 
b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7bca140..5ea2e55 100644
--- 
a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ 
b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,36 +63,48 @@ class TwitterReceiver(
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
-  var twitterStream: TwitterStream = _
+  private var twitterStream: TwitterStream = _
 
   def onStart() {
-    twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
-    twitterStream.addListener(new StatusListener {
-      def onStatus(status: Status) = {
-        store(status)
-      }
-      // Unimplemented
-      def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
-      def onTrackLimitationNotice(i: Int) {}
-      def onScrubGeo(l: Long, l1: Long) {}
-      def onStallWarning(stallWarning: StallWarning) {}
-      def onException(e: Exception) {
-        restart("Error receiving tweets", e)
-      }
-    })
+    try {
+      val newTwitterStream = new 
TwitterStreamFactory().getInstance(twitterAuth)
+      newTwitterStream.addListener(new StatusListener {
+        def onStatus(status: Status) = {
+          store(status)
+        }
+        // Unimplemented
+        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+        def onTrackLimitationNotice(i: Int) {}
+        def onScrubGeo(l: Long, l1: Long) {}
+        def onStallWarning(stallWarning: StallWarning) {}
+        def onException(e: Exception) {
+          restart("Error receiving tweets", e)
+        }
+      })
 
-    val query = new FilterQuery
-    if (filters.size > 0) {
-      query.track(filters.toArray)
-      twitterStream.filter(query)
-    } else {
-      twitterStream.sample()
+      val query = new FilterQuery
+      if (filters.size > 0) {
+        query.track(filters.toArray)
+        newTwitterStream.filter(query)
+      } else {
+        newTwitterStream.sample()
+      }
+      setTwitterStream(newTwitterStream)
+      logInfo("Twitter receiver started")
+    } catch {
+      case e: Exception => restart("Error starting Twitter stream", e)
     }
-    logInfo("Twitter receiver started")
   }
 
   def onStop() {
-    twitterStream.shutdown()
+    setTwitterStream(null)
     logInfo("Twitter receiver stopped")
   }
+
+  private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized 
{
+    if (twitterStream != null) {
+      twitterStream.shutdown()
+    }
+    twitterStream = newTwitterStream
+  }
 }

Reply via email to