On 12 Sep 2016, at 19:58, Srikanth <srikanth...@gmail.com> wrote:
Thanks Steve! We are already using HDFS as an intermediate store. This is for the last stage of processing, which has to put data in S3. The output is partitioned by 3 fields, like .../field1=111/field2=999/date=YYYY-MM-DD/*. Given that there are 100s of folders and 1000s of subfolders and part files, the rename from _temporary is just not practical in S3. I guess we have to add another stage with S3DistCp?

Afraid so, Srikanth.

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com> wrote:

> On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
>
> Hello,
>
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a
> few configs and none of them seem to work.
> Output always creates a _temporary directory. Rename is killing performance.
> I read some notes about DirectOutputCommitter causing problems with
> speculation turned on. Was this option removed entirely?

Spark turns off any committer with the word "direct" in its name if speculation==true. Concurrency, see. Even with non-speculative execution, the trouble with the direct options is that executor/job failures can leave incomplete/inconsistent work around, and the things downstream wouldn't even notice.

There's work underway to address this, work which requires a consistent metadata store alongside S3 (HADOOP-13345: S3Guard). For now: stay with the file output committer,

mapreduce.fileoutputcommitter.algorithm.version=2
mapreduce.fileoutputcommitter.cleanup.skipped=true

Even better: use HDFS as the intermediate store for the work, and only do a bulk upload at the end.
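[Editor's note: when setting those two properties from a Spark job rather than in core Hadoop configuration, they are normally passed through with Spark's spark.hadoop.* prefix. A minimal spark-defaults.conf fragment, assuming Hadoop 2.7+ where the v2 commit algorithm is available, might look like:]

```
# Commit algorithm v2: task commit moves files straight into the final
# destination, so job commit avoids the big rename out of _temporary.
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version  2
# Skip deleting the _temporary directory during job cleanup.
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped    true
```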
> val spark = SparkSession.builder()
>     .appName("MergeEntities")
>     .config("spark.sql.warehouse.dir",
>       mergeConfig.getString("sparkSqlWarehouseDir"))
>     .config("fs.s3a.buffer.dir", "/tmp")
>     .config("spark.hadoop.mapred.output.committer.class",
>       classOf[DirectOutputCommitter].getCanonicalName)
>     .config("mapred.output.committer.class",
>       classOf[DirectOutputCommitter].getCanonicalName)
>     .config("mapreduce.use.directfileoutputcommitter", "true")
>     //.config("spark.sql.sources.outputCommitterClass",
>     //  classOf[DirectOutputCommitter].getCanonicalName)
>     .getOrCreate()
>
> Srikanth
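[Editor's note: the HDFS-then-upload pattern Steve recommends, with the extra S3DistCp stage Srikanth is resigned to, can be sketched as below. This is a sketch only: df, the staging path, and the bucket name are hypothetical, and the final copy runs outside Spark.]

```scala
// Sketch, not a drop-in implementation: write the partitioned tree to HDFS
// first, where the committer's renames are cheap metadata operations.
df.write
  .partitionBy("field1", "field2", "date")   // .../field1=111/field2=999/date=YYYY-MM-DD/
  .parquet("hdfs:///staging/merge_entities")

// Then bulk-copy the finished tree to S3 in a separate stage, e.g. on EMR:
//   s3-dist-cp --src hdfs:///staging/merge_entities --dest s3://my-bucket/merge_entities
```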