Hi,
I am counting the values in each window and finding the most frequent ones. I 
want to save only the top 10 most frequent values of each window to HDFS, 
rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
  .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 using the commented-out foreachRDD line above.
I have also tried:
sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10))
    .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

but I get the following error:
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Regards,
Laeeq
