Hi, I am facing a similar issue. I am running a Spark Streaming job that uses textFileStream on HDFS, with Spark 0.9.0 from Cloudera, and saving each RDD (the batch interval is 100 seconds) to a separate HDFS directory. Every 100 seconds it creates a new directory in HDFS (stream<Random>/) containing only a _SUCCESS marker, but no data is ever written to it. I verified that I am adding new files to the correct HDFS directory, yet the job still only creates a fresh _SUCCESS-only folder at each interval. So the core issue is that it does not recognize new files created in HDFS.
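One thing worth checking, in case it helps others debugging this: FileInputDStream decides what is "new" by file modification time within the current batch window, and the documented expectation is that files appear in the monitored directory atomically, i.e. they are written elsewhere and then moved/renamed in. A file created or appended in place can be missed. Below is a minimal sketch of that drop pattern using the plain HDFS client (the object name and paths are placeholders, not from my job); my actual code and log follow after it.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object AtomicDrop {
      def main(args: Array[String]) {
        val fs = FileSystem.get(new Configuration())

        // Placeholder paths -- substitute your own staging and watched dirs.
        val staging = new Path("/tmp/staging/events-0001.txt")
        val watched = new Path("/correct/path/to/data/events-0001.txt")

        // 1) Write the file completely in a staging directory...
        val out = fs.create(staging)
        out.writeBytes("record1\nrecord2\n")
        out.close()

        // 2) ...then move it into the directory textFileStream is watching.
        // rename() is atomic in HDFS, so the stream sees one complete file
        // with a fresh modification time.
        fs.rename(staging, watched)
      }
    }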
Code used:

    val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid", Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)
    val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
    data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user/<path/to/file/stream>" + Random.nextInt))
    ssc.start

It is creating these directories with only _SUCCESS:

    stream562343230
    stream1228731977
    stream318151149
    stream603511115

This is the log that I get:

14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:
14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms:
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:
14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms:
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:
14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)
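Note what the log shows: every "New files at time ... ms:" line has an empty list after it, so each batch RDD is empty, and saveAsObjectFile still runs and writes a directory containing nothing but the _SUCCESS marker. Until the detection problem itself is fixed, a guard like this sketch against the code above avoids the empty output directories (Spark 0.9 has no rdd.isEmpty, hence take(1)):

    data.foreachRDD { rdd =>
      // Skip empty batches: saving them yields directories that hold
      // nothing but a _SUCCESS marker.
      if (rdd.take(1).nonEmpty) {
        rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user/<path/to/file/stream>" + Random.nextInt)
      }
    }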
Thanks and Regards,
Suraj Sheth

-----Original Message-----
From: robin_up [mailto:[email protected]]
Sent: 18 February 2014 12:55
To: [email protected]
Subject: DStream.saveAsTextFiles() saves nothing

Hi,

I have a Streaming app which reads from its inputs, does some text transformation, and tries to write the output to an HDFS text file using saveAsTextFiles on the DStream object. But the method saves nothing (not even an empty file), even though the jobs run successfully with no errors or warnings. When I replace the save-to-file step with print(), the contents are printed to the screen. I also tried saveAsTextFile on a plain RDD via the SparkContext, and that works. Has anyone gotten saveAsTextFiles working with a DStream?

Here is the line of code I use for output:

    actions.saveAsTextFiles("hdfs://nn1:8020/user/ds/actions/test", "test")

I'm using Spark 0.9.0 and hadoop2.0.0-cdh4.5.0.

thanks
Robin

-----
--
Robin Li
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-saveAsTextFiles-saves-nothing-tp1666.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
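For comparison, a minimal self-contained app built around the same saveAsTextFiles call would look roughly like the sketch below (the object name, master URL, batch interval, input path, and map step are placeholders, not from the thread). Note that DStream.saveAsTextFiles produces one output directory per batch, named <prefix>-<time in ms>.<suffix>, writes a directory even for empty batches, and produces no output at all until ssc.start() is called:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SaveDemo {
      def main(args: Array[String]) {
        // Placeholder master and batch interval.
        val ssc = new StreamingContext("local[2]", "SaveDemo", Seconds(10))

        // Placeholder input path and transformation.
        val actions = ssc.textFileStream("hdfs://nn1:8020/user/ds/incoming")
                         .map(_.toUpperCase)

        // Writes one directory per batch:
        //   hdfs://nn1:8020/user/ds/actions/test-<batchTimeMs>.test
        actions.saveAsTextFiles("hdfs://nn1:8020/user/ds/actions/test", "test")

        ssc.start()
        ssc.awaitTermination()
      }
    }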
