Hi all,

I'm working on a project with Spark Streaming. The goal is to process log
files from S3 and save them to HDFS so I can later analyze them with
Spark SQL.
Everything works well except when I kill the Spark application and
restart it: it picks up from the last processed batch and reprocesses
it, which results in duplicate data on HDFS.

How can I make the write step to HDFS idempotent? I couldn't find any
way to control, for example, the filenames of the Parquet files being
written; the idea is to include the batch time so that the same batch
always gets written to the same path.
I've also tried mode("overwrite"), but it looks like every batch then
gets written to the same location each time.
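Concretely, what I had in mind is something along these lines (untested
sketch; the batch=<time> sub-directory naming and the strftime format are
just ideas, not something I have working):

from pyspark.sql import SQLContext

def save_rdd_idempotent(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)

    # One directory per batch, named after the batch time (naming scheme is
    # just an idea), e.g. .../batch=2016-01-01_12-00-00
    batch_dir = "%s/batch=%s" % (hdfs_dest_directory,
                                 batch_time.strftime("%Y-%m-%d_%H-%M-%S"))

    # "overwrite" is scoped to this batch's directory, so a batch replayed
    # after a restart replaces its own earlier output instead of appending
    # duplicates.
    df.write.mode("overwrite").partitionBy("log_date").parquet(batch_dir)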
Any help would be greatly appreciated.

Thanks,
Michael

--

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

def save_rdd(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    # Appending is what produces the duplicates when a batch is replayed
    # after a restart.
    df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)

def create_ssc(checkpoint_dir, spark_master):
    sc = SparkContext(spark_master, app_name)
    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)

    # Watch the S3 input path for new log files (s3_source_directory stands
    # in for the real path, which I've left out here).
    dstream = ssc.textFileStream(s3_source_directory)
    parsed = dstream.map(lambda line: log_parser(line))
    parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))

    return ssc

# Recover from the checkpoint if one exists, otherwise build a fresh context.
ssc = StreamingContext.getOrCreate(checkpoint_dir,
                                   lambda: create_ssc(checkpoint_dir, spark_master))
ssc.start()
ssc.awaitTermination()
