Hi Friends,
I am new to Spark and Spark Streaming. I am trying to save a DStream to
files, but I can't figure out how to do it with the methods DStream provides
(saveAsTextFiles). Here is what I am trying to do:

E.g., given a DStream of type DStream[(String, String)] containing:

("file1.txt", "msg_a"), ("file1.txt", "msg_b"), ("file2.txt", "msg_c"), ("file2.txt", "msg_d")

I want messages msg_a and msg_b to be stored in file1.txt, and msg_c and
msg_d in file2.txt.

I got as far as grouping the stream by the first element of each tuple, but I
can't figure out how to get the file name out of the stream and pass it to
the saveAsTextFiles() method on DStream.

Following is what I came up with so far. I am consuming messages from Kafka.

/**
 * Spark Streaming job that consumes "fileName#message" records from Kafka,
 * groups each batch by file name, and appends every message to the file
 * named by its prefix.
 *
 * Usage: SparkStreamingTest2 <master> <zkQuorum> <group> <topics> <numThreads>
 */
object SparkStreamingTest2 {
  val sparkHome = "/Users/parth/Projects/spark-0.8.0-incubating"

  def main(args: Array[String]): Unit = {

    // `val Array(a, b, c, d, e) = args` below only matches exactly five
    // elements, so reject any other count here instead of crashing later
    // with a MatchError.
    if (args.length != 5) {
      System.err.println(
        "Usage: SparkStreamingTest2 <master> <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(master, zkQuorum, zkGroup, topics, numThreads) = args

    val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(4),
      sparkHome, List("target/scala-2.9.3/sparktest_2.9.3-0.0.1.jar"))

    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // lines are in format "filename.txt#msg_x"
    val lines: DStream[String] = ssc.kafkaStream(zkQuorum, zkGroup, topicpMap)

    // Split on the first '#' only, so a message may itself contain '#'.
    // Lines without any '#' are dropped instead of failing the batch with a
    // MatchError.
    val fileNameAndMsgTuples: DStream[(String, String)] = lines.flatMap { line =>
      line.split("#", 2) match {
        case Array(fileName, message) => Seq((fileName, message))
        case _                        => Seq.empty[(String, String)]
      }
    }

    val groupedByFileName: DStream[(String, Seq[String])] =
      fileNameAndMsgTuples.groupByKey()

    // Output operation: once per batch, bring the (fileName, messages) groups
    // to the driver and append each group to its file. Collecting assumes a
    // single batch's grouped data fits in driver memory -- TODO confirm for
    // the expected message volume.
    groupedByFileName.foreach { rdd =>
      rdd.collect().foreach { case (fileName, messages) =>
        appendLines(fileName, messages)
      }
    }

    // Without start() no batches are ever processed.
    ssc.start()
  }

  /**
   * Appends each message as its own line to the file named `fileName`,
   * creating the file if it does not exist.
   *
   * The name comes from the incoming stream (untrusted), so only its last
   * path component is used. This keeps a record such as "../../etc/x#msg"
   * from writing outside the working directory. Names that reduce to "",
   * "." or ".." are skipped.
   *
   * @param fileName target file name taken from the record prefix
   * @param messages lines to append, in order
   */
  private def appendLines(fileName: String, messages: Seq[String]): Unit = {
    val baseName = new java.io.File(fileName).getName
    if (baseName.nonEmpty && baseName != "." && baseName != "..") {
      val writer = new java.io.PrintWriter(
        new java.io.FileWriter(new java.io.File(baseName), true))
      try messages.foreach(m => writer.println(m))
      finally writer.close()
    }
  }
}



Thanks for your help in advance!


-- 
Best,
Parth

Reply via email to