Hey,

Thanks for the reply and sorry for the late response!

I haven't been able to figure out the root cause, but I have been able to
get things working if both the cluster and the remote submitter use S3A
instead of EMRFS for all s3:// interactions, so I'm going with that, for
now.

My impression from reading your various other replies on S3A is that it's
also best to use mapreduce.fileoutputcommitter.algorithm.version=2 (which might
someday be the default
<https://issues.apache.org/jira/browse/MAPREDUCE-6336>) and, presumably if
your data fits well in memory, use fs.s3a.fast.upload=true. Is that right?
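
A minimal sketch of how those settings might be passed to spark-submit, in case it helps anyone following along. The helper name `s3a_conf_flags` is made up for illustration; the property names are the ones mentioned in this thread, and `org.apache.hadoop.fs.s3a.S3AFileSystem` is the stock Apache S3A class. Fast upload is left off by default on the assumption that it buffers output in memory.

```python
# Sketch only: assemble the --conf flags discussed above for spark-submit.
# s3a_conf_flags is a hypothetical helper, not part of Spark or Hadoop.

S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem"


def s3a_conf_flags(fast_upload=False):
    """Return a list of spark-submit arguments routing s3:// through S3A."""
    conf = {
        # Use S3A instead of EMRFS for s3:// URIs.
        "spark.hadoop.fs.s3.impl": S3A_IMPL,
        # Version 2 of the FileOutputCommitter algorithm.
        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
    }
    if fast_upload:
        # Assumed to be worthwhile only when output fits comfortably in memory.
        conf["spark.hadoop.fs.s3a.fast.upload"] = "true"

    flags = []
    for key, value in conf.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


if __name__ == "__main__":
    # Print the flags so they can be pasted after "spark-submit".
    print(" ".join(s3a_conf_flags(fast_upload=True)))
```

The same fs.s3.impl value would also need to be set on the cluster side for both ends to agree.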



On Tue, Aug 30, 2016 at 11:49 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 29 Aug 2016, at 18:18, Everett Anderson <ever...@nuna.com.INVALID
> <ever...@nuna.com.invalid>> wrote:
>
> Okay, I don't think it's really just an S3A issue anymore. I can run the job
> using fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a --conf
> param from the EMR console successfully, as well.
>
> The problem seems related to the fact that we're trying to spark-submit
> jobs to a YARN cluster from outside the cluster itself.
>
> The docs <https://spark.apache.org/docs/1.6.2/running-on-yarn.html>
> suggest one must copy the Hadoop/YARN config XML outside of the cluster to
> do this, which feels gross, but it's what we did. We had changed fs.s3.impl
> to use S3A in that config, and that seems to result in the failure, though
> I still can't figure out why.
>
> Interestingly, if I don't make that change to the XML, and leave it as the
> EMRFS implementation, it will work, as long as I use s3a:// URIs for the
> jar, otherwise spark-submit won't be able to ship them to the cluster since
> it won't have the EMRFS implementation locally.
>
>
> I see: you are trying to use EMR's "special" S3 in-cluster, but
> spark-submit is trying to submit remotely.
>
> 1. Changing the value of fs.s3.impl to S3A works for the upload, but not
> at runtime.
> 2. Using s3a:// for the upload and leaving everything else alone works.
>
> I would just go with S3A. This is just the JARs being discussed here,
> right, not the actual data?
>
> When the JARs are needed, they'll be copied on EMR using the amazon S3A
> implementation —whatever they've done there— to the local filesystem, where
> classloaders can pick them up and use. It might be that s3a:// URLs are
> slower on EMR than s3:// URLs, but there's no fundamental reason why it
> isn't going to work.
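
Option 2 above amounts to rewriting only the jar locations handed to spark-submit and leaving the cluster config alone. A small sketch of that rewrite, where `to_s3a_uri` is a hypothetical helper and the bucket name is an invented example:

```python
# Sketch only: switch jar URIs from s3:// to s3a:// before submitting,
# so a submitter without EMRFS locally can still ship them.
# to_s3a_uri is a hypothetical helper; the bucket below is an example.


def to_s3a_uri(uri):
    """Return the URI with an s3:// scheme replaced by s3a://."""
    prefix = "s3://"
    if uri.startswith(prefix):
        return "s3a://" + uri[len(prefix):]
    # Anything else (s3a://, hdfs://, local paths) is returned unchanged.
    return uri


if __name__ == "__main__":
    print(to_s3a_uri("s3://example-bucket/jars/app.jar"))
```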
>
>
>
>
>
> On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> (Sorry, typo -- I was using
>> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, not
>> 'hadooop', of course)
>>
>> On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out a failure when using S3A when
>>> writing a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and
>>> Spark 1.6.2). It works when using EMRFS (s3://), though.
>>>
>>> I'm using these extra conf params, though I've also tried without
>>> everything but the encryption one with the same result:
>>>
>>> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
>>> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>>> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
>>>
>>> --conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter
>>>
>>> It looks like it does actually write the parquet shards under
>>>
>>> <output root S3>/_temporary/0/_temporary/<attempt>/
>>>
>>> but then must hit that S3 exception when trying to copy/rename. I think
>>> the NullPointerException deep down in Parquet is due to it causing close()
>>> more than once so isn't the root cause, but I'm not sure.
>>>
>>
> given the stack trace has abortTask() in it, I'd suspect that's a
> follow-on failure.
>
>
>
> One possibility here may be related to how EMR will handle your
> credentials (session credentials served up over IAM HTTP) and how Apache
> Hadoop 2.7's s3a auth works (IAM isn't supported until 2.8). That could
> trigger the problem. But I don't know.
>
> I do know that I have dataframes writing back to s3a on Hadoop 2.7.3, *not
> on EMR*.
>
>
>
>>> Anyone seen something like this?
>>>
>>> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 
>>> in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 
>>> 1.0 (TID 54, 
>>> ip-10-8-38-103.us-west-2.computk.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
>>>     ... 8 more
>>>     Suppressed: java.lang.NullPointerException
>>>             at 
>>> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>>>             at 
>>> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>>>             at 
>>> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>>>             at 
>>> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
>>>             at 
>>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
>>>             at 
>>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
>>>             at 
>>> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
>>>             ... 9 more
>>>
>>>
> probable bug in parquet cleanup if it never started right...you may want
> to report that to them.
>
>
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied 
> (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 
> EA0E434768316935), S3 Extended Request ID: 
> fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>>     at 
>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>>     at 
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>>     at 
>>> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1405)
>>>     at 
>>> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
>>>     at 
>>> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
>>>     at 
>>> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
>>>     at 
>>> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>     ... 3 more
>>>
>>>
>>>
>
> The ASF implementation of S3A only uses the transfer manager for async
> transfers when using the fast output stream ( "fs.s3a.fast.upload" = true),
> or on a rename. Sounds a bit like a rename in the codepath —except I don't
> see why the 403 would wait until the rename(). I'd have expected it to
> start on the attempt to create the temporary directory
>
> Overall, it's not a problem I've seen before. I think trying to mix things
> is the root cause
>
> -Steve
>
>
>
>
>
