So basically: write them into a temporary directory named with the
batch time, then move the files to their destination on success? I
wish there were a way to skip moving files around and instead set the
output filenames directly.
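
If I've understood correctly, the save step could look roughly like
this (a rough sketch, untested; the batch-directory layout and the
py4j FileSystem calls are my assumptions, not something from the
docs):

    def save_rdd(batch_time, rdd):
        sqlContext = SQLContext(rdd.context)
        df = sqlContext.createDataFrame(rdd, log_schema)

        # One directory per batch, named after the batch time
        # (assuming batch_time arrives as a datetime, as PySpark
        # passes it to foreachRDD).
        stamp = batch_time.strftime("%Y-%m-%d-%H-%M-%S")
        tmp_dir = "%s/_tmp/batch-%s" % (hdfs_dest_directory, stamp)
        final_dir = "%s/batch-%s" % (hdfs_dest_directory, stamp)

        # Reach the Hadoop FileSystem API through the py4j gateway.
        jvm = rdd.context._jvm
        hconf = rdd.context._jsc.hadoopConfiguration()
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(hconf)
        final_path = jvm.org.apache.hadoop.fs.Path(final_dir)

        # If the batch directory already exists, the batch was
        # processed before the restart: skip it so the write stays
        # idempotent.
        if fs.exists(final_path):
            return

        # Overwrite leftovers from a crashed attempt, then promote the
        # staging directory with an atomic HDFS rename, so readers
        # never see a half-written batch.
        df.write.mode("overwrite").parquet(tmp_dir)
        fs.rename(jvm.org.apache.hadoop.fs.Path(tmp_dir), final_path)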

Thanks Burak :)

-Michael


On Mon, Nov 23, 2015, at 09:19 PM, Burak Yavuz wrote:
> Not sure if it would be the most efficient, but maybe you can think
> of the filesystem as a key-value store and write each batch to a
> sub-directory, where the directory name is the batch time. If the
> directory already exists, then you shouldn't write it. Then you may
> have a follow-up batch job that coalesces the files, in order to
> "close the day".
>
> Burak
>
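The "close the day" job could then be something like this (again just
a sketch; the glob and the coalesce factor are made up):

    def close_day(sc, day):
        # day is e.g. "2015-11-23"; assumes per-batch directories are
        # named batch-<stamp> where the stamp starts with the date, so
        # a glob over one day picks up all of its batches.
        sqlContext = SQLContext(sc)
        df = sqlContext.read.parquet(
            "%s/batch-%s*" % (hdfs_dest_directory, day))
        # Compact the day's many small files into a handful of larger
        # ones under a separate "closed" area.
        df.coalesce(8).write.parquet(
            "%s/closed/%s" % (hdfs_dest_directory, day))
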
> On Mon, Nov 23, 2015 at 8:58 PM, Michael <mfr...@fastest.cc> wrote:
>> Hi all,
>>
>> I'm working on a project with Spark Streaming. The goal is to
>> process log files from S3 and save them on Hadoop, to later analyze
>> them with Spark SQL.
>>
>> Everything works well except when I kill the Spark application and
>> restart it: it picks up from the latest processed batch and
>> reprocesses it, which results in duplicate data on HDFS.
>>
>> How can I make the writing step on HDFS idempotent? I couldn't find
>> any way to control, for example, the filenames of the Parquet files
>> being written; the idea being to include the batch time so that the
>> same batch always gets written to the same path.
>> I've also tried mode("overwrite"), but it looks like each batch
>> gets written to the same file every time.
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Michael
>>
>> --
>>
>> from pyspark import SparkContext
>> from pyspark.sql import SQLContext
>> from pyspark.streaming import StreamingContext
>>
>> def save_rdd(batch_time, rdd):
>>     # Turn the parsed log RDD into a DataFrame and append it to the
>>     # destination, partitioned by date.
>>     sqlContext = SQLContext(rdd.context)
>>     df = sqlContext.createDataFrame(rdd, log_schema)
>>     df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>>
>> def create_ssc(checkpoint_dir, spark_master):
>>     sc = SparkContext(spark_master, app_name)
>>     ssc = StreamingContext(sc, batch_interval)
>>     ssc.checkpoint(checkpoint_dir)
>>
>>     # dstream is the input stream from S3 (its creation is elided here)
>>     parsed = dstream.map(lambda line: log_parser(line))
>>     parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
>>
>>     return ssc
>>
>> # Recover from the checkpoint if one exists, otherwise build a
>> # fresh context.
>> ssc = StreamingContext.getOrCreate(checkpoint_dir,
>>                                    lambda: create_ssc(checkpoint_dir, spark_master))
>> ssc.start()
>> ssc.awaitTermination()
 
