I am trying to run a simple Spark Streaming job that counts words from HDFS, but I cannot even compile the Scala source. I get the following error:

error: value awaitTermination is not a member of org.apache.spark.streaming.StreamingContext
    ssc.awaitTermination()

This is the code from the .scala file:

"import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Counts words in new text files created in the given directory
 * Usage: HdfsWordCount <master> <directory>
 *   <master> is the Spark master URL.
 *   <directory> is the directory that Spark Streaming will use to find and 
read new text files.
 *
 * To run this on your local machine on directory `localdir`, run this example
 *    `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount 
local[2] localdir`
 * Then create a text file in `localdir` and the words in the file will get 
counted.
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: HdfsWordCount <master> <directory>")
      System.exit(1)
    }

  //  StreamingExamples.setStreamingLogLevels()

    // Create the context
//    val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), 
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_HOME")))
    val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(1))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
"

Does anyone know what is going on here?
