Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-09 Thread Andy Davidson
Thanks, Gerard

I'll give that a try. It seems like this approach is going to create a very
large number of files. I guess I could write a cron job to concatenate the
files by hour or by day. I imagine this is a common problem. Do you know of
something that does this already?

I am using the standalone cluster manager, and I do not think it directly
supports cron/crontab-style scheduling. It should be easy to use the HDFS API
with Linux crontab, or maybe https://quartz-scheduler.org/
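Something like this rough sketch is what I have in mind for the compaction
job that cron (or Quartz) would launch; the class name, output path, and
partition count below are placeholders, not working code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Rough sketch of an hourly/daily compaction job. CompactRawData, the
// output path, and the partition count of 8 are placeholders.
public class CompactRawData {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("compactRawData");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // glob over the many small per-interval files the stream wrote
        JavaRDD<String> raw = jsc.textFile("hdfs:///rawSteamingData/*");

        // coalesce() shrinks the partition count without a full shuffle,
        // so the output is a few large files instead of many small ones
        raw.coalesce(8).saveAsTextFile(
                "hdfs:///rawSteamingDataCompacted/" + System.currentTimeMillis());

        jsc.stop();
    }
}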

Kind regards

andy

From:  Gerard Maas <gerard.m...@gmail.com>
Date:  Sunday, November 8, 2015 at 2:13 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: streaming: missing data. does saveAsTextFile() append or
replace?

> Andy,
> 
> Using rdd.saveAsTextFile(...) with the same target path on every batch will
> overwrite the data from previous batches.
> 
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
> suffix), which writes a new set of files at each streaming interval, so
> earlier intervals are not overwritten. If you want to increase the file size
> (usually a good idea in HDFS), you can apply a window function over the
> dstream and save the 'windowed' dstream instead.
> 
> kind regards, Gerard.




Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-08 Thread Gerard Maas
Andy,

Using rdd.saveAsTextFile(...) with the same target path on every batch will
overwrite the data from previous batches.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix), which writes a new set of files at each streaming interval, so
earlier intervals are not overwritten. If you want to increase the file size
(usually a good idea in HDFS), you can apply a window function over the
dstream and save the 'windowed' dstream instead.
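
A rough sketch of both options, assuming data is the JavaDStream<String>
from your code; the prefix, suffix, and window sizes are illustrative:

import org.apache.spark.streaming.Duration;

// Option 1: let the DStream manage the paths. Each batch interval goes to
// a new directory named <prefix>-<batchTimeMs>.<suffix>, so earlier
// batches are never overwritten.
data.dstream().saveAsTextFiles("hdfs:///rawSteamingData/batch", "txt");

// Option 2: window the stream first, so each write covers twelve of your
// 5-second batches and produces fewer, larger files.
data.window(new Duration(60 * 1000), new Duration(60 * 1000))
    .dstream()
    .saveAsTextFiles("hdfs:///rawSteamingData/window", "txt");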

kind regards, Gerard.



streaming: missing data. does saveAsTextFile() append or replace?

2015-11-07 Thread Andy Davidson
Hi

I just started a new Spark Streaming project. In this phase of the system
all we want to do is save the data we receive to HDFS. After running for a
couple of days it looks like I am missing a lot of data. I wonder if
saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I captured
in previous windows? I noticed that after running for a couple of days my
HDFS file system has 25 files, with names something like "part-6". I used
'hadoop fs -dus' to check the total data captured. While the system was
running I would periodically call 'dus', and I was surprised that sometimes
the total number of bytes actually dropped.


Is there a better way to write my data to disk?

Any suggestions would be appreciated

Andy


    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // streaming context with a 5 second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(jsc,
                new Duration(5 * 1000));

        [ deleted code … ]

        data.foreachRDD(new Function<JavaRDD, Void>() {
            private static final long serialVersionUID = -7957854392903581284L;

            @Override
            public Void call(JavaRDD jsonStr) throws Exception {
                // every batch writes to the same fixed path
                jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); // /rawSteamingData is a directory
                return null;
            }
        });

        ssc.checkpoint(checkPointUri);

        ssc.start();
        ssc.awaitTermination();
    }