Hi Friends,
I am new to Spark and Spark Streaming. I am trying to save a DStream to
files, but I can't figure out how to do it with the methods DStream provides
(saveAsTextFiles). Here is what I am trying to do.
For example, given a DStream of type DStream[(String, String)] with the elements
("file1.txt", "msg_a"), ("file1.txt", "msg_b"), ("file2.txt", "msg_c"), ("file2.txt", "msg_d")
I want messages msg_a and msg_b to be stored in file1.txt, and msg_c and
msg_d in file2.txt.
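In plain Scala collections, the result being asked for is just the pairs grouped by their first element. A minimal sketch with no Spark involved (the object and helper names are made up for illustration):

```scala
object GroupByFileExample {
  // Group (fileName, message) pairs by file name; groupBy keeps the
  // messages of each group in their original order.
  def groupByFile(pairs: Seq[(String, String)]): Map[String, Seq[String]] =
    pairs.groupBy(_._1).map { case (file, ps) => file -> ps.map(_._2) }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(
      ("file1.txt", "msg_a"), ("file1.txt", "msg_b"),
      ("file2.txt", "msg_c"), ("file2.txt", "msg_d"))
    // Prints:
    // file1.txt -> msg_a, msg_b
    // file2.txt -> msg_c, msg_d
    groupByFile(pairs).toSeq.sortBy(_._1).foreach { case (file, msgs) =>
      println(file + " -> " + msgs.mkString(", "))
    }
  }
}
```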
I got as far as grouping the stream on the first element of each tuple, but I
can't figure out how to get the file name out of the stream and pass it to
the saveAsTextFiles() method on DStream.
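The sticking point is that DStream.saveAsTextFiles(prefix, suffix) builds its output path from the prefix, the suffix and the batch time only; its Scaladoc describes the name as "prefix-TIME_IN_MS.suffix". Every record in a batch therefore ends up under the same path, whatever its key. A plain-Scala sketch of that documented naming pattern (batchOutputPath is a hypothetical helper, not Spark code):

```scala
object BatchNaming {
  // Mirrors the documented "prefix-TIME_IN_MS.suffix" pattern: one output
  // path per batch, independent of the records the batch contains.
  def batchOutputPath(prefix: String, suffix: String, batchTimeMs: Long): String =
    if (suffix == null || suffix.isEmpty) prefix + "-" + batchTimeMs
    else prefix + "-" + batchTimeMs + "." + suffix

  def main(args: Array[String]): Unit = {
    // Two consecutive 4-second batches get two different paths, but within
    // one batch the file1.txt and file2.txt records would share a path.
    println(batchOutputPath("out/messages", "txt", 1384000000000L)) // out/messages-1384000000000.txt
    println(batchOutputPath("out/messages", "txt", 1384000004000L)) // out/messages-1384000004000.txt
  }
}
```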
Here is what I have come up with so far (I am consuming the messages from Kafka):
import org.apache.spark.streaming.{DStream, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations such as groupByKey

object SparkStreamingTest2 {
  val sparkHome = "/Users/parth/Projects/spark-0.8.0-incubating"

  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(master, zkQuorum, zkGroup, topics, numThreads) = args
    val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(4),
      sparkHome, List("target/scala-2.9.3/sparktest_2.9.3-0.0.1.jar"))
    ssc.checkpoint("checkpoint")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // lines are in format "filename.txt#msg_x"
    val lines: DStream[String] = ssc.kafkaStream(zkQuorum, zkGroup, topicMap)
    val fileNameAndMsgTuples = lines.map { line =>
      // limit 2 so that a '#' inside the message does not break the pattern match
      val Array(fileName, message) = line.split("#", 2)
      (fileName, message)
    }
    val groupedByFileName: DStream[(String, Seq[String])] =
      fileNameAndMsgTuples.groupByKey()

    // Save message groups to file ??

    ssc.start() // nothing is processed until the context is started
  }
}
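One possible workaround (a sketch, not a built-in DStream method) is to drop down to each batch's RDD and write the groups yourself. In Spark 0.8 the per-batch hook is DStream.foreach(rdd => ...), renamed foreachRDD in later releases; for small batches its body could collect() the grouped pairs to the driver and write one file per key. The block below is plain Scala so it runs without a cluster: it simulates one batch of raw "filename.txt#msg_x" lines, groups them, and appends each group to <outDir>/<fileName>. The object and method names, the local output directory, and appending on every batch are all assumptions for illustration.

```scala
import java.io.{File, FileWriter, PrintWriter}

object SaveGroupsSketch {
  // Parse "filename.txt#msg_x"; limit 2 keeps any '#' inside the message,
  // and lines without a '#' are dropped instead of throwing a MatchError.
  def parse(line: String): Option[(String, String)] =
    line.split("#", 2) match {
      case Array(file, msg) => Some((file, msg))
      case _                => None
    }

  // Append each group's messages, one per line, to outDir/fileName.
  // Because of append mode, later batches add to the same files.
  def writeGroups(groups: Seq[(String, Seq[String])], outDir: File): Unit = {
    outDir.mkdirs()
    groups.foreach { case (fileName, msgs) =>
      val out = new PrintWriter(new FileWriter(new File(outDir, fileName), true))
      try msgs.foreach(m => out.println(m)) finally out.close()
    }
  }

  // Roughly what the per-batch callback would do with one batch of lines.
  def processBatch(lines: Seq[String], outDir: File): Unit = {
    val grouped = lines.flatMap(parse).groupBy(_._1)
      .map { case (file, ps) => (file, ps.map(_._2)) }
      .toSeq
    writeGroups(grouped, outDir)
  }

  def main(args: Array[String]): Unit = {
    val outDir = new File(if (args.nonEmpty) args(0) else "out")
    processBatch(Seq("file1.txt#msg_a", "file1.txt#msg_b",
                     "file2.txt#msg_c", "file2.txt#msg_d"), outDir)
  }
}
```

In the streaming program, the callback would hand rdd.collect() of groupedByFileName to writeGroups. Collecting to the driver only makes sense while batches stay small; for larger volumes, writing from the workers (for example rdd.saveAsHadoopFile with a subclass of Hadoop's MultipleTextOutputFormat that derives the file name from the key) avoids that bottleneck, but that variant is not shown or tested here.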
Thanks for your help in advance!
--
Best,
Parth