Hey,

Thanks for the reply, and sorry for the late response!
I haven't been able to figure out the root cause, but I have been able to get things working if both the cluster and the remote submitter use S3A instead of EMRFS for all s3:// interactions, so I'm going with that for now. My impression from reading your various other replies on S3A is that it's also best to use mapreduce.fileoutputcommitter.algorithm.version=2 (which might someday be the default <https://issues.apache.org/jira/browse/MAPREDUCE-6336>) and, presumably if your data fits well in memory, to use fs.s3a.fast.upload=true. Is that right?
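For reference, the submit-time flags I've converged on look roughly like this -- a sketch rather than a verified recipe, and the bucket, jar path, and class name are placeholders:

    spark-submit \
      --master yarn \
      --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
      --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 \
      --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
      --conf spark.hadoop.fs.s3a.fast.upload=true \
      --class com.example.MyJob \
      s3a://my-bucket/jars/my-job.jar

(The fs.s3.impl override is so that s3:// URIs resolve to S3A on both the remote submitter and the cluster, which is the setup that's been working for me.)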
On Tue, Aug 30, 2016 at 11:49 AM, Steve Loughran <ste...@hortonworks.com> wrote:

> On 29 Aug 2016, at 18:18, Everett Anderson <ever...@nuna.com.INVALID> wrote:
>
> Okay, I don't think it's really just an S3A issue anymore. I can run the job using fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a --conf param from the EMR console successfully, as well.
>
> The problem seems related to the fact that we're trying to spark-submit jobs to a YARN cluster from outside the cluster itself.
>
> The docs <https://spark.apache.org/docs/1.6.2/running-on-yarn.html> suggest one must copy the Hadoop/YARN config XML outside of the cluster to do this, which feels gross, but it's what we did. We had changed fs.s3.impl to use S3A in that config, and that seems to result in the failure, though I still can't figure out why.
>
> Interestingly, if I don't make that change to the XML and leave it as the EMRFS implementation, it will work, as long as I use s3a:// URIs for the jars; otherwise spark-submit won't be able to ship them to the cluster, since it won't have the EMRFS implementation locally.
>
> I see: you are trying to use EMR's "special" S3 in-cluster, but spark-submit is trying to submit remotely.
>
> 1. Trying to change the value of fs.s3.impl to S3A works for the upload, but not at runtime.
> 2. Using s3a for the upload and leaving everything else alone works.
>
> I would just go with S3A. This is just the JARs being discussed here, right —not the actual data?
>
> When the JARs are needed, they'll be copied on EMR using the Amazon S3A implementation —whatever they've done there— to the local filesystem, where classloaders can pick them up and use them. It might be that s3a:// URLs are slower on EMR than s3:// URLs, but there's no fundamental reason why it isn't going to work.
>
> On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com> wrote:
>
>> (Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, not 'hadooop', of course)
>>
>> On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out a failure when using S3A to write a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It works when using EMRFS (s3://), though.
>>>
>>> I'm using these extra conf params, though I've also tried dropping everything but the encryption one, with the same result:
>>>
>>> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
>>> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>>> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
>>> --conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter
>>>
>>> It looks like it does actually write the parquet shards under
>>>
>>> <output root S3>/_temporary/0/_temporary/<attempt>/
>>>
>>> but then must hit that S3 exception when trying to copy/rename. I think the NullPointerException deep down in Parquet is due to close() being called more than once, so it isn't the root cause, but I'm not sure.
>
> given the stack trace has abortTask() in it, I'd suspect that's a follow-on failure.
>
> One possibility here may be related to how EMR will handle your credentials (session credentials served up over IAM HTTP) and how Apache Hadoop 2.7's s3a auth works (IAM isn't supported until 2.8). That could trigger the problem. But I don't know.
>
> I do know that I have dataframes writing back to s3a on Hadoop 2.7.3, *not on EMR*.
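(For what it's worth, since remote submission means we can't count on the instance metadata, one thing I may try is passing static keys explicitly -- Hadoop 2.7's S3A does read fs.s3a.access.key and fs.s3a.secret.key -- along these lines, with the actual values elided:

    --conf spark.hadoop.fs.s3a.access.key=...
    --conf spark.hadoop.fs.s3a.secret.key=...

No idea yet whether that plays well with the EMR-side session credentials.)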
>>> Anyone seen something like this?
>>>
>>> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 54, ip-10-8-38-103.us-west-2.compute.internal): [trace elided]
>>>         at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
>>>         ... 8 more
>>> Suppressed: java.lang.NullPointerException
>>>         at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>>>         at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>>>         at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>>>         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
>>>         at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
>>>         at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
>>>         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
>>>         ... 9 more
>
> probable bug in parquet cleanup if it never started right... you may want to report that to them.
>
>>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: EA0E434768316935), S3 Extended Request ID: fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
>>>         at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
>>>         at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
>>>         at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>>         at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>>         at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>>         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>>         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>>         at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1405)
>>>         at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
>>>         at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
>>>         at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
>>>         at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>         ... 3 more
>
> The ASF implementation of S3A only uses the transfer manager for async transfers when using the fast output stream ("fs.s3a.fast.upload" = true), or on a rename. Sounds a bit like a rename in the codepath —except I don't see why the 403 would wait until the rename(). I'd have expected it to start on the attempt to create the temporary directory.
>
> Overall, it's not a problem I've seen before. I think trying to mix things is the root cause.
>
> -Steve