On 14 Feb 2017, at 11:12, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

> I know how to get the filesystem; the problem is that this means using Hadoop directly, so if in the future we change to something else (e.g. S3) I would need to rewrite the code.

Well, no, because the S3 and HDFS clients use the same API:

    FileSystem fs = FileSystem.get(new URI("hdfs://nn:8020/users/stevel"), conf)

vs.

    FileSystem fs = FileSystem.get(new URI("s3a://bucket1/dataset"), conf)

The same holds for wasb:// (which, being consistent and with fast atomic rename, can be used instead of HDFS) and for other cluster filesystems. If it's a native fs, then file:// should work everywhere, or some derivative (as Red Hat do with Gluster).
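As a concrete illustration, a minimal sketch (Scala; the path is a hypothetical example, not anything from this thread) of resolving whichever client matches the URI, so the calling code never hard-codes HDFS:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical path: swap the scheme (hdfs://, s3a://, wasb://, file://)
    // and nothing else in this code changes.
    val out  = new Path("s3a://bucket1/dataset")
    val conf = new Configuration()  // inside Spark: spark.sparkContext.hadoopConfiguration
    val fs: FileSystem = out.getFileSystem(conf)  // resolves the matching client
    println(fs.exists(out))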
> This also relates to finding the last iteration: I would need to use the Hadoop filesystem, which is not agnostic to the deployment.

See above. If you are using a Spark cluster of size > 1 you will need some distributed filesystem, which is going to have to provide a [...]

If there is an issue here, it is that if you rely on FileSystem.rename() being an atomic O(1) operation, you are going to be disappointed on S3, as there it is a non-atomic O(data) copy-and-delete whose failure state is "undefined". The solution here comes from having specific committer logic for the different object stores. You really, really don't want to go there. If you do, start by looking at the S3Guard WIP one:
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer.md

Further reading:
http://www.slideshare.net/steve_l/spark-summit-east-2017-apache-spark-and-object-stores

> The Kryo serializer still costs much more than a dataframe write. As for the use case: I am doing a very large number of iterations, so the idea is that every X iterations I want to save to disk, so that if something crashes I do not have to begin from the first iteration but just from the relevant iteration.

Sounds like you don't really want the output to always be the FS, more to checkpoint iterations. Couldn't you do something like: every 20 iterations, write() the relevant RDD to the DFS?

> Basically I would have liked to see something like saving normally, where the original data is not removed until the write has succeeded.
>
> Assaf.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Tuesday, February 14, 2017 12:54 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: fault tolerant dataframe write with overwrite

Normally you can fetch the filesystem interface from the configuration (I assume you mean URI). As for getting the last iteration: I do not understand the issue. You can use the current timestamp as the directory name, and at the end simply select the directory with the highest number. Regarding checkpointing, you can also use the Kryo serializer to avoid some space overhead. Aside from that, can you elaborate on the use case and why you need to write every iteration?
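A minimal sketch of that timestamp-directory scheme (Scala; the base path and the `spark` and `df` values are illustrative assumptions, not from the thread):

    import org.apache.hadoop.fs.Path

    // Assumptions: `spark` is a live SparkSession, `df` is the iteration's result.
    val base = new Path("hdfs://nn:8020/app/results")  // hypothetical base directory
    val fs   = base.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // Each save goes to a fresh, timestamp-named subdirectory...
    df.write.parquet(new Path(base, System.currentTimeMillis.toString).toString)

    // ...and on restart the latest iteration is simply the highest number
    // (assumes at least one successful save exists).
    val latest = fs.listStatus(base)
      .filter(s => s.isDirectory && s.getPath.getName.forall(_.isDigit))
      .map(_.getPath.getName.toLong)
      .max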
On 14 Feb 2017, at 11:22, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

Hi,

I have a case where an iterative process overwrites the results of the previous iteration. Every iteration I need to write a dataframe with the results. The problem is that a simple overwrite of the previous iteration's results is not fault tolerant: if the program crashes in the middle of an iteration, the data from previous iterations is lost, because overwrite first removes the previous data and then starts writing.

Currently we simply write to a new directory and then rename it, but this is not the best way, as it requires us to know the interfaces of the underlying file system (as well as some extra work to manage which directory is the latest, etc.). I know I can also use checkpoint (although I haven’t fully tested that process); however, checkpointing converts the result to an RDD, which takes both more time and more space. I was wondering whether there is an efficient way of managing this from inside Spark.

Thanks,
Assaf.
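For reference, the write-to-a-new-directory-then-rename pattern described above, as a minimal sketch (Scala; the paths and the `spark` and `df` values are illustrative, and per the caveat earlier in the thread, the rename in step 4 is only atomic on a real filesystem such as HDFS, not on S3):

    import org.apache.hadoop.fs.Path

    // Hypothetical layout: results are published at `finalDir`; the old data
    // survives until the new data has been fully written and moved into place.
    val finalDir = new Path("hdfs://nn:8020/app/result")
    val tmpDir   = new Path("hdfs://nn:8020/app/result_tmp")
    val oldDir   = new Path("hdfs://nn:8020/app/result_old")
    val fs = finalDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    df.write.parquet(tmpDir.toString)                     // 1. write new data alongside the old
    if (fs.exists(oldDir)) fs.delete(oldDir, true)        // 2. clear any leftover backup
    if (fs.exists(finalDir)) fs.rename(finalDir, oldDir)  // 3. move old data aside
    fs.rename(tmpDir, finalDir)                           // 4. publish (atomic on HDFS, O(data) on S3)
    fs.delete(oldDir, true)                               // 5. only now discard the previous results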