So basically write them into a temporary directory named with the batch time, and then move the files to their destination on success? I wish there were a way to skip moving files around and just set the output filenames directly.
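Just to make sure I follow, here is a rough, untested sketch of how I read your suggestion, reusing log_schema and hdfs_dest_directory from my code below; the batch=<time> directory naming and the use of save mode "ignore" are my own guesses, not something I've tried yet:

    from pyspark.sql import SQLContext

    def save_rdd(batch_time, rdd):
        sqlContext = SQLContext(rdd.context)
        df = sqlContext.createDataFrame(rdd, log_schema)

        # One sub-directory per batch, keyed on the batch time
        # (batch_time should arrive as a datetime in the PySpark foreachRDD callback).
        batch_dir = "%s/batch=%s" % (hdfs_dest_directory,
                                     batch_time.strftime("%Y%m%d-%H%M%S"))

        # Save mode "ignore": if the directory already exists because this batch
        # was written before the restart, the write is skipped instead of
        # appending duplicates when the batch is replayed from the checkpoint.
        df.write.mode("ignore").partitionBy("log_date").parquet(batch_dir)

The catch I see is that a directory left half-written by a crash in the middle of a batch would also be skipped on replay, which is why I was thinking of the write-to-temp-then-rename variant above.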
Thanks Burak :)

-Michael

On Mon, Nov 23, 2015, at 09:19 PM, Burak Yavuz wrote:
> Not sure if it would be the most efficient, but maybe you can think of
> the filesystem as a key-value store and write each batch to a
> sub-directory, where the directory name is the batch time. If the
> directory already exists, then you shouldn't write it. Then you may
> have a following batch job that will coalesce files, in order to
> "close the day".
>
> Burak
>
> On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:
>> Hi all,
>>
>> I'm working on a project with Spark Streaming; the goal is to process
>> log files from S3 and save them on Hadoop to later analyze them with
>> Spark SQL.
>> Everything works well except when I kill the Spark application and
>> restart it: it picks up from the latest processed batch and reprocesses
>> it, which results in duplicate data on HDFS.
>>
>> How can I make the writing step on HDFS idempotent? I couldn't find any
>> way to control, for example, the filenames of the Parquet files being
>> written; the idea being to include the batch time so that the same batch
>> always gets written to the same path.
>> I've also tried mode("overwrite"), but it looks like each batch then gets
>> written to the same path every time.
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Michael
>>
>> --
>>
>> from pyspark import SparkContext
>> from pyspark.sql import SQLContext
>> from pyspark.streaming import StreamingContext
>>
>> def save_rdd(batch_time, rdd):
>>     sqlContext = SQLContext(rdd.context)
>>     df = sqlContext.createDataFrame(rdd, log_schema)
>>     df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>>
>> def create_ssc(checkpoint_dir, spark_master):
>>     sc = SparkContext(spark_master, app_name)
>>     ssc = StreamingContext(sc, batch_interval)
>>     ssc.checkpoint(checkpoint_dir)
>>
>>     # dstream: the input DStream reading the log lines from S3 (creation omitted)
>>     parsed = dstream.map(lambda line: log_parser(line))
>>     parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
>>
>>     return ssc
>>
>> ssc = StreamingContext.getOrCreate(checkpoint_dir,
>>                                    lambda: create_ssc(checkpoint_dir, spark_master))
>> ssc.start()
>> ssc.awaitTermination()
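For the "close the day" step you mention, I'm picturing a separate daily job roughly like the following (again untested; hdfs_final_directory, the example paths, and the batch=<time> layout are just placeholders of mine):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="close_the_day")
    sqlContext = SQLContext(sc)

    hdfs_dest_directory = "hdfs:///logs/staging"   # placeholder: per-batch output
    hdfs_final_directory = "hdfs:///logs/daily"    # placeholder: consolidated output

    # All per-batch sub-directories written during the day being closed,
    # e.g. hdfs:///logs/staging/batch=20151123-093000
    day = "20151123"
    per_batch = sqlContext.read.parquet("%s/batch=%s-*" % (hdfs_dest_directory, day))

    # Consolidate the many small per-batch files into a few larger daily files.
    per_batch.coalesce(8).write.mode("overwrite") \
        .parquet("%s/day=%s" % (hdfs_final_directory, day))

    sc.stop()

Does that match what you had in mind?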