[jira] [Commented] (HADOOP-17372) S3A AWS Credential provider loading gets confused with isolated classloaders

2022-06-02 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545665#comment-17545665
 ] 

Brandon commented on HADOOP-17372:
--

Hello [~ste...@apache.org], thank you for your work on S3A.

In our Spark jobs, we use a custom AWS credentials provider class which is 
bundled into the Spark application jar. This worked on Hadoop 3.2.1, but 
unfortunately this class can't be found after upgrading to Hadoop 3.3.3. This 
surfaces as a ClassNotFoundException in S3AFileSystem's initialization:
{noformat}
java.io.IOException: From option fs.s3a.aws.credentials.provider 
java.lang.ClassNotFoundException: Class [custom AWS credentials provider class] 
not found
at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:657)
at org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:680)
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:631)
at org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:877)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:534)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469){noformat}
We were able to track this down to the change in this ticket. I believe what's 
happening here is:
 * The S3AFileSystem class is provided by a jar on disk. This jar is added to 
the java classpath via the normal java command-line option. So, the classloader 
of S3AFileSystem is a java application classloader.
 * The Spark application jar which contains our AWS credentials provider class 
is downloaded at runtime by Spark and then "patched into" the java classpath 
via Spark's mutable classloader.
 * Therefore, classes in the application jar are not visible to the classloader 
that loaded S3AFileSystem.

In the meantime, I think our most reasonable path forward is to pull the custom 
AWS credentials provider out of the application jar, install it in a jar on 
disk, and add it to the java command-line classpath like hadoop-aws itself. Not 
too bad, but certainly more complicated than the prior setup with Hadoop 3.2.1.
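
As a rough sketch of what that looks like in practice (class names and paths here 
are illustrative, not our real ones), the provider is wired in through Spark's 
Hadoop config pass-through, and the workaround is to ship its jar on the base JVM 
classpath rather than only inside the application jar:
{noformat}
# How the provider is configured (provider class name is hypothetical)
spark.hadoop.fs.s3a.aws.credentials.provider=com.example.MyCredentialsProvider

# Old setup (worked on Hadoop 3.2.1): provider class bundled in the application
# jar, which Spark downloads at runtime and loads via its mutable classloader.

# Workaround for 3.3.3: install the provider in its own jar on disk and put it
# on the JVM classpath directly, alongside hadoop-aws, e.g.:
spark.driver.extraClassPath=/opt/extra-jars/my-credentials-provider.jar
spark.executor.extraClassPath=/opt/extra-jars/my-credentials-provider.jar
{noformat}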

> S3A AWS Credential provider loading gets confused with isolated classloaders
> 
>
> Key: HADOOP-17372
> URL: https://issues.apache.org/jira/browse/HADOOP-17372
> Project: Hadoop Common
>  Issue Type: Sub-task
>  Components: fs/s3
>Affects Versions: 3.4.0
>Reporter: Steve Loughran
>Assignee: Steve Loughran
>Priority: Major
> Fix For: 3.3.1
>
>
> Problem: exception in loading S3A credentials for an FS, "Class class 
> com.amazonaws.auth.EnvironmentVariableCredentialsProvider does not implement 
> AWSCredentialsProvider"
> Location: S3A + Spark dataframes test
> Hypothesised cause:
> Configuration.getClasses() uses the context classloader, and with the spark 
> isolated CL that's different from the one the s3a FS uses, so it can't load 
> AWS credential providers.






[jira] [Comment Edited] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-27 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421068#comment-17421068
 ] 

Brandon edited comment on HADOOP-17935 at 9/27/21, 10:15 PM:
-

> turns out each object in a bulk delete request counts as 1 write of your 
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more 
> writes going on in same partition of a bucket. The AWS SDK throttle retry 
> handler blindly retries, and so does the S3A one.
 >
 > Are you writing files down a deep tree? And is it likely there's other 
 > writes going on to the same tree in the bucket?

Interesting. We do have lots of Spark jobs that run concurrently throughout the 
day and are all writing through S3A. Based on that, one theory is that other 
jobs are in the process of cleaning up their pending commits, by deleting a 
large "directory" and unleashing a thundering herd. This causes the large 
volume of delete requests and 503s. If at the same time as that thundering 
herd, another job tries to begin a write, it could get stuck as described in 
this ticket. If this theory is true, then an upgrade to Hadoop 3.3.1 should 
help.
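
Rough arithmetic behind that theory (my numbers, just applying the quota figures 
quoted above):
{noformat}
1 bulk DeleteObjects page      = up to 1,000 keys = 1,000 writes against the quota
quota per bucket partition     = 3,500 writes/s
pages/s to saturate the quota  = 3,500 / 1,000 ~= 3.5
{noformat}
So only a handful of jobs paging through large delete batches under the same 
prefix could saturate the write quota on their own, and any other job trying to 
start a write at that moment would see the 503s.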

About the depth of the tree: I'm using these configs to specify the storage of 
pending commits:

{noformat}
 "spark.hadoop.fs.defaultFS": "s3a://some-bucket"
 "spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
{noformat}

So, the location of the pending commits is at the root of that bucket, as 
shallow a tree as possible. The S3A directory committer always prepends a user 
name, which happens to be "www-data" in the Spark environment. So the working 
directory of the pending commits for a given write is:

{noformat}
 www-data/{Random UUID}/staging-uploads/...
{noformat}

Having that shared "www-data" prefix probably isn't helping with load balancing 
the bucket.


was (Author: brandonvin):
> turns out each object in a bulk delete request counts as 1 write of your 
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more 
> writes going on in same partition of a bucket. The AWS SDK throttle retry 
> handler blindly retries, and so does the S3A one.
 >
 > Are you writing files down a deep tree? And is it likely there's other 
 > writes going on to the same tree in the bucket?

Interesting. We do have lots of Spark jobs that run concurrently throughout the 
day and are all writing through S3A. Based on that, one theory is that other 
jobs are in the process of cleaning up their pending commits, by deleting a 
large "directory" and unleashing a thundering herd. This causes the large 
volume of delete requests and 503s. If at the same time as that thundering 
herd, another job tries to begin a write, it could get stuck as described in 
this ticket. If this theory is true, then an upgrade to Hadoop 3.3.1 should 
help.

About the depth of the tree: I'm using these configs to specify the storage of 
pending commits:

{noformat}
 "spark.hadoop.fs.defaultFS": "s3a://some-bucket"
 "spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
{noformat}

So, the location of the pending commits is at the root of that bucket, as 
shallow a tree as possible. The S3A directory committer always prepends a user 
name, which happens to be "www-data" in the Spark environment. So the working 
directory of the pending commits for a given write is:

{noformat}
 www-data/{Random UUID}/staging-uploads/...
{noformat}

Having that shared "www-data" prefix probably isn't helping with load balancing 
the bucket.

> Spark job stuck in S3A StagingCommitter::setupJob
> -
>
> Key: HADOOP-17935
> URL: https://issues.apache.org/jira/browse/HADOOP-17935
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
>Reporter: Brandon
>Priority: Major
>
> This is using the S3A directory staging committer; the Spark driver gets 
> stuck in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> 

[jira] [Comment Edited] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-27 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421068#comment-17421068
 ] 

Brandon edited comment on HADOOP-17935 at 9/27/21, 10:14 PM:
-

> turns out each object in a bulk delete request counts as 1 write of your 
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more 
> writes going on in same partition of a bucket. The AWS SDK throttle retry 
> handler blindly retries, and so does the S3A one.
 >
 > Are you writing files down a deep tree? And is it likely there's other 
 > writes going on to the same tree in the bucket?

Interesting. We do have lots of Spark jobs that run concurrently throughout the 
day and are all writing through S3A. Based on that, one theory is that other 
jobs are in the process of cleaning up their pending commits, by deleting a 
large "directory" and unleashing a thundering herd. This causes the large 
volume of delete requests and 503s. If at the same time as that thundering 
herd, another job tries to begin a write, it could get stuck as described in 
this ticket. If this theory is true, then an upgrade to Hadoop 3.3.1 should 
help.

About the depth of the tree: I'm using these configs to specify the storage of 
pending commits:

{noformat}
 "spark.hadoop.fs.defaultFS": "s3a://some-bucket"
 "spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
{noformat}

So, the location of the pending commits is at the root of that bucket, as 
shallow a tree as possible. The S3A directory committer always prepends a user 
name, which happens to be "www-data" in the Spark environment. So the working 
directory of the pending commits for a given write is:

{noformat}
 www-data/{Random UUID}/staging-uploads/...
{noformat}

Having that shared "www-data" prefix probably isn't helping with load balancing 
the bucket.


was (Author: brandonvin):
> turns out each object in a bulk delete request counts as 1 write of your 
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more 
> writes going on in same partition of a bucket. The AWS SDK throttle retry 
> handler blindly retries, and so does the S3A one.
>
> Are you writing files down a deep tree? And is it likely there's other writes 
> going on to the same tree in the bucket?

Interesting. We do have lots of Spark jobs that run concurrently throughout the 
day and are all writing through S3A. Based on that, one theory is that other 
jobs are in the process of cleaning up their pending commits, by deleting a 
large "directory" and unleashing a thundering herd. This causes the large 
volume of delete requests and 503s. If at the same time as that thundering 
herd, another job tries to begin a write, it could get stuck as described in 
this ticket. If this theory is true, then an upgrade to Hadoop 3.3.1 should 
help.

About the depth of the tree: I'm using these configs to specify the storage of 
pending commits:
```
"spark.hadoop.fs.defaultFS": "s3a://some-bucket"
"spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
```

So, the location of the pending commits is at the root of that bucket, as 
shallow a tree as possible. The S3A directory committer always prepends a user 
name, which happens to be "www-data" in the Spark environment. So the working 
directory of the pending commits for a given write is:
```
www-data/{Random UUID}/staging-uploads/...
```

Having that shared "www-data" prefix probably isn't helping with load balancing 
the bucket.

> Spark job stuck in S3A StagingCommitter::setupJob
> -
>
> Key: HADOOP-17935
> URL: https://issues.apache.org/jira/browse/HADOOP-17935
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
>Reporter: Brandon
>Priority: Major
>
> This is using the S3A directory staging committer; the Spark driver gets 
> stuck in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> 

[jira] [Commented] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-27 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421068#comment-17421068
 ] 

Brandon commented on HADOOP-17935:
--

> turns out each object in a bulk delete request counts as 1 write of your 
> quota of 3500/s, so a page of 1000 entries will be rejected if there's more 
> writes going on in same partition of a bucket. The AWS SDK throttle retry 
> handler blindly retries, and so does the S3A one.
>
> Are you writing files down a deep tree? And is it likely there's other writes 
> going on to the same tree in the bucket?

Interesting. We do have lots of Spark jobs that run concurrently throughout the 
day and are all writing through S3A. Based on that, one theory is that other 
jobs are in the process of cleaning up their pending commits, by deleting a 
large "directory" and unleashing a thundering herd. This causes the large 
volume of delete requests and 503s. If at the same time as that thundering 
herd, another job tries to begin a write, it could get stuck as described in 
this ticket. If this theory is true, then an upgrade to Hadoop 3.3.1 should 
help.

About the depth of the tree: I'm using these configs to specify the storage of 
pending commits:
```
"spark.hadoop.fs.defaultFS": "s3a://some-bucket"
"spark.hadoop.fs.s3a.committer.staging.tmp.path": "/"
```

So, the location of the pending commits is at the root of that bucket, as 
shallow a tree as possible. The S3A directory committer always prepends a user 
name, which happens to be "www-data" in the Spark environment. So the working 
directory of the pending commits for a given write is:
```
www-data/{Random UUID}/staging-uploads/...
```

Having that shared "www-data" prefix probably isn't helping with load balancing 
the bucket.

> Spark job stuck in S3A StagingCommitter::setupJob
> -
>
> Key: HADOOP-17935
> URL: https://issues.apache.org/jira/browse/HADOOP-17935
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
>Reporter: Brandon
>Priority: Major
>
> This is using the S3A directory staging committer; the Spark driver gets 
> stuck in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown 
> Source)
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
>  => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown 
> Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>  => holding 
> Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
> 

[jira] [Commented] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-24 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17419971#comment-17419971
 ] 

Brandon commented on HADOOP-17935:
--

[~ste...@apache.org]

Thanks for the quick reply, Steve.

I should clarify that the Spark jobs _almost always_ work just fine. This bug 
only happens rarely and unpredictably, in jobs that otherwise have no issue. So 
I don't think permissions issues in the job are to blame.

# I could try enabling debug logging on `org.apache.hadoop.fs.s3a` (see the 
sketch after this list) and wait for another driver to get stuck. The job does a 
lot of big table writes, so I'm a bit concerned that leaving debug logging on all 
the time might slow down the driver or clog up the log storage system.
# I didn't have bucket logs enabled at the time for the pending-commit bucket. 
I could try enabling that. However, I do have request metrics for that bucket, 
which show a lot of HTTP 5xx responses around the time when the driver got 
stuck. So it's likely that the driver got an "HTTP 503 Slow Down" response, 
which led to the retry loop seen in the stack trace. I'm also noticing that 
this stack trace shows a retry-within-a-retry.
# I could share a snippet of storediag output if you can specify which part 
you're interested in. There's a lot there. Also, running storediag on my laptop 
is not going to capture all the settings in effect in the actual Spark job.
# Could you elaborate on how increasing `fs.s3a.threads.max` would help? Does 
that thread count have an impact on the mkdirs call?
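
On item 1, here's a sketch of what I'd add to the driver's log4j.properties 
(assuming the stock log4j 1.x setup Spark ships with; adjust for whatever logging 
backend is actually in use):
{noformat}
# DEBUG for the S3A connector only -- can be very chatty during big table writes
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
# Optionally also the AWS SDK request/response layer, if the log volume is tolerable
#log4j.logger.com.amazonaws.request=DEBUG
{noformat}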

> Spark job stuck in S3A StagingCommitter::setupJob
> -
>
> Key: HADOOP-17935
> URL: https://issues.apache.org/jira/browse/HADOOP-17935
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: Spark 2.4.4
> Hadoop 3.2.1
> "spark.hadoop.fs.s3a.committer.name": "directory"
>Reporter: Brandon
>Priority: Major
>
> This is using the S3A directory staging committer; the Spark driver gets 
> stuck in a retry loop inside setupJob. Here's a stack trace:
> {noformat}
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
> org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown 
> Source)
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
>  => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown
>  Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown 
> Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>  => holding 
> Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
> org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
> org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
> 

[jira] [Updated] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-23 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17935:
-
Description: 
This is using the S3A directory staging committer; the Spark driver gets stuck 
in a retry loop inside setupJob. Here's a stack trace:


{noformat}
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
 Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown 
Source)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
 => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown 
Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown 
Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
 => holding 
Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2062)
org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2129)
org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2808)
org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2833)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.Invoker$$Lambda$232/695085082.execute(Unknown Source)
org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1932/855044548.execute(Unknown 
Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2835)
org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1589)
org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1933/1245120662.execute(Unknown 
Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$8(S3AFileSystem.java:1461)
com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:2136)
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4315)
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4368)
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)

[jira] [Created] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-23 Thread Brandon (Jira)
Brandon created HADOOP-17935:


 Summary: Spark job stuck in S3A StagingCommitter::setupJob
 Key: HADOOP-17935
 URL: https://issues.apache.org/jira/browse/HADOOP-17935
 Project: Hadoop Common
  Issue Type: Bug
  Components: fs/s3
Affects Versions: 3.2.1
 Environment: Spark 2.4.4
Hadoop 3.2.1
"spark.hadoop.fs.s3a.committer.name": "directory"
Reporter: Brandon


This is using the S3A directory staging committer; the Spark driver gets stuck 
in a retry loop inside setupJob. Here's a stack trace:


{noformat}
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown
 Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown 
Source)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
 => holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571})
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown 
Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown 
Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
 => holding 
Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125})
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2062)
org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2129)
org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2808)
org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2833)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.Invoker$$Lambda$232/695085082.execute(Unknown Source)
org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1932/855044548.execute(Unknown 
Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2835)
org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1589)
org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1933/1245120662.execute(Unknown 
Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$8(S3AFileSystem.java:1461)

[jira] [Updated] (HADOOP-17377) ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata Service

2021-02-18 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The instance metadata service has its own guidance for error handling and 
retry, which differs from the Blob store's: 
[https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]

In particular, it responds with HTTP 429 when the request rate is too high, 
whereas the Blob store responds with HTTP 503. The retry policy used only 
accounts for the latter, as it retries any status >= 500. This can result in 
job instability when running multiple processes on the same host.
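
To make the gap concrete, here is a small sketch in Java of the status-code check 
being described; this is illustrative only, not the actual ExponentialRetryPolicy 
code, and the method name is mine:
{noformat}
// Sketch: which HTTP status codes get retried today vs. what IMDS guidance expects.
static boolean isRetryableStatus(int statusCode) {
    // Behaviour described above: anything >= 500 is retried, which covers the
    // Blob store's 503 throttling response.
    boolean retriedToday = statusCode >= 500;
    // The instance metadata service throttles with 429 (and can also return 410),
    // which the current policy does not treat as retryable.
    boolean imdsThrottling = statusCode == 429 || statusCode == 410;
    return retriedToday || imdsThrottling;
}
{noformat}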

*Environment*
 * Spark talking to an ABFS store

 * Hadoop 3.2.1

 * Running on an Azure VM with user-assigned identity, ABFS configured to use 
MsiTokenProvider

 * 6 executor processes on each VM

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in logs a few hundred to low thousands of times a day. It's 
luckily skating by since the download operation is wrapped in 3 retries.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The instance metadata service has its own guidance for error handling and 
retry, which differs from the Blob store's: 
[https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]

In particular, it responds with HTTP 429 when the request rate is too high, 
whereas the Blob store responds with HTTP 503. The retry policy used only 
accounts for the latter. This can result in job instability when running 
multiple processes on the same host.

*Environment*
 * Spark talking to an ABFS store

 * Hadoop 3.2.1

 * Running on an Azure VM with user-assigned identity, ABFS configured to use 
MsiTokenProvider

 * 6 executor processes on each VM

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in logs a few hundred to low thousands of times a day. It's 
luckily skating by since the download operation is wrapped in 3 retries.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of 

[jira] [Updated] (HADOOP-17377) ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata Service

2021-02-18 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The instance metadata service has its own guidance for error handling and 
retry, which differs from the Blob store's: 
[https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]

In particular, it responds with HTTP 429 when the request rate is too high, 
whereas the Blob store responds with HTTP 503. The retry policy used only 
accounts for the latter, as it retries any status >= 500. This can result in 
job instability when running multiple processes on the same host.

*Environment*
 * Spark talking to an ABFS store

 * Hadoop 3.2.1

 * Running on an Azure VM with user-assigned identity, ABFS configured to use 
MsiTokenProvider

 * 6 executor processes on each VM

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in logs a few hundred to low thousands of times a day. It's 
luckily skating by since the download operation is wrapped in 3 retries.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
The instance metadata service has its own guidance for error handling and retry, 
which differs from the Blob store's: 
[https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]

In particular, it responds with HTTP 429 when the request rate is too high, 
whereas the Blob store responds with HTTP 503. The retry policy used only 
accounts for the latter. This can result in job instability when running 
multiple processes on the same host.

*Environment*
 * Spark talking to an ABFS store

* Hadoop 3.2.1

* Running on an Azure VM with user-assigned identity, ABFS configured to use 
MsiTokenProvider

* 6 executor processes on each VM

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 

[jira] [Updated] (HADOOP-17377) ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata Service

2021-02-18 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
The instance metadata service has its own guidance for error handling and retry, 
which differs from the Blob store's: 
[https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]

In particular, it responds with HTTP 429 when the request rate is too high, 
whereas the Blob store responds with HTTP 503. The retry policy used only 
accounts for the latter. This can result in job instability when running 
multiple processes on the same host.

*Environment*
 * Spark talking to an ABFS store

* Hadoop 3.2.1

* Running on an Azure VM with user-assigned identity, ABFS configured to use 
MsiTokenProvider

* 6 executor processes on each VM

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

Using ABFS and the MSI token provider, especially when there are multiple JVMs 
running on the same host, ABFS frequently throws HTTP429 throttled exception. 
The implementation for fetching a token from MSI uses ExponentialRetryPolicy, 
however ExponentialRetryPolicy does not retry on status code 429, from my read 
of the code.

Perhaps the ExponentialRetryPolicy could retry HTTP429 errors? I'm not sure 
what other ramifications that would have.

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 6 executor processes, and each executor 
process uses 5 cores. 

[jira] [Updated] (HADOOP-17377) ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata Service

2021-02-18 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Summary: ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance 
Metadata Service  (was: ABFS: Frequent HTTP429 exceptions with MSI token 
provider)

> ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata 
> Service
> 
>
> Key: HADOOP-17377
> URL: https://issues.apache.org/jira/browse/HADOOP-17377
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/azure
>Affects Versions: 3.2.1
>Reporter: Brandon
>Priority: Major
>
> *Summary*
>  The MSI token provider fetches auth tokens from the local instance metadata 
> service.
>  The instance metadata service documentation states a limit of 5 requests per 
> second: 
> [https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
>  which is fairly low.
> Using ABFS and the MSI token provider, especially when there are multiple 
> JVMs running on the same host, ABFS frequently throws HTTP429 throttled 
> exception. The implementation for fetching a token from MSI uses 
> ExponentialRetryPolicy, however ExponentialRetryPolicy does not retry on 
> status code 429, from my read of the code.
> Perhaps the ExponentialRetryPolicy could retry HTTP429 errors? I'm not sure 
> what other ramifications that would have.
> *Environment*
>  This is in the context of Spark clusters running on Azure Virtual Machine 
> Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
> identity. The Spark cluster is configured to download application JARs from 
> an `abfs://` path, and auth to the storage account with the MSI token 
> provider. The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. 
> More details on the Spark configuration: each VM runs 6 executor processes, 
> and each executor process uses 5 cores. The FileSystem objects are singletons 
> within each JVM due to the internal cache, so on each VM, I expect my setup 
> is making 6 rapid requests to the instance metadata service when the executor 
> is starting up and fetching the JAR.
> *Impact*
>  In my particular use case, the download operation itself is wrapped with 3 
> additional retries. I have never seen the download cause all the tries to be 
> exhausted and fail. In the end, it seems to contribute mostly noise and 
> slowness from the retries. However, having the HTTP429 handled robustly in 
> the ABFS implementation would help application developers succeed and write 
> cleaner code without wrapping individual ABFS operations with retries.
> *Example*
>  Here's an example error message and stack trace. It's always the same stack 
> trace. This appears in my logs a few hundred to low thousands of times a day.
> {noformat}
> AADToken: HTTP connection failed for getting token from AzureAD. Http 
> response: 429 null
> Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
> Proxies: none
> First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
> throttled, too many requests"}
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
>   at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
>   at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
>   at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
>   at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
>   at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
>   at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
>   at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
>   at 
> org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
>   at 
> org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
>   at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at 

[jira] [Commented] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2021-02-12 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283947#comment-17283947
 ] 

Brandon commented on HADOOP-17377:
--

Another note. Very rarely, I've also seen HTTP 410 errors from the Instance 
Metadata Service. ABFS currently doesn't retry those. Azure documentation 
suggests 410 and 500 response codes should be retried: 
[https://docs.microsoft.com/en-in/azure/virtual-machines/linux/instance-metadata-service?tabs=windows#errors-and-debugging]

Here's the full error message and stack trace for reference:
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
410 Gone
Content-Type: text/html Content-Length: 35 Request ID:  Proxies: none
First 1K of Body: The page you requested was removed.
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at 
org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717){noformat}

> ABFS: Frequent HTTP429 exceptions with MSI token provider
> -
>
> Key: HADOOP-17377
> URL: https://issues.apache.org/jira/browse/HADOOP-17377
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/azure
>Affects Versions: 3.2.1
>Reporter: Brandon
>Priority: Major
>
> *Summary*
>  The MSI token provider fetches auth tokens from the local instance metadata 
> service.
>  The instance metadata service documentation states a limit of 5 requests per 
> second: 
> [https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
>  which is fairly low.
> Using ABFS and the MSI token provider, especially when there are multiple 
> JVMs running on the same host, ABFS frequently throws HTTP429 throttled 
> exception. The implementation for fetching a token from MSI uses 
> ExponentialRetryPolicy, however ExponentialRetryPolicy does not retry on 
> status code 429, from my read of the code.
> Perhaps the ExponentialRetryPolicy could retry HTTP429 errors? I'm not sure 
> what other ramifications that would have.
> *Environment*
>  This is in the context of Spark clusters running on Azure Virtual Machine 
> Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
> identity. The Spark cluster is configured to download application JARs from 
> an `abfs://` path, and auth to the storage account with the MSI token 
> provider. The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. 
> More details on the Spark configuration: each VM runs 6 executor processes, 
> and each executor process uses 5 cores. The FileSystem objects are singletons 
> within each JVM due to the internal cache, so on each VM, I expect my setup 
> is making 6 rapid requests to the instance metadata service when the executor 
> is starting up and fetching the JAR.
> *Impact*
>  In my particular use case, the download operation itself is wrapped with 3 
> additional retries. I have never seen the download cause all the tries to be 
> exhausted and fail. In the end, it seems to contribute mostly noise and 
> slowness from the retries. However, having the HTTP429 handled robustly in 
> the ABFS implementation would help application developers succeed and write 
> cleaner code without wrapping individual ABFS operations with retries.
> *Example*
>  Here's an example error message and stack trace. It's always the same stack 
> trace. This appears in my logs a few hundred to low thousands of times a day.
> {noformat}
> AADToken: HTTP connection failed for getting token from AzureAD. Http 
> response: 429 null
> Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
> Proxies: none
> First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
> throttled, too many requests"}
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
>   at 
> org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
>   at 
> 

[jira] [Comment Edited] (HADOOP-17201) Spark job with s3acommitter stuck at the last stage

2021-01-22 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270478#comment-17270478
 ] 

Brandon edited comment on HADOOP-17201 at 1/22/21, 10:50 PM:
-

Some more context. For a while I had been using the staging committer with HDFS 
as the strongly consistent filesystem. Since S3 is strongly consistent now, I had 
recently switched the committer to use a distinct S3 bucket. After that change, 
Spark jobs would frequently get stuck. Stack traces revealed that they were 
commonly stuck while writing or renaming the small `PendingSet` files: those 
operations were transitively invoking `deleteUnnecessaryFakeDirectories` and 
getting stuck in what looks like an infinite retry loop.

In my situation, I saw great improvement after these two changes:
 * The prefix that I had configured in `fs.s3a.committer.staging.tmp.path` was 
excessively long; this was leading to extra deletes in 
`deleteUnnecessaryFakeDirectories`. I reduced it to just `/`, the root of the 
bucket (both settings are shown in the sketch after this list).
 * I set `fs.s3a.retry.limit` to 3
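
For reference, here's a minimal sketch of how those two settings could be applied 
through Spark configuration (the spark.hadoop.* prefix passes them through to the 
underlying Hadoop Configuration; the class name below is just illustrative):
{noformat}
// Sketch only: the two S3A settings above, applied via SparkConf.
// The spark.hadoop.* prefix forwards them to the underlying Hadoop Configuration.
import org.apache.spark.SparkConf;

public class S3ACommitterTuning {
  public static SparkConf apply(SparkConf conf) {
    return conf
        .set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/")  // short staging prefix
        .set("spark.hadoop.fs.s3a.retry.limit", "3");                // cap S3A retry attempts
  }
}
{noformat}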

Looking forward to the improvements in 3.3.0 as well, thanks for those!


was (Author: brandonvin):
Some more context. For a while I had been using the staging committer with HDFS 
as the strongly consistent filesystem. Since S3 is strongly consistent now, I had 
recently switched the committer to use a distinct S3 bucket. After that change, 
Spark jobs would frequently get stuck. Stack traces revealed that they were 
commonly stuck while writing or renaming the small `PendingSet` files: those 
operations were transitively invoking `deleteUnnecessaryFakeDirectories` and 
getting stuck in what looks like an infinite retry loop.

In my situation, I saw great improvement after these two changes:
 * The prefix used in `fs.s3a.committer.staging.tmp.path` was excessively long; 
this was leading to extra deletes in `deleteUnnecessaryFakeDirectories`. I 
reduced it to just `/`, the root of the bucket.
 * I set `fs.s3a.retry.limit` to 3

Looking forward to the improvements in 3.3.0 as well, thanks for those!

> Spark job with s3acommitter stuck at the last stage
> ---
>
> Key: HADOOP-17201
> URL: https://issues.apache.org/jira/browse/HADOOP-17201
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: we are on spark 2.4.5/hadoop 3.2.1 with s3a committer.
> spark.hadoop.fs.s3a.committer.magic.enabled: 'true'
> spark.hadoop.fs.s3a.committer.name: magic
>Reporter: Dyno
>Priority: Major
>  Labels: pull-request-available
> Attachments: exec-120.log, exec-125.log, exec-25.log, exec-31.log, 
> exec-36.log, exec-44.log, exec-5.log, exec-64.log, exec-7.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Usually our Spark job takes 1 or 2 hours to finish; occasionally it runs for 
> more than 3 hours, and then we know it's stuck. In that case the executor 
> usually has a stack like this:
> {{
> "Executor task launch worker for task 78620" #265 daemon prio=5 os_prio=0 
> tid=0x7f73e0005000 nid=0x12d waiting on condition [0x7f74cb291000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:349)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:238)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper$$Lambda$210/1059071691.execute(Unknown
>  Source)
>   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
>   at 
> org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/586859139.execute(Unknown Source)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
>   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:226)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:271)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.complete(S3ABlockOutputStream.java:660)
>   at 
> 

[jira] [Commented] (HADOOP-17201) Spark job with s3acommitter stuck at the last stage

2021-01-22 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17270478#comment-17270478
 ] 

Brandon commented on HADOOP-17201:
--

Some more context. For a while I had been using the staging committer with HDFS 
as the strongly consistent filesystem. Since S3 is strongly consistent now, I had 
recently switched the committer to use a distinct S3 bucket. After that change, 
Spark jobs would frequently get stuck. Stack traces revealed that they were 
commonly stuck while writing or renaming the small `PendingSet` files: those 
operations were transitively invoking `deleteUnnecessaryFakeDirectories` and 
getting stuck in what looks like an infinite retry loop.

In my situation, I saw great improvement after these two changes:
 * The prefix used in `fs.s3a.committer.staging.tmp.path` was excessively long; 
this was leading to extra deletes in `deleteUnnecessaryFakeDirectories`. I 
reduced it to just `/`, the root of the bucket.
 * I set `fs.s3a.retry.limit` to 3

Looking forward to the improvements in 3.3.0 as well, thanks for those!

> Spark job with s3acommitter stuck at the last stage
> ---
>
> Key: HADOOP-17201
> URL: https://issues.apache.org/jira/browse/HADOOP-17201
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: we are on spark 2.4.5/hadoop 3.2.1 with s3a committer.
> spark.hadoop.fs.s3a.committer.magic.enabled: 'true'
> spark.hadoop.fs.s3a.committer.name: magic
>Reporter: Dyno
>Priority: Major
>  Labels: pull-request-available
> Attachments: exec-120.log, exec-125.log, exec-25.log, exec-31.log, 
> exec-36.log, exec-44.log, exec-5.log, exec-64.log, exec-7.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Usually our Spark job takes 1 or 2 hours to finish; occasionally it runs for 
> more than 3 hours, and then we know it's stuck. In that case the executor 
> usually has a stack like this:
> {{
> "Executor task launch worker for task 78620" #265 daemon prio=5 os_prio=0 
> tid=0x7f73e0005000 nid=0x12d waiting on condition [0x7f74cb291000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:349)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:238)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper$$Lambda$210/1059071691.execute(Unknown
>  Source)
>   at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
>   at 
> org.apache.hadoop.fs.s3a.Invoker$$Lambda$23/586859139.execute(Unknown Source)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
>   at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:226)
>   at 
> org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:271)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.complete(S3ABlockOutputStream.java:660)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$200(S3ABlockOutputStream.java:521)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:385)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>   at 
> org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
>   at 
> org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>   at 
> 

[jira] [Commented] (HADOOP-17201) Spark job with s3acommitter stuck at the last stage

2021-01-08 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261496#comment-17261496
 ] 

Brandon commented on HADOOP-17201:
--

Adding another observation of this issue. My setup is Spark 2.4.4 with Hadoop 
3.2.1. I have tried both the staging committer and the magic committer and have 
seen this with both. It's a Spark job that writes several large tables to Parquet 
in S3. Normally it takes ~2 hours, but nondeterministically it will hang 
indefinitely and never finish. When it's hung, the executors have a small handful 
of tasks that are stuck in a sleep. The tasks have thread dumps like the 
following. This example was using the magic committer.
{noformat}
"Executor task launch worker for task 1270082" #897 daemon prio=5 os_prio=0 
tid=0x7f6ba4013000 nid=0x5a42 waiting on condition [0x7f6b345ef000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:349)
 at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
 at 
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
 at 
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
 at 
org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.executePut(S3AFileSystem.java:2484)
 at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadObject$6(WriteOperationHelper.java:445)
 at 
org.apache.hadoop.fs.s3a.WriteOperationHelper$$Lambda$1357/1424336210.execute(Unknown
 Source)
 at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
 at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
 at org.apache.hadoop.fs.s3a.Invoker$$Lambda$378/1750315651.execute(Unknown 
Source)
 at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
 at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
 at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
 at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
 at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadObject(WriteOperationHelper.java:443)
 at 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker.aboutToComplete(MagicCommitTracker.java:139)
 at 
org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:382)
 at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
 at 
org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
 at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
{noformat}

> Spark job with s3acommitter stuck at the last stage
> ---
>
> Key: HADOOP-17201
> URL: https://issues.apache.org/jira/browse/HADOOP-17201
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: fs/s3
>Affects Versions: 3.2.1
> Environment: we are on spark 2.4.5/hadoop 3.2.1 with s3a committer.
> spark.hadoop.fs.s3a.committer.magic.enabled: 'true'
> spark.hadoop.fs.s3a.committer.name: magic
>Reporter: Dyno
>Priority: Major
>  Labels: pull-request-available
> Attachments: exec-120.log, exec-125.log, exec-25.log, exec-31.log, 
> exec-36.log, exec-44.log, exec-5.log, exec-64.log, exec-7.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Usually our Spark job takes 1 or 2 hours to finish; occasionally it runs for 
> more than 3 hours, and then we know it's stuck. In that case the executor 
> usually has a stack like this:
> {{
> "Executor task launch worker for task 78620" #265 daemon prio=5 os_prio=0 
> tid=0x7f73e0005000 nid=0x12d waiting on condition [0x7f74cb291000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:349)
>   at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
>   at 
> 

[jira] [Updated] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-13 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

When using ABFS with the MSI token provider, especially when there are multiple 
JVMs running on the same host, ABFS frequently throws HTTP429 (throttled) 
exceptions. The implementation for fetching a token from MSI uses 
ExponentialRetryPolicy; however, from my read of the code, ExponentialRetryPolicy 
does not retry on status code 429.

Perhaps the ExponentialRetryPolicy could retry HTTP429 errors? I'm not sure 
what other ramifications that would have.
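
To make the idea concrete, here is a rough sketch of a retry check that also 
treats HTTP429 as retryable. This is illustrative only; it is not the actual 
hadoop-azure code, and the class name, constants, and status-code set are my 
assumptions:
{noformat}
// Illustrative sketch: a retry check that treats HTTP 429 as retryable.
// Not the hadoop-azure implementation; names and constants are assumed.
public class ThrottleAwareRetryCheck {
  private static final int HTTP_TOO_MANY_REQUESTS = 429;
  private final int maxRetryCount;

  public ThrottleAwareRetryCheck(int maxRetryCount) {
    this.maxRetryCount = maxRetryCount;
  }

  public boolean shouldRetry(int retryCount, int statusCode) {
    if (retryCount >= maxRetryCount) {
      return false;
    }
    // Retry on connection failures, timeouts, throttling, and most 5xx errors.
    return statusCode == -1
        || statusCode == 408                        // request timeout
        || statusCode == HTTP_TOO_MANY_REQUESTS     // the throttling case discussed here
        || (statusCode >= 500 && statusCode <= 599
            && statusCode != 501 && statusCode != 505);
  }
}
{noformat}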

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 6 executor processes, and each executor 
process uses 5 cores. The FileSystem objects are singletons within each JVM due 
to the internal cache, so on each VM, I expect my setup is making 6 rapid 
requests to the instance metadata service when the executor is starting up and 
fetching the JAR.

*Impact*
 In my particular use case, the download operation itself is wrapped with 3 
additional retries. I have never seen the download cause all the tries to be 
exhausted and fail. In the end, it seems to contribute mostly noise and 
slowness from the retries. However, having the HTTP429 handled robustly in the 
ABFS implementation would help application developers succeed and write cleaner 
code without wrapping individual ABFS operations with retries.
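
(The application-level wrapping mentioned above is nothing fancy; a sketch of the 
shape, with an assumed attempt count and backoff, is below.)
{noformat}
// Sketch of the application-side workaround: wrap the download in a few retries.
// The attempt count and backoff here are assumptions, not a recommended policy.
import java.util.concurrent.Callable;

public final class RetryingDownload {
  public static <T> T withRetries(Callable<T> download, int attempts) throws Exception {
    if (attempts < 1) {
      throw new IllegalArgumentException("attempts must be >= 1");
    }
    Exception last = null;
    for (int i = 1; i <= attempts; i++) {
      try {
        return download.call();
      } catch (Exception e) {
        last = e;
        if (i < attempts) {
          Thread.sleep(1000L * i);  // simple linear backoff between attempts
        }
      }
    }
    throw last;
  }
}
{noformat}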

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The MSI token 

[jira] [Updated] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-13 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

When using ABFS with the MSI token provider, especially from multiple threads, 
ABFS frequently throws HTTP429 (throttled) exceptions. The implementation for 
fetching a token from MSI uses ExponentialRetryPolicy; however, from my read of 
the code, ExponentialRetryPolicy does not retry on status code 429.

Perhaps the ExponentialRetryPolicy could retry HTTP429 errors? I'm not sure 
what other ramifications that would have.

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 3 executor processes, and each executor 
process uses 5 cores. So on each VM, I expect maybe up to 15 rapid requests to 
the instance metadata service when the executor is starting up and fetching the 
JAR.

*Impact*
 In my particular use case, the download operation itself is wrapped with 3 
additional retries. I have never seen the download cause all the tries to be 
exhausted and fail. In the end, it seems to contribute mostly noise and 
slowness from the retries. However, having the HTTP429 handled robustly in the 
ABFS implementation would help application developers succeed and write cleaner 
code without wrapping individual ABFS operations with retries.

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service 

[jira] [Updated] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-12 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

When using ABFS with the MSI token provider, especially from multiple threads, 
ABFS frequently throws HTTP429 (throttled) exceptions. The implementation for 
fetching a token from MSI uses ExponentialRetryPolicy; however, from my read of 
the code, ExponentialRetryPolicy does not retry on status code 429.

So an initial idea is that the ExponentialRetryPolicy could retry HTTP429 
errors.

Another potential enhancement, though more complicated, is to use a static 
cache for the MSI tokens. The cache would be shared by all threads in the JVM.
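
As a rough illustration of that idea (this is a sketch of the concept, not the 
ABFS code; the names, types, and refresh margin are assumptions):
{noformat}
// Hypothetical sketch of a JVM-wide MSI token cache shared by all threads.
// Not the hadoop-azure implementation; names and the refresh margin are assumed.
import java.util.function.Supplier;

public final class CachedMsiToken {
  private static final Object LOCK = new Object();
  private static final long REFRESH_MARGIN_MILLIS = 2 * 60 * 1000;  // refresh 2 minutes early

  private static String cachedToken;
  private static long expiryMillis;

  private CachedMsiToken() {}

  /** Returns a cached token, fetching from the metadata service only when near expiry. */
  public static String getToken(Supplier<String> fetchToken, long tokenLifetimeMillis) {
    synchronized (LOCK) {
      long now = System.currentTimeMillis();
      if (cachedToken == null || now >= expiryMillis - REFRESH_MARGIN_MILLIS) {
        cachedToken = fetchToken.get();  // the only call that hits the instance metadata service
        expiryMillis = now + tokenLifetimeMillis;
      }
      return cachedToken;
    }
  }
}
{noformat}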

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 3 executor processes, and each executor 
process uses 5 cores. So on each VM, I expect maybe up to 15 rapid requests to 
the instance metadata service when the executor is starting up and fetching the 
JAR.

*Impact*
 In my particular use case, the download operation itself is wrapped with 3 
additional retries. I have never seen the download cause all the tries to be 
exhausted and fail. In the end, it seems to contribute mostly noise and 
slowness from the retries. However, having the HTTP429 handled robustly in the 
ABFS implementation would help application developers succeed and write cleaner 
code without wrapping individual ABFS operations with retries.

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The 

[jira] [Updated] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-12 Thread Brandon (Jira)


 [ 
https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon updated HADOOP-17377:
-
Description: 
*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

When using ABFS with the MSI token provider, especially from multiple threads, 
ABFS frequently throws HTTP429 (throttled) exceptions. The implementation for 
fetching a token from MSI uses ExponentialRetryPolicy; however, from my read of 
the code, ExponentialRetryPolicy does not retry on status code 429.

So an initial idea is that the ExponentialRetryPolicy could retry HTTP429 
errors.

Another potential enhancement, though more complicated, is to use a static 
cache for the MSI tokens. The cache would be shared by all threads in the JVM.

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 3 executor processes, and each executor 
process uses 5 cores. So on each VM, I expect a maximum of 15 concurrent token 
fetches when the application is starting up and fetching its JAR.

*Impact*
 In my particular use case, the download operation itself is wrapped with 3 
additional retries. I have never seen the download cause all the tries to be 
exhausted and fail. In the end, it seems to contribute mostly noise and 
slowness from the retries. However, having the HTTP429 handled robustly in the 
ABFS implementation would help application developers succeed and write cleaner 
code without wrapping individual ABFS operations with retries.

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
 CC [~mackrorysd], [~ste...@apache.org]

  was:
*Summary*
 The MSI token provider 

[jira] [Created] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-12 Thread Brandon (Jira)
Brandon created HADOOP-17377:


 Summary: ABFS: Frequent HTTP429 exceptions with MSI token provider
 Key: HADOOP-17377
 URL: https://issues.apache.org/jira/browse/HADOOP-17377
 Project: Hadoop Common
  Issue Type: Bug
  Components: fs/azure
Affects Versions: 3.2.1
Reporter: Brandon


*Summary*
 The MSI token provider fetches auth tokens from the local instance metadata 
service.
 The instance metadata service documentation states a limit of 5 requests per 
second: 
[https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
 which is fairly low.

When using ABFS with the MSI token provider, especially from multiple threads, 
ABFS frequently throws HTTP429 (throttled) exceptions. The implementation for 
fetching a token from MSI uses ExponentialRetryPolicy; however, from my read of 
the code, ExponentialRetryPolicy does not retry on status code 429.

So an initial idea is that the ExponentialRetryPolicy could retry HTTP429 
errors.

Another potential enhancement, though more complicated, is to use a static 
cache for the MSI tokens. The cache would be shared by all threads in the JVM.

*Environment*
 This is in the context of Spark clusters running on Azure Virtual Machine 
Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned 
identity. The Spark cluster is configured to download application JARs from an 
`abfs://` path, and auth to the storage account with the MSI token provider. 
The Spark version is 2.4.4. Hadoop libraries are version 3.2.1. More details on 
the Spark configuration: each VM runs 3 executor processes, and each executor 
process uses 5 cores. So I expect a maximum of 15 concurrent requests to MSI 
when the application is starting up and fetching its JAR.

*Impact*
 In my particular use case, the download operation itself is wrapped with 3 
additional retries. I have never seen the download cause all the tries to be 
exhausted and fail. In the end, it seems to contribute mostly noise and 
slowness from the retries. However, having the HTTP429 handled robustly in the 
ABFS implementation would help application developers succeed and write cleaner 
code without wrapping individual ABFS operations with retries.

*Example*
 Here's an example error message and stack trace. It's always the same stack 
trace. This appears in my logs a few hundred to low thousands of times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 
429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  
Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily 
throttled, too many requests"}
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at 
org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 

[jira] [Comment Edited] (HADOOP-17066) S3A staging committer committing duplicate files

2020-06-05 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126934#comment-17126934
 ] 

Brandon edited comment on HADOOP-17066 at 6/5/20, 5:12 PM:
---

Hm, I haven't been able to find the option to attach files here, so I'll just 
paste:
{noformat}
20/06/05 07:24:39 INFO ParquetFileFormat: Using user defined output committer 
for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
20/06/05 07:24:39 DEBUG AbstractS3ACommitterFactory: Committer option is 
directory
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Task committer 
attempt_20200605072439__m_00_0 instantiated for job "(anonymous)" ID 
job_20200605072439_ with destination 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Setting work path to 
file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0
20/06/05 07:24:39 DEBUG AbstractS3ACommitterFactory: Committer option is 
directory
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Task committer 
attempt_20200605072439__m_00_0 instantiated for job "(anonymous)" ID 
job_20200605072439_ with destination 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0: final output path is 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG StagingCommitter: Conflict resolution mode: FAIL
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using committer directory 
to output data to 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using Commmitter 
StagingCommitter{AbstractS3ACommitter{role=Task committer 
attempt_20200605072439__m_00_0, name=directory, 
outputPath=s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc,
 
workPath=file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0},
 conflictResolution=FAIL, 
wrappedCommitter=FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200605072439_};
 taskId=attempt_20200605072439__m_00_0, status=''}; 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@68b8576f}; 
outputPath=hdfs://namenode/s3a-staging-tmp/www-data/app-20200605072420-0016/staging-uploads,
 workPath=null, algorithmVersion=1, skipCleanup=false, 
ignoreCleanupFailures=false}} for 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Setting work path to 
file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0, Setting up job (no job ID)
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0: final output path is 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Conflict resolution mode: FAIL
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using committer directory 
to output data to 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using Commmitter 
StagingCommitter{AbstractS3ACommitter{role=Task committer 
attempt_20200605072439__m_00_0, name=directory, 
outputPath=s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/test_b.parquet,
 
workPath=file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0},
 conflictResolution=FAIL, 
wrappedCommitter=FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200605072439_};
 taskId=attempt_20200605072439__m_00_0, status=''}; 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@40c38a}; 
outputPath=hdfs://namenode/s3a-staging-tmp/www-data/app-20200605072420-0016/staging-uploads,
 workPath=null, algorithmVersion=1, skipCleanup=false, 
ignoreCleanupFailures=false}} for 
s3://bucket/spark/tables/example/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0, Setting up job (no job ID)
20/06/05 07:24:39 INFO 

[jira] [Commented] (HADOOP-17066) S3A staging committer committing duplicate files

2020-06-05 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126934#comment-17126934
 ] 

Brandon commented on HADOOP-17066:
--

Hm, I haven't been able to find the option to attach files here, so I'll just 
paste:


{noformat}
20/06/05 07:24:39 INFO ParquetFileFormat: Using user defined output committer 
for Parquet: org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
20/06/05 07:24:39 DEBUG AbstractS3ACommitterFactory: Committer option is 
directory
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Task committer 
attempt_20200605072439__m_00_0 instantiated for job "(anonymous)" ID 
job_20200605072439_ with destination 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Setting work path to 
file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0
20/06/05 07:24:39 DEBUG AbstractS3ACommitterFactory: Committer option is 
directory
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Task committer 
attempt_20200605072439__m_00_0 instantiated for job "(anonymous)" ID 
job_20200605072439_ with destination 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0: final output path is 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG StagingCommitter: Conflict resolution mode: FAIL
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using committer directory 
to output data to 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using Commmitter 
StagingCommitter{AbstractS3ACommitter{role=Task committer 
attempt_20200605072439__m_00_0, name=directory, 
outputPath=s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc,
 
workPath=file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0},
 conflictResolution=FAIL, 
wrappedCommitter=FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200605072439_};
 taskId=attempt_20200605072439__m_00_0, status=''}; 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@68b8576f}; 
outputPath=hdfs://namenode/s3a-staging-tmp/www-data/app-20200605072420-0016/staging-uploads,
 workPath=null, algorithmVersion=1, skipCleanup=false, 
ignoreCleanupFailures=false}} for 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/other_namespace/test_a.orc
20/06/05 07:24:39 DEBUG AbstractS3ACommitter: Setting work path to 
file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0, Setting up job (no job ID)
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0: final output path is 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Conflict resolution mode: FAIL
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using committer directory 
to output data to 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 INFO AbstractS3ACommitterFactory: Using Commmitter 
StagingCommitter{AbstractS3ACommitter{role=Task committer 
attempt_20200605072439__m_00_0, name=directory, 
outputPath=s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/test_b.parquet,
 
workPath=file:/opt/spark-2.4.4-without-hadoop-scala-2.12/work/driver-20200605072356-0020/s3a/app-20200605072420-0016/_temporary/0/_temporary/attempt_20200605072439__m_00_0},
 conflictResolution=FAIL, 
wrappedCommitter=FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl{jobId=job_20200605072439_};
 taskId=attempt_20200605072439__m_00_0, status=''}; 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter@40c38a}; 
outputPath=hdfs://namenode/s3a-staging-tmp/www-data/app-20200605072420-0016/staging-uploads,
 workPath=null, algorithmVersion=1, skipCleanup=false, 
ignoreCleanupFailures=false}} for 
s3://bucket/spark/tables/planex/table-20200605-26635-TbaguBgC8ef/test_b.parquet
20/06/05 07:24:39 DEBUG StagingCommitter: Task committer 
attempt_20200605072439__m_00_0, Setting up job (no job ID)
20/06/05 07:24:39 INFO SparkContext: Starting job: save at build.clj:466
20/06/05 07:24:39 

[jira] [Commented] (HADOOP-17066) S3A staging committer committing duplicate files

2020-06-05 Thread Brandon (Jira)


[ 
https://issues.apache.org/jira/browse/HADOOP-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126931#comment-17126931
 ] 

Brandon commented on HADOOP-17066:
--

Attaching debug logs. This is from a simple Spark application that was writing 
two ORC tables concurrently.
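
For context, the shape of that application is roughly the sketch below (the 
paths, data, and class name are placeholders, not the real job):
{noformat}
// Rough sketch of the reproduction shape: one Spark application writing two ORC
// tables concurrently from separate threads. Paths and data are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConcurrentTableWrite {
  public static void main(String[] args) throws InterruptedException {
    SparkSession spark = SparkSession.builder().appName("concurrent-write").getOrCreate();
    Dataset<Row> a = spark.range(1000).toDF("id");
    Dataset<Row> b = spark.range(1000).toDF("id");

    Thread t1 = new Thread(() ->
        a.write().format("orc").save("s3a://bucket/tables/example/test_a.orc"));
    Thread t2 = new Thread(() ->
        b.write().format("orc").save("s3a://bucket/tables/example/test_b.orc"));

    t1.start();
    t2.start();
    t1.join();
    t2.join();
    spark.stop();
  }
}
{noformat}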

> S3A staging committer committing duplicate files
> 
>
> Key: HADOOP-17066
> URL: https://issues.apache.org/jira/browse/HADOOP-17066
> Project: Hadoop Common
>  Issue Type: Sub-task
>  Components: fs/s3
>Affects Versions: 3.3.0, 3.2.1, 3.1.3
>Reporter: Steve Loughran
>Assignee: Steve Loughran
>Priority: Major
>
> SPARK-39111 reporting concurrent jobs double writing files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org