Hi,
I am facing a similar issue.

I am running a Spark Streaming job with a text file stream on HDFS, using
Spark 0.9.0 from Cloudera.
I save each RDD (the streaming interval is 100 seconds) to a different
directory on HDFS. Every 100 seconds it creates a new directory in HDFS
containing only _SUCCESS (stream<Random>/_SUCCESS), but it does not write any
data/output to it. I verified that I am adding new files to the correct HDFS
directory; still, at each interval it only creates a new folder with _SUCCESS.
So the major issue is that the stream does not recognize new files created in
HDFS.

Code used:

val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid",
  Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)

val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN +
  "/user/<path/to/file/stream>" + Random.nextInt))
ssc.start()
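For reference, this is (roughly) how FileInputDStream decides what counts as
"new": it only picks up files whose modification time falls inside the current
batch window. A simplified, self-contained sketch of that selection rule (the
file names and timestamps below are made up for illustration):

```scala
// Simplified sketch of FileInputDStream's "new file" selection rule:
// a file is picked up only if its modification time falls inside the
// current batch window. Files created before the window (or copied in
// with an old timestamp) are silently ignored.
case class HdfsFile(path: String, modTimeMs: Long)

def newFiles(files: Seq[HdfsFile], batchStartMs: Long, batchEndMs: Long): Seq[HdfsFile] =
  files.filter(f => f.modTimeMs > batchStartMs && f.modTimeMs <= batchEndMs)

val batchStart = 1392626300000L
val batchEnd   = 1392626400000L
val files = Seq(
  HdfsFile("/data/old.txt", 1392626100000L), // timestamp before the window: skipped
  HdfsFile("/data/new.txt", 1392626350000L)  // appeared during the window: picked up
)
println(newFiles(files, batchStart, batchEnd).map(_.path))
// -> List(/data/new.txt)
```

If the files land in the directory with an old modification time, they will
never match any batch window, which would explain the empty batches.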


It creates these directories, each containing only _SUCCESS:
stream562343230
stream1228731977
stream318151149
stream603511115


This is the log I get:
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 
ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of 
type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use 
mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use 
mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, 
use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. 
Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. 
Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at 
TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at 
TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 
ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 
1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older 
than 1392626200000 ms: 
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 
ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of 
type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at 
TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at 
TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 
ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 
1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older 
than 1392626300000 ms: 
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 
ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of 
type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at 
TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at 
TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 
ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older 
than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 
1392626500000 ms (execution: 0.102 s)
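The empty "New files at time ... ms:" lines above suggest the modification-time
filter is rejecting the files: a file copied in with an old timestamp (e.g. a
copy that preserves times) is silently ignored. A common workaround is to write
the file somewhere else and then move it atomically into the monitored
directory, so it appears complete and with a fresh timestamp. A sketch of that
pattern on a local filesystem for illustration (on HDFS the equivalent would be
FileSystem.rename or `hadoop fs -mv`; the directory and file names are made up):

```scala
import java.nio.file.{Files, StandardCopyOption}

// Staging-then-rename: write into a staging directory first, then move
// the finished file into the watched directory. The watched directory
// only ever sees complete files, each with a fresh modification time.
val staging = Files.createTempDirectory("staging")
val watched = Files.createTempDirectory("watched")

val tmp = staging.resolve("part-0001.txt")
Files.write(tmp, "some records\n".getBytes("UTF-8"))

// The move is atomic, so a reader never observes a half-written file.
Files.move(tmp, watched.resolve("part-0001.txt"), StandardCopyOption.ATOMIC_MOVE)
println(Files.exists(watched.resolve("part-0001.txt"))) // -> true
```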


Thanks and Regards,
Suraj Sheth

-----Original Message-----
From: robin_up [mailto:[email protected]] 
Sent: 18 February 2014 12:55
To: [email protected]
Subject: DStream.saveAsTextFiles() saves nothing

Hi,

I have a streaming app which reads from inputs, does some text transformation,
and tries to write the output to an HDFS text file using saveAsTextFiles on
the DStream object.

But the method saves nothing (not even an empty file), and the jobs run
successfully, i.e. no errors or warnings. When I replace the save-to-file part
with print(), it prints the contents to the screen. I also tried saveAsTextFile
on a plain SparkContext RDD, and that works.

Not sure why; has anyone gotten saveAsTextFiles working with a DStream?

Here is the line of code I use for output:
actions.saveAsTextFiles("hdfs://nn1:8020/user/ds/actions/test", "test")
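For what it's worth, saveAsTextFiles(prefix, suffix) writes one directory per
batch, named (to the best of my understanding) <prefix>-<batchTimeMs>.<suffix>,
and an empty batch still produces a directory containing only a _SUCCESS
marker. A small sketch of that naming scheme, with a made-up timestamp:

```scala
// Sketch of the per-batch output naming used by saveAsTextFiles
// (as I understand it): one directory per batch interval, named
// <prefix>-<batchTimeMs>.<suffix>. The timestamp below is made up.
def batchDir(prefix: String, batchTimeMs: Long, suffix: String): String =
  if (suffix.isEmpty) s"$prefix-$batchTimeMs" else s"$prefix-$batchTimeMs.$suffix"

println(batchDir("hdfs://nn1:8020/user/ds/actions/test", 1392626300000L, "test"))
// -> hdfs://nn1:8020/user/ds/actions/test-1392626300000.test
```

So "saves nothing" may simply mean every batch RDD was empty, in which case the
per-batch directories would still appear but hold no data files.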

I'm using Spark 0.9.0, hadoop2.0.0-cdh4.5.0.

thanks
Robin



-----
-- Robin Li
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DStream-saveAsTextFiles-saves-nothing-tp1666.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
