Re: streaming: missing data. does saveAsTextFile() append or replace?
Thanks Gerard, I'll give that a try. It seems like this approach is going to create a very large number of files. I guess I could write a cron job to concatenate the files by hour or maybe by day. I imagine this is a common problem. Do you know of something that does this already? I am using the standalone cluster manager; I do not think it directly supports cron job/table functionality. It should be easy to use the HDFS API and Linux crontab, or maybe https://quartz-scheduler.org/

Kind regards,

Andy

From: Gerard Maas <gerard.m...@gmail.com>
Date: Sunday, November 8, 2015 at 2:13 AM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: streaming: missing data. does saveAsTextFile() append or replace?

> Andy,
>
> Using rdd.saveAsTextFile(...) will overwrite the data if your target is the same file.
>
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix, suffix), where a new file will be written at each streaming interval. Note that this will result in a saved file for each streaming interval. If you want to increase the file size (usually a good idea in HDFS), you can use a window function over the dstream and save the 'windowed' dstream instead.
>
> kind regards, Gerard.
>
> On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> Hi
>>
>> I just started a new Spark Streaming project. In this phase of the system all we want to do is save the data we receive to HDFS. After running for a couple of days it looks like I am missing a lot of data. I wonder if saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured in previous windows? I noticed that after running for a couple of days my HDFS file system has 25 files. The names are something like "part-6". I used 'hadoop fs -dus' to check the total data captured. While the system was running I would periodically call 'dus'; I was surprised that sometimes the number of total bytes actually dropped.
>>
>> Is there a better way to write my data to disk?
>>
>> Any suggestions would be appreciated.
>>
>> Andy
>>
>>     public static void main(String[] args) {
>>         SparkConf conf = new SparkConf().setAppName(appName);
>>         JavaSparkContext jsc = new JavaSparkContext(conf);
>>         JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));
>>
>>         [ deleted code ]
>>
>>         data.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>             private static final long serialVersionUID = -7957854392903581284L;
>>
>>             @Override
>>             public Void call(JavaRDD<String> jsonStr) throws Exception {
>>                 jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
>>                 return null;
>>             }
>>         });
>>
>>         ssc.checkpoint(checkPointUri);
>>         ssc.start();
>>         ssc.awaitTermination();
>>     }
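The hourly concatenation described above could be sketched roughly as follows. This is a hypothetical standalone helper, not code from the thread: it assumes the batch directories are named "<prefix>-<epoch millis>" (the naming convention used by DStream.saveAsTextFiles) and merges each hour's part files into one file; a production cron job would use the Hadoop FileSystem API against HDFS rather than java.nio against the local filesystem.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class HourlyConcat {

    // Bucket batch output directories (named "<prefix>-<epochMillis>") by the
    // hour they belong to, then merge every part file in each bucket into a
    // single "hour-<N>.txt" file, reducing many small files to one per hour.
    public static Map<Long, List<String>> concatByHour(Path root, String prefix) throws IOException {
        Map<Long, List<String>> hourly = new TreeMap<>();
        List<Path> dirs = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(root, prefix + "-*")) {
            stream.forEach(dirs::add);
        }
        Collections.sort(dirs);
        for (Path dir : dirs) {
            // Parse the batch timestamp out of the directory name.
            long millis = Long.parseLong(dir.getFileName().toString().substring(prefix.length() + 1));
            long hour = millis / 3_600_000L; // hours since the epoch
            List<String> lines = hourly.computeIfAbsent(hour, h -> new ArrayList<>());
            List<Path> parts = new ArrayList<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "part-*")) {
                stream.forEach(parts::add);
            }
            Collections.sort(parts);
            for (Path part : parts) {
                lines.addAll(Files.readAllLines(part));
            }
        }
        // Write one merged file per hour next to the batch directories.
        for (Map.Entry<Long, List<String>> e : hourly.entrySet()) {
            Files.write(root.resolve("hour-" + e.getKey() + ".txt"), e.getValue());
        }
        return hourly;
    }
}
```

Scheduling this from crontab (or Quartz, as mentioned above) once per hour would keep the number of files in check.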
Re: streaming: missing data. does saveAsTextFile() append or replace?
Andy,

Using rdd.saveAsTextFile(...) will overwrite the data if your target is the same file.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix, suffix), where a new file will be written at each streaming interval. Note that this will result in a saved file for each streaming interval. If you want to increase the file size (usually a good idea in HDFS), you can use a window function over the dstream and save the 'windowed' dstream instead.

kind regards, Gerard.

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new Spark Streaming project. In this phase of the system all we want to do is save the data we receive to HDFS. After running for a couple of days it looks like I am missing a lot of data. I wonder if saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured in previous windows? I noticed that after running for a couple of days my HDFS file system has 25 files. The names are something like "part-6". I used 'hadoop fs -dus' to check the total data captured. While the system was running I would periodically call 'dus'; I was surprised that sometimes the number of total bytes actually dropped.
>
> Is there a better way to write my data to disk?
>
> Any suggestions would be appreciated.
>
> Andy
>
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName(appName);
>         JavaSparkContext jsc = new JavaSparkContext(conf);
>         JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));
>
>         [ deleted code ]
>
>         data.foreachRDD(new Function<JavaRDD<String>, Void>() {
>             private static final long serialVersionUID = -7957854392903581284L;
>
>             @Override
>             public Void call(JavaRDD<String> jsonStr) throws Exception {
>                 jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
>                 return null;
>             }
>         });
>
>         ssc.checkpoint(checkPointUri);
>         ssc.start();
>         ssc.awaitTermination();
>     }
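The reason dstream.saveAsTextFiles does not overwrite is that it appends the batch time to the output path, so each interval writes into its own directory. A minimal sketch of that naming scheme (the "prefix-<time in ms>[.suffix]" format follows the Spark API docs; the helper itself is illustrative, not part of Spark):

```java
public class BatchPaths {
    // Mimics how DStream.saveAsTextFiles(prefix, suffix) names its output:
    // each batch writes to "prefix-<batch time in ms>[.suffix]", so no two
    // intervals ever target the same directory.
    public static String batchPath(String prefix, long batchTimeMs, String suffix) {
        String path = prefix + "-" + batchTimeMs;
        return suffix.isEmpty() ? path : path + "." + suffix;
    }
}
```

This is also why the fixed path "hdfs:///rawSteamingData" in the original foreachRDD loses data: every batch targets the same directory, so each save replaces the previous batch's output.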
streaming: missing data. does saveAsTextFile() append or replace?
Hi

I just started a new Spark Streaming project. In this phase of the system all we want to do is save the data we receive to HDFS. After running for a couple of days it looks like I am missing a lot of data. I wonder if saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured in previous windows? I noticed that after running for a couple of days my HDFS file system has 25 files. The names are something like "part-6". I used 'hadoop fs -dus' to check the total data captured. While the system was running I would periodically call 'dus'; I was surprised that sometimes the number of total bytes actually dropped.

Is there a better way to write my data to disk?

Any suggestions would be appreciated.

Andy

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5 * 1000));

        [ deleted code ]

        data.foreachRDD(new Function<JavaRDD<String>, Void>() {
            private static final long serialVersionUID = -7957854392903581284L;

            @Override
            public Void call(JavaRDD<String> jsonStr) throws Exception {
                jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
                return null;
            }
        });

        ssc.checkpoint(checkPointUri);
        ssc.start();
        ssc.awaitTermination();
    }
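Since 'hadoop fs -dus' only gives a point-in-time total, the byte-count monitoring described above can be reproduced with a small helper that sums every file under the output tree. This is a local-filesystem sketch for illustration; against HDFS one would call FileSystem.getContentSummary instead.

```java
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicLong;

public class DirSize {
    // Total bytes of all regular files under root, like 'hadoop fs -dus'
    // but for a local directory tree. If this number ever shrinks while the
    // job is running, output is being replaced rather than appended.
    public static long totalBytes(Path root) throws IOException {
        AtomicLong total = new AtomicLong();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                total.addAndGet(attrs.size());
                return FileVisitResult.CONTINUE;
            }
        });
        return total.get();
    }
}
```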