On 29 Aug 2016, at 18:18, Everett Anderson 
<ever...@nuna.com.INVALID> wrote:

Okay, I don't think it's really just an S3A issue anymore. I can also run the job
successfully with fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a
--conf param from the EMR console.
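
For reference, that override is roughly the following on the command line (the
class and bucket names here are placeholders, not the real job):

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --class com.example.MyJob \
  s3a://my-bucket/jars/my-job.jar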

The problem seems related to the fact that we're trying to spark-submit jobs to 
a YARN cluster from outside the cluster itself.

The docs (https://spark.apache.org/docs/1.6.2/running-on-yarn.html) suggest one
must copy the Hadoop/YARN config XML to a machine outside the cluster to do this,
which feels gross, but it's what we did. We had changed fs.s3.impl to use S3A in
that config, and that change seems to be what results in the failure, though I
still can't figure out why.
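
Concretely that means something like this on the machine doing the submit (paths
are illustrative; on EMR the config typically lives under /etc/hadoop/conf on the
master node):

export HADOOP_CONF_DIR=/path/to/copied/hadoop-conf
export YARN_CONF_DIR=$HADOOP_CONF_DIR
spark-submit --master yarn --deploy-mode cluster ...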

Interestingly, if I don't make that change to the XML and leave it as the EMRFS
implementation, it works, as long as I use s3a:// URIs for the jars; otherwise
spark-submit can't ship them to the cluster, since it doesn't have the EMRFS
implementation locally.
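
So the combination that works looks something like the sketch below (names are
placeholders): the jar goes up via the Hadoop-bundled S3A classes on the
submitting machine, while the job's data paths stay on the cluster's EMRFS s3://
config:

spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyJob \
  s3a://my-bucket/jars/my-job.jar \
  s3://my-bucket/input s3://my-bucket/output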


I see: you are trying to use EMR's "special" S3 in-cluster, but spark-submit is 
trying to submit remotely.

1. Changing the value of fs.s3.impl to S3A works for the upload, but not at
runtime.
2. Using s3a:// for the upload and leaving everything else alone works.

I would just go with S3A; it's just the JARs being discussed here, right, not the
actual data?

When the JARs are needed, they'll be copied on EMR using the Amazon S3A
implementation (whatever they've done there) to the local filesystem, where
classloaders can pick them up and use them. It might be that s3a:// URLs are
slower on EMR than s3:// URLs, but there's no fundamental reason why it isn't
going to work.





On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson 
<ever...@nuna.com> wrote:
(Sorry, typo -- I was using 
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not 'hadooop', 
of course)

On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson 
<ever...@nuna.com> wrote:
Hi,

I'm having some trouble figuring out a failure when using S3A when writing a 
DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It 
works when using EMRFS (s3://), though.

I'm using these extra conf params, though I've also tried with only the
encryption one, with the same result:

--conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
--conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
--conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter

It looks like it does actually write the parquet shards under

<output root S3>/_temporary/0/_temporary/<attempt>/

but then must hit that S3 exception when trying to copy/rename. I think the
NullPointerException deep down in Parquet is due to close() being called more
than once, so it isn't the root cause, but I'm not sure.

Given that the stack trace has abortTask() in it, I'd suspect that's a follow-on
failure.



One possibility here may be related to how EMR handles your credentials (session
credentials served up over IAM HTTP) and how Apache Hadoop 2.7's S3A auth works
(IAM isn't supported until 2.8). That could trigger the problem. But I don't
know.
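
If that is what's happening, one way to rule it out (just a sketch; the values
are placeholders, and these are the long-lived-credential properties in Hadoop
2.7) would be to hand S3A static keys directly instead of relying on the
instance's IAM role:

--conf spark.hadoop.fs.s3a.access.key=<access key>
--conf spark.hadoop.fs.s3a.secret.key=<secret key>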

I do know that I have dataframes writing back to s3a on Hadoop 2.7.3, *not on 
EMR*.



Anyone seen something like this?


16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 54, ip-10-8-38-103.us-west-2.comput[...]
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
        ... 8 more
        Suppressed: java.lang.NullPointerException
                at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
                at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
                at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
                at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
                at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
                at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
        ... 9 more


Probable bug in the Parquet cleanup if it never started right... you may want to
report that to them.



Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: EA0E434768316935), S3 Extended Request ID: fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
        at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
        at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
        at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1405)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
        at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
        at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        ... 3 more



The ASF implementation of S3A only uses the transfer manager for async transfers
when using the fast output stream ("fs.s3a.fast.upload" = true), or on a rename.
It sounds a bit like a rename in the codepath, except I don't see why the 403
would wait until the rename(). I'd have expected it to start on the attempt to
create the temporary directory.
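
For reference, that fast output stream only comes into play with something like
the line below (it defaults to off in Hadoop 2.7), so by that reasoning, if it
hasn't been set this is probably the rename path:

--conf spark.hadoop.fs.s3a.fast.upload=true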

Overall, it's not a problem I've seen before. I think trying to mix things is
the root cause.

-Steve



