On 29 Aug 2016, at 18:18, Everett Anderson <ever...@nuna.com.INVALID> wrote:
> Okay, I don't think it's really just an S3A issue anymore. I can also successfully run the job with fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a --conf param from the EMR console. The problem seems related to the fact that we're trying to spark-submit jobs to a YARN cluster from outside the cluster itself. The docs <https://spark.apache.org/docs/1.6.2/running-on-yarn.html> suggest one must copy the Hadoop/YARN config XML outside of the cluster to do this, which feels gross, but it's what we did.
>
> We had changed fs.s3.impl to use S3A in that config, and that seems to cause the failure, though I still can't figure out why. Interestingly, if I don't make that change to the XML and leave it as the EMRFS implementation, it works as long as I use s3a:// URIs for the jars; otherwise spark-submit can't ship them to the cluster, since it doesn't have the EMRFS implementation locally.

I see: you are trying to use EMR's "special" S3 in-cluster, but spark-submit is trying to submit remotely. So:

1. Changing fs.s3.impl to S3A works for the upload, but not at runtime.
2. Using s3a:// for the upload and leaving everything else alone works.

I would just go with s3a. It's only the JARs being discussed here, right, not the actual data? When the JARs are needed, they'll be copied on EMR using the Amazon S3 implementation (whatever they've done there) to the local filesystem, where classloaders can pick them up and use them. It might be that s3a:// URLs are slower on EMR than s3:// URLs, but there's no fundamental reason why it isn't going to work. (There's a sketch of what I mean at the end of this mail.)

> On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com> wrote:
>
> (Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, not 'hadooop', of course.)
>
> On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com> wrote:
>
> Hi,
>
> I'm having some trouble figuring out a failure when using S3A to write a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It works when using EMRFS (s3://), though. I'm using these extra conf params, though I've also tried with everything but the encryption one removed, with the same result:
>
> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
> --conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter
>
> It looks like it does actually write the Parquet shards under <output root S3>/_temporary/0/_temporary/<attempt>/ but then must hit that S3 exception when trying to copy/rename. I think the NullPointerException deep down in Parquet is due to close() being called more than once, so it isn't the root cause, but I'm not sure.

Given that the stack trace has abortTask() in it, I'd suspect that's a follow-on failure.

One possibility here may be related to how EMR handles your credentials (session credentials served up over the IAM HTTP endpoint) versus how Apache Hadoop 2.7's s3a auth works (IAM isn't supported until 2.8). That could trigger the problem, but I don't know. I do know that I have DataFrames writing back to s3a on Hadoop 2.7.3, *not on EMR*.
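If you want to rule the credential handling in or out, one quick experiment would be to hand s3a a static access/secret key pair explicitly and see whether the 403 goes away. This is an untested sketch, not something I've run on EMR: fs.s3a.access.key and fs.s3a.secret.key are the Hadoop 2.7 properties for long-lived keys, the values below are placeholders, and putting secrets on the command line is only acceptable for a throwaway test.

    spark-submit \
      --conf spark.hadoop.fs.s3a.access.key=AKIAEXAMPLEKEY \
      --conf spark.hadoop.fs.s3a.secret.key=exampleSecretKey \
      --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 \
      ... rest of the submit as before ...

If it still fails with static keys, the credential path probably isn't the culprit.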
> Anyone seen something like this?
>
> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 54, ip-10-8-38-103.us-west-2.compute.internal): [...]
>         at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
>         ... 8 more
>         Suppressed: java.lang.NullPointerException
>                 at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>                 at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>                 at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>                 at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
>                 at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
>                 at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
>                 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
>                 ... 9 more

Probable bug in the Parquet cleanup if it never started right... you may want to report that to them.

> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: EA0E434768316935), S3 Extended Request ID: fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
>         at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
>         at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
>         at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>         at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>         at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>         at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1405)
>         at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
>         at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
>         at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
>         at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         ... 3 more

The ASF implementation of S3A only uses the transfer manager for async transfers when using the fast output stream ("fs.s3a.fast.upload" = true), or on a rename. That sounds a bit like a rename in the codepath, except I don't see why the 403 would wait until the rename(); I'd have expected it to start on the attempt to create the temporary directory.

Overall, it's not a problem I've seen before. I think trying to mix things is the root cause.

-Steve
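PS: the sketch I mentioned above, for the "just use s3a:// for the JARs" route. It's untested and the names are made up (bucket, jar, main class); HADOOP_CONF_DIR points at the config XML you copied off the cluster, with fs.s3.impl left alone:

    export HADOOP_CONF_DIR=/path/to/copied/emr-conf   # Hadoop/YARN XML pulled off the cluster
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 \
      --class com.example.YourJob \
      s3a://your-bucket/jars/your-job.jar

The client just passes the s3a:// URI through to YARN; as I said above, when the JARs are needed they get copied down on the cluster side to the local filesystem for the classloaders, so nothing EMRFS-specific is needed on the machine you submit from.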