http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestAssumeRole.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestAssumeRole.java
deleted file mode 100644
index 7c8760b..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestAssumeRole.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.AccessDeniedException;
-import java.util.concurrent.Callable;
-
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-
-/**
- * Tests use of assumed roles.
- * Only run if an assumed role is provided.
- */
-public class ITestAssumeRole extends AbstractS3ATestBase {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ITestAssumeRole.class);
-
-  private static final String ARN_EXAMPLE
-      = "arn:aws:kms:eu-west-1:00000000000:key/" +
-      "0000000-16c9-4832-a1a9-c8bbef25ec8b";
-
-  private static final String E_BAD_ROLE
-      = "Not authorized to perform sts:AssumeRole";
-
-  /**
-   * This is AWS policy removes read access.
-   */
-  public static final String RESTRICTED_POLICY = "{\n"
-      + "   \"Version\": \"2012-10-17\",\n"
-      + "   \"Statement\": [{\n"
-      + "      \"Effect\": \"Deny\",\n"
-      + "      \"Action\": \"s3:ListObjects\",\n"
-      + "      \"Resource\": \"*\"\n"
-      + "    }\n"
-      + "   ]\n"
-      + "}";
-
-  private void assumeRoleTests() {
-    assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
-  }
-
-  private String getAssumedRoleARN() {
-    return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
-  }
-
-  /**
-   * Expect a filesystem to fail to instantiate.
-   * @param conf config to use
-   * @param clazz class of exception to expect
-   * @param text text in exception
-   * @param <E> type of exception as inferred from clazz
-   * @throws Exception if the exception was the wrong class
-   */
-  private <E extends Throwable> void expectFileSystemFailure(
-      Configuration conf,
-      Class<E> clazz,
-      String text) throws Exception {
-    interceptC(clazz,
-        text,
-        () -> new Path(getFileSystem().getUri()).getFileSystem(conf));
-  }
-
-  /**
-   * Experimental variant of intercept() which closes any Closeable
-   * returned.
-   */
-  private static <E extends Throwable> E interceptC(
-      Class<E> clazz, String text,
-      Callable<Closeable> eval)
-      throws Exception {
-
-    return intercept(clazz, text,
-        () -> {
-          try (Closeable c = eval.call()) {
-            return c.toString();
-          }
-        });
-  }
-
-  @Test
-  public void testCreateCredentialProvider() throws IOException {
-    assumeRoleTests();
-    describe("Create the credential provider");
-
-    String roleARN = getAssumedRoleARN();
-
-    Configuration conf = new Configuration(getContract().getConf());
-    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
-    conf.set(ASSUMED_ROLE_ARN, roleARN);
-    conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
-    conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
-    conf.set(ASSUMED_ROLE_POLICY, RESTRICTED_POLICY);
-    try (AssumedRoleCredentialProvider provider
-             = new AssumedRoleCredentialProvider(conf)) {
-      LOG.info("Provider is {}", provider);
-      AWSCredentials credentials = provider.getCredentials();
-      assertNotNull("Null credentials from " + provider, credentials);
-    }
-  }
-
-  @Test
-  public void testAssumeRoleCreateFS() throws IOException {
-    assumeRoleTests();
-    describe("Create an FS client with the role and do some basic IO");
-
-    String roleARN = getAssumedRoleARN();
-    Configuration conf = createAssumedRoleConfig(roleARN);
-    conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
-    conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
-    Path path = new Path(getFileSystem().getUri());
-    LOG.info("Creating test FS and user {} with assumed role {}",
-        conf.get(ACCESS_KEY), roleARN);
-
-    try (FileSystem fs = path.getFileSystem(conf)) {
-      fs.getFileStatus(new Path("/"));
-      fs.mkdirs(path("testAssumeRoleFS"));
-    }
-  }
-
-  @Test
-  public void testAssumeRoleRestrictedPolicyFS() throws Exception {
-    assumeRoleTests();
-    describe("Restrict the policy for this session; verify that reads fail");
-
-    String roleARN = getAssumedRoleARN();
-    Configuration conf = createAssumedRoleConfig(roleARN);
-    conf.set(ASSUMED_ROLE_POLICY, RESTRICTED_POLICY);
-    Path path = new Path(getFileSystem().getUri());
-    try (FileSystem fs = path.getFileSystem(conf)) {
-      intercept(AccessDeniedException.class, "getFileStatus",
-          () -> fs.getFileStatus(new Path("/")));
-      intercept(AccessDeniedException.class, "getFileStatus",
-          () -> fs.listStatus(new Path("/")));
-      intercept(AccessDeniedException.class, "getFileStatus",
-          () -> fs.mkdirs(path("testAssumeRoleFS")));
-    }
-  }
-
-  @Test
-  public void testAssumeRoleFSBadARN() throws Exception {
-    assumeRoleTests();
-    describe("Attemnpt to create the FS with an invalid ARN");
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.set(ASSUMED_ROLE_ARN, ARN_EXAMPLE);
-    expectFileSystemFailure(conf, AccessDeniedException.class, E_BAD_ROLE);
-  }
-
-  @Test
-  public void testAssumeRoleNoARN() throws Exception {
-    assumeRoleTests();
-    describe("Attemnpt to create the FS with no ARN");
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.unset(ASSUMED_ROLE_ARN);
-    expectFileSystemFailure(conf,
-        IOException.class,
-        AssumedRoleCredentialProvider.E_NO_ROLE);
-  }
-
-  @Test
-  public void testAssumeRoleFSBadPolicy() throws Exception {
-    assumeRoleTests();
-    describe("Attemnpt to create the FS with malformed JSON");
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    // add some malformed JSON
-    conf.set(ASSUMED_ROLE_POLICY, "}");
-    expectFileSystemFailure(conf,
-        AWSBadRequestException.class,
-        "JSON");
-  }
-
-  @Test
-  public void testAssumeRoleFSBadPolicy2() throws Exception {
-    assumeRoleTests();
-    describe("Attemnpt to create the FS with valid but non-compliant JSON");
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    // add some invalid JSON
-    conf.set(ASSUMED_ROLE_POLICY, "{'json':'but not what AWS wants}");
-    expectFileSystemFailure(conf,
-        AWSBadRequestException.class,
-        "Syntax errors in policy");
-  }
-
-  @Test
-  public void testAssumeRoleCannotAuthAssumedRole() throws Exception {
-    assumeRoleTests();
-    describe("Assert that you can't use assumed roles to auth assumed roles");
-
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
-        AssumedRoleCredentialProvider.NAME);
-    expectFileSystemFailure(conf,
-        IOException.class,
-        AssumedRoleCredentialProvider.E_FORBIDDEN_PROVIDER);
-  }
-
-  @Test
-  public void testAssumeRoleBadInnerAuth() throws Exception {
-    assumeRoleTests();
-    describe("Try to authenticate with a keypair with spaces");
-
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
-        SimpleAWSCredentialsProvider.NAME);
-    conf.set(ACCESS_KEY, "not valid");
-    conf.set(SECRET_KEY, "not secret");
-    expectFileSystemFailure(conf, AWSBadRequestException.class, "not a valid " +
-        "key=value pair (missing equal-sign) in Authorization header");
-  }
-
-  @Test
-  public void testAssumeRoleBadInnerAuth2() throws Exception {
-    assumeRoleTests();
-    describe("Try to authenticate with an invalid keypair");
-
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
-        SimpleAWSCredentialsProvider.NAME);
-    conf.set(ACCESS_KEY, "notvalid");
-    conf.set(SECRET_KEY, "notsecret");
-    expectFileSystemFailure(conf, AccessDeniedException.class,
-        "The security token included in the request is invalid");
-  }
-
-  @Test
-  public void testAssumeRoleBadSession() throws Exception {
-    assumeRoleTests();
-    describe("Try to authenticate with an invalid session");
-
-    Configuration conf = createAssumedRoleConfig(getAssumedRoleARN());
-    conf.set(ASSUMED_ROLE_SESSION_NAME,
-        "Session Names cannot Hava Spaces!");
-    expectFileSystemFailure(conf, AWSBadRequestException.class,
-        "Member must satisfy regular expression pattern");
-  }
-
-  /**
-   * Create a config for an assumed role; it also disables FS caching.
-   * @param roleARN ARN of role
-   * @return the configuration
-   */
-  private Configuration createAssumedRoleConfig(String roleARN) {
-    Configuration conf = new Configuration(getContract().getConf());
-    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
-    conf.set(ASSUMED_ROLE_ARN, roleARN);
-    disableFilesystemCaching(conf);
-    return conf;
-  }
-
-  @Test
-  public void testAssumedRoleCredentialProviderValidation() throws Throwable {
-    Configuration conf = new Configuration();
-    conf.set(ASSUMED_ROLE_ARN, "");
-    interceptC(IOException.class,
-        AssumedRoleCredentialProvider.E_NO_ROLE,
-        () -> new AssumedRoleCredentialProvider(conf));
-  }
-
-  @Test
-  public void testAssumedDuration() throws Throwable {
-    assumeRoleTests();
-    describe("Expect the constructor to fail if the session is to short");
-    Configuration conf = new Configuration();
-    conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
-    interceptC(IllegalArgumentException.class, "",
-        () -> new AssumedRoleCredentialProvider(conf));
-  }
-
-  @Test
-  public void testAssumedInvalidRole() throws Throwable {
-    assumeRoleTests();
-    describe("Expect the constructor to fail if the role is invalid");
-    Configuration conf = new Configuration();
-    conf.set(ASSUMED_ROLE_ARN, ARN_EXAMPLE);
-    interceptC(AWSSecurityTokenServiceException.class,
-        E_BAD_ROLE,
-        () -> new AssumedRoleCredentialProvider(conf));
-  }
-
-  /**
-   * This is here to check up on the S3ATestUtils probes themselves.
-   * @see S3ATestUtils#authenticationContains(Configuration, String).
-   */
-  @Test
-  public void testauthenticationContainsProbes() {
-    Configuration conf = new Configuration(false);
-    assertFalse("found AssumedRoleCredentialProvider",
-        authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
-
-    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
-    assertTrue("didn't find AssumedRoleCredentialProvider",
-        authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
-  }
-}

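For comparison: the hand-written RESTRICTED_POLICY JSON string deleted above is built programmatically in the replacement test, via the RoleModel/RoleTestUtils helpers added further down in this patch (note the new deny statement targets s3:GetObject rather than s3:ListObjects). A minimal, untested sketch of that construction, assuming the statement()/policy()/toJson() signatures implied by their call sites; the wrapper class name is only a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.RoleModel;

import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_POLICY;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;

public class RestrictedPolicySketch {

  public static Configuration bindRestrictedPolicy(Configuration conf)
      throws Exception {
    // Sketch only: signatures assumed from RoleTestUtils later in this patch.
    // Deny GET on every bucket; everything else is left to the role's own
    // attached policies.
    Policy restricted = policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT));
    // Serialize to the JSON document STS expects and bind it to the
    // assumed-role policy option.
    conf.set(ASSUMED_ROLE_POLICY, new RoleModel().toJson(restricted));
    return conf;
  }
}
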
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index d6533bf..da0060e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -39,23 +41,28 @@ import org.junit.internal.AssumptionViolatedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
 
 /**
  * Utilities for the S3A tests.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public final class S3ATestUtils {
   private static final Logger LOG = LoggerFactory.getLogger(
       S3ATestUtils.class);
@@ -456,6 +463,33 @@ public final class S3ATestUtils {
   }
 
   /**
+   * Variant of {@code LambdaTestUtils#intercept()} which closes the Closeable
+   * returned by the invoked operation, using its toString() value
+   * for exception messages.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type and contents
+   */
+  public static <E extends Throwable, T extends Closeable> E interceptClosing(
+      Class<E> clazz,
+      String contained,
+      Callable<T> eval)
+      throws Exception {
+
+    return intercept(clazz, contained,
+        () -> {
+          try (Closeable c = eval.call()) {
+            return c.toString();
+          }
+        });
+  }
+
+  /**
    * Helper class to do diffs of metrics.
    */
   public static final class MetricDiff {
@@ -762,21 +796,23 @@ public final class S3ATestUtils {
   }
 
   /**
-   * List a directory.
+   * List a directory/directory tree.
    * @param fileSystem FS
    * @param path path
+   * @param recursive do a recursive listing?
+   * @return the number of files found.
    * @throws IOException failure.
    */
-  public static void lsR(FileSystem fileSystem, Path path, boolean recursive)
+  public static long lsR(FileSystem fileSystem, Path path, boolean recursive)
       throws Exception {
     if (path == null) {
       // surfaces when someone calls getParent() on something at the top
       // of the path
       LOG.info("Empty path");
-      return;
+      return 0;
     }
-    S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive),
-        (status) -> LOG.info("  {}", status));
+    return S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive),
+        (status) -> LOG.info("{}", status));
   }
 
   /**

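The interceptClosing() helper added above generalizes the private interceptC() carried by the old ITestAssumeRole: it evaluates a callable, closes whatever Closeable it returns, and only succeeds when the expected exception is raised. A short usage sketch, assuming only the signatures visible in this hunk (lsR() now returns the number of files listed rather than void); the wrapper class is a placeholder.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;

import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.interceptClosing;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;

public class S3ATestUtilsSketch {

  void examples(FileSystem fs, Configuration conf) throws Exception {
    // Expect provider construction to fail when no role ARN is configured;
    // if it unexpectedly succeeds, interceptClosing() closes the provider.
    conf.set(ASSUMED_ROLE_ARN, "");
    interceptClosing(IOException.class,
        AssumedRoleCredentialProvider.E_NO_ROLE,
        () -> new AssumedRoleCredentialProvider(conf));

    // The recursive listing now reports how many files it logged.
    long files = lsR(fs, new Path("/"), true);
  }
}
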
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
index a5be5de..d731ae7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-import static org.junit.Assert.*;
-
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
@@ -33,13 +27,19 @@ import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.junit.Assert.*;
 
 /**
  * Unit tests for {@link Constants#AWS_CREDENTIALS_PROVIDER} logic.
@@ -248,10 +248,10 @@ public class TestS3AAWSCredentialsProvider {
       AWSCredentialsProvider provider = providers.get(i);
       assertNotNull(
           String.format("At position %d, expected class is %s, but found null.",
-          i, expectedClass), provider);
+              i, expectedClass), provider);
       assertTrue(
           String.format("At position %d, expected class is %s, but found %s.",
-          i, expectedClass, provider.getClass()),
+              i, expectedClass, provider.getClass()),
           expectedClass.isAssignableFrom(provider.getClass()));
     }
   }
@@ -269,7 +269,23 @@ public class TestS3AAWSCredentialsProvider {
     assertNotNull(provider2);
     assertInstanceOf(InstanceProfileCredentialsProvider.class, provider2);
     assertSame("Expected all usage of InstanceProfileCredentialsProvider to "
-        + "share a singleton instance, but found unique instances.",
+            + "share a singleton instance, but found unique instances.",
         provider1, provider2);
   }
+
+  /**
+   * This is here to check up on the S3ATestUtils probes themselves.
+   * @see S3ATestUtils#authenticationContains(Configuration, String).
+   */
+  @Test
+  public void testAuthenticationContainsProbes() {
+    Configuration conf = new Configuration(false);
+    assertFalse("found AssumedRoleCredentialProvider",
+        authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
+
+    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
+    assertTrue("didn't find AssumedRoleCredentialProvider",
+        authenticationContains(conf, AssumedRoleCredentialProvider.NAME));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
new file mode 100644
index 0000000..08171b0
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -0,0 +1,789 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AWSBadRequestException;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.MultipartUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
+import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
+import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Tests use of assumed roles.
+ * Only run if an assumed role is provided.
+ */
+@SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "ThrowableNotThrown"})
+public class ITestAssumeRole extends AbstractS3ATestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAssumeRole.class);
+
+  private static final Path ROOT = new Path("/");
+
+  /**
+   * A role FS; if non-null it is closed in teardown.
+   */
+  private S3AFileSystem roleFS;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assumeRoleTests();
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    S3AUtils.closeAll(LOG, roleFS);
+    super.teardown();
+  }
+
+  private void assumeRoleTests() {
+    assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
+  }
+
+  private String getAssumedRoleARN() {
+    return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
+  }
+
+  /**
+   * Expect a filesystem to fail to instantiate.
+   * @param conf config to use
+   * @param clazz class of exception to expect
+   * @param text text in exception
+   * @param <E> type of exception as inferred from clazz
+   * @throws Exception if the exception was the wrong class
+   */
+  private <E extends Throwable> void expectFileSystemCreateFailure(
+      Configuration conf,
+      Class<E> clazz,
+      String text) throws Exception {
+    interceptClosing(clazz,
+        text,
+        () -> new Path(getFileSystem().getUri()).getFileSystem(conf));
+  }
+
+  @Test
+  public void testCreateCredentialProvider() throws IOException {
+    describe("Create the credential provider");
+
+    String roleARN = getAssumedRoleARN();
+
+    Configuration conf = new Configuration(getContract().getConf());
+    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
+    conf.set(ASSUMED_ROLE_ARN, roleARN);
+    conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
+    conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
+    bindRolePolicy(conf, RESTRICTED_POLICY);
+    try (AssumedRoleCredentialProvider provider
+             = new AssumedRoleCredentialProvider(conf)) {
+      LOG.info("Provider is {}", provider);
+      AWSCredentials credentials = provider.getCredentials();
+      assertNotNull("Null credentials from " + provider, credentials);
+    }
+  }
+
+  @Test
+  public void testAssumedInvalidRole() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
+    interceptClosing(AWSSecurityTokenServiceException.class,
+        E_BAD_ROLE,
+        () -> new AssumedRoleCredentialProvider(conf));
+  }
+
+  @Test
+  public void testAssumeRoleFSBadARN() throws Exception {
+    describe("Attemnpt to create the FS with an invalid ARN");
+    Configuration conf = createAssumedRoleConfig();
+    conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
+    expectFileSystemCreateFailure(conf, AccessDeniedException.class,
+        E_BAD_ROLE);
+  }
+
+  @Test
+  public void testAssumeRoleNoARN() throws Exception {
+    describe("Attemnpt to create the FS with no ARN");
+    Configuration conf = createAssumedRoleConfig();
+    conf.unset(ASSUMED_ROLE_ARN);
+    expectFileSystemCreateFailure(conf,
+        IOException.class,
+        AssumedRoleCredentialProvider.E_NO_ROLE);
+  }
+
+  @Test
+  public void testAssumeRoleFSBadPolicy() throws Exception {
+    describe("Attemnpt to create the FS with malformed JSON");
+    Configuration conf = createAssumedRoleConfig();
+    // add some malformed JSON
+    conf.set(ASSUMED_ROLE_POLICY,  "}");
+    expectFileSystemCreateFailure(conf,
+        AWSBadRequestException.class,
+        "JSON");
+  }
+
+  @Test
+  public void testAssumeRoleFSBadPolicy2() throws Exception {
+    describe("Attempt to create the FS with valid but non-compliant JSON");
+    Configuration conf = createAssumedRoleConfig();
+    // add some invalid JSON
+    conf.set(ASSUMED_ROLE_POLICY, "{'json':'but not what AWS wants}");
+    expectFileSystemCreateFailure(conf,
+        AWSBadRequestException.class,
+        "Syntax errors in policy");
+  }
+
+  @Test
+  public void testAssumeRoleCannotAuthAssumedRole() throws Exception {
+    describe("Assert that you can't use assumed roles to auth assumed roles");
+
+    Configuration conf = createAssumedRoleConfig();
+    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
+        AssumedRoleCredentialProvider.NAME);
+    expectFileSystemCreateFailure(conf,
+        IOException.class,
+        AssumedRoleCredentialProvider.E_FORBIDDEN_PROVIDER);
+  }
+
+  @Test
+  public void testAssumeRoleBadInnerAuth() throws Exception {
+    describe("Try to authenticate with a keypair with spaces");
+
+    Configuration conf = createAssumedRoleConfig();
+    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
+        SimpleAWSCredentialsProvider.NAME);
+    conf.set(ACCESS_KEY, "not valid");
+    conf.set(SECRET_KEY, "not secret");
+    expectFileSystemCreateFailure(conf,
+        AWSBadRequestException.class,
+        "not a valid " +
+        "key=value pair (missing equal-sign) in Authorization header");
+  }
+
+  @Test
+  public void testAssumeRoleBadInnerAuth2() throws Exception {
+    describe("Try to authenticate with an invalid keypair");
+
+    Configuration conf = createAssumedRoleConfig();
+    conf.set(ASSUMED_ROLE_CREDENTIALS_PROVIDER,
+        SimpleAWSCredentialsProvider.NAME);
+    conf.set(ACCESS_KEY, "notvalid");
+    conf.set(SECRET_KEY, "notsecret");
+    expectFileSystemCreateFailure(conf,
+        AccessDeniedException.class,
+        "The security token included in the request is invalid");
+  }
+
+  @Test
+  public void testAssumeRoleBadSession() throws Exception {
+    describe("Try to authenticate with an invalid session");
+
+    Configuration conf = createAssumedRoleConfig();
+    conf.set(ASSUMED_ROLE_SESSION_NAME,
+        "Session names cannot hava spaces!");
+    expectFileSystemCreateFailure(conf,
+        AWSBadRequestException.class,
+        "Member must satisfy regular expression pattern");
+  }
+
+
+  /**
+   * Create the assumed role configuration.
+   * @return a config bonded to the ARN of the assumed role
+   */
+  public Configuration createAssumedRoleConfig() {
+    return createAssumedRoleConfig(getAssumedRoleARN());
+  }
+
+  /**
+   * Create a config for an assumed role; it also disables FS caching.
+   * @param roleARN ARN of role
+   * @return the new configuration
+   */
+  private Configuration createAssumedRoleConfig(String roleARN) {
+    return newAssumedRoleConfig(getContract().getConf(), roleARN);
+  }
+
+  @Test
+  public void testAssumeRoleUndefined() throws Throwable {
+    describe("Verify that you cannot instantiate the"
+        + " AssumedRoleCredentialProvider without a role ARN");
+    Configuration conf = new Configuration();
+    conf.set(ASSUMED_ROLE_ARN, "");
+    interceptClosing(IOException.class,
+        AssumedRoleCredentialProvider.E_NO_ROLE,
+        () -> new AssumedRoleCredentialProvider(conf));
+  }
+
+  @Test
+  public void testAssumedIllegalDuration() throws Throwable {
+    describe("Expect the constructor to fail if the session is to short");
+    Configuration conf = new Configuration();
+    conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
+    interceptClosing(IllegalArgumentException.class, "",
+        () -> new AssumedRoleCredentialProvider(conf));
+  }
+
+
+  @Test
+  public void testAssumeRoleCreateFS() throws IOException {
+    describe("Create an FS client with the role and do some basic IO");
+
+    String roleARN = getAssumedRoleARN();
+    Configuration conf = createAssumedRoleConfig(roleARN);
+    Path path = new Path(getFileSystem().getUri());
+    LOG.info("Creating test FS and user {} with assumed role {}",
+        conf.get(ACCESS_KEY), roleARN);
+
+    try (FileSystem fs = path.getFileSystem(conf)) {
+      fs.getFileStatus(new Path("/"));
+      fs.mkdirs(path("testAssumeRoleFS"));
+    }
+  }
+
+  @Test
+  public void testAssumeRoleRestrictedPolicyFS() throws Exception {
+    describe("Restrict the policy for this session; verify that reads fail");
+
+    Configuration conf = createAssumedRoleConfig();
+    bindRolePolicy(conf, RESTRICTED_POLICY);
+    Path path = new Path(getFileSystem().getUri());
+    try (FileSystem fs = path.getFileSystem(conf)) {
+      forbidden("getFileStatus",
+          () -> fs.getFileStatus(new Path("/")));
+      forbidden("getFileStatus",
+          () -> fs.listStatus(new Path("/")));
+      forbidden("getFileStatus",
+          () -> fs.mkdirs(path("testAssumeRoleFS")));
+    }
+  }
+
+  /**
+   * Tighten the extra policy on the assumed role call for torrent access,
+   * and verify that it blocks all other operations.
+   * That is: any non-empty policy in the assumeRole API call overrides
+   * all of the policies attached to the role beforehand, switching
+   * the role instance to only those policies named in the call.
+   */
+  @Test
+  public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable {
+
+    describe("extra policies in assumed roles need;"
+        + " all required policies stated");
+    Configuration conf = createAssumedRoleConfig();
+
+    bindRolePolicy(conf,
+        policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT)));
+    Path path = path("testAssumeRoleStillIncludesRolePerms");
+    roleFS = (S3AFileSystem) path.getFileSystem(conf);
+    assertTouchForbidden(roleFS, path);
+  }
+
+  /**
+   * After blocking all write verbs used by S3A, try to write data (fail)
+   * and read data (succeed).
+   */
+  @Test
+  public void testReadOnlyOperations() throws Throwable {
+
+    describe("Restrict role to read only");
+    Configuration conf = createAssumedRoleConfig();
+
+    bindRolePolicy(conf,
+        policy(
+            statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS),
+            STATEMENT_ALL_S3, STATEMENT_ALL_DDB));
+    Path path = methodPath();
+    roleFS = (S3AFileSystem) path.getFileSystem(conf);
+    // list the root path, expect happy
+    roleFS.listStatus(ROOT);
+
+    // touch will fail
+    assertTouchForbidden(roleFS, path);
+    // you can delete it, because it's not there and getFileStatus() is allowed
+    roleFS.delete(path, true);
+
+    //create it with the full FS
+    getFileSystem().mkdirs(path);
+
+    // and now deleting it will not be allowed
+    assertDeleteForbidden(this.roleFS, path);
+
+    // list multipart uploads.
+    // This is part of the read policy.
+    int counter = 0;
+    MultipartUtils.UploadIterator iterator = roleFS.listUploads("/");
+    while (iterator.hasNext()) {
+      counter++;
+      iterator.next();
+    }
+    LOG.info("Found {} outstanding MPUs", counter);
+  }
+
+  /**
+   * Write successfully to the directory with full R/W access,
+   * fail to write or delete data elsewhere.
+   */
+  @SuppressWarnings("StringConcatenationMissingWhitespace")
+  @Test
+  public void testRestrictedWriteSubdir() throws Throwable {
+
+    describe("Attempt writing to paths where a role only has"
+        + " write access to a subdir of the bucket");
+    Path restrictedDir = methodPath();
+    Path child = new Path(restrictedDir, "child");
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(restrictedDir, true);
+
+    Configuration conf = createAssumedRoleConfig();
+
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+        new Statement(Effects.Allow)
+          .addActions(S3_ALL_OPERATIONS)
+          .addResources(directory(restrictedDir)));
+    roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
+
+    roleFS.getFileStatus(ROOT);
+    roleFS.mkdirs(restrictedDir);
+    assertIsDirectory(restrictedDir);
+    // you can create an adjacent child
+    touch(roleFS, child);
+    assertIsFile(child);
+    // child delete rights
+    ContractTestUtils.assertDeleted(roleFS, child, true);
+    // parent delete rights
+    ContractTestUtils.assertDeleted(roleFS, restrictedDir, true);
+    // delete will try to create an empty parent directory marker, and may fail
+    roleFS.delete(restrictedDir, false);
+    // this sibling path has the same prefix as restrictedDir, but is
+    // adjacent. This verifies that a restrictedDir* pattern isn't matching
+    // siblings, so granting broader rights
+    Path sibling = new Path(restrictedDir.toUri() + "sibling");
+    touch(fs, sibling);
+    assertTouchForbidden(roleFS, sibling);
+    assertDeleteForbidden(roleFS, sibling);
+  }
+
+  public Path methodPath() throws IOException {
+    return path(getMethodName());
+  }
+
+  @Test
+  public void testRestrictedRename() throws Throwable {
+    describe("rename with parent paths not writeable");
+    executeRestrictedRename(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testRestrictedSingleDeleteRename() throws Throwable {
+    describe("rename with parent paths not writeable"
+        + " and multi-object delete disabled");
+    Configuration conf = createAssumedRoleConfig();
+    conf.setBoolean(ENABLE_MULTI_DELETE, false);
+    executeRestrictedRename(conf);
+  }
+
+  /**
+   * Execute a sequence of rename operations.
+   * @param conf FS configuration
+   */
+  public void executeRestrictedRename(final Configuration conf)
+      throws IOException {
+    Path basePath = methodPath();
+    Path restrictedDir = new Path(basePath, "renameSrc");
+    Path destPath = new Path(basePath, "renameDest");
+    Path child = new Path(restrictedDir, "child");
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(basePath, true);
+
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+        new Statement(Effects.Allow)
+          .addActions(S3_PATH_RW_OPERATIONS)
+          .addResources(directory(restrictedDir))
+          .addResources(directory(destPath))
+    );
+    roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
+
+    roleFS.getFileStatus(ROOT);
+    roleFS.mkdirs(restrictedDir);
+    // you can create an adjacent child
+    touch(roleFS, child);
+
+    roleFS.delete(destPath, true);
+    // as dest doesn't exist, this will map child -> dest
+    assertRenameOutcome(roleFS, child, destPath, true);
+
+    assertIsFile(destPath);
+    assertIsDirectory(restrictedDir);
+    Path renamedDestPath = new Path(restrictedDir, destPath.getName());
+    assertRenameOutcome(roleFS, destPath, restrictedDir, true);
+    assertIsFile(renamedDestPath);
+    roleFS.delete(restrictedDir, true);
+    roleFS.delete(destPath, true);
+  }
+
+  @Test
+  public void testRestrictedRenameReadOnlyData() throws Throwable {
+    describe("rename with source read only, multidelete");
+    executeRenameReadOnlyData(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testRestrictedRenameReadOnlySingleDelete() throws Throwable {
+    describe("rename with source read only single delete");
+    Configuration conf = createAssumedRoleConfig();
+    conf.setBoolean(ENABLE_MULTI_DELETE, false);
+    executeRenameReadOnlyData(conf);
+  }
+
+  /**
+   * Execute a sequence of rename operations where the source
+   * data is read only to the client calling rename().
+   * This will cause the inner delete() operations to fail, whose outcomes
+   * are explored.
+   * Multiple files are created (in parallel) for some renames, to explore
+   * the outcome of bulk delete calls, including verifying that a
+   * MultiObjectDeleteException is translated to an AccessDeniedException.
+   * <ol>
+   *   <li>The exception raised is AccessDeniedException,
+   *   from single and multi DELETE calls.</li>
+   *   <li>It happens after the COPY. Not ideal, but, well, we can't pretend
+   *   it's a filesystem forever.</li>
+   * </ol>
+   * @param conf FS configuration
+   */
+  public void executeRenameReadOnlyData(final Configuration conf)
+      throws Exception {
+    assume("Does not work with S3Guard", !getFileSystem().hasMetadataStore());
+    Path basePath = methodPath();
+    Path destDir = new Path(basePath, "renameDest");
+    Path readOnlyDir = new Path(basePath, "readonlyDir");
+    Path readOnlyFile = new Path(readOnlyDir, "readonlyChild");
+
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(basePath, true);
+
+    // this file is readable by the roleFS, but cannot be deleted
+    touch(fs, readOnlyFile);
+
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+          new Statement(Effects.Allow)
+            .addActions(S3_PATH_RW_OPERATIONS)
+            .addResources(directory(destDir))
+    );
+    roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
+
+    roleFS.delete(destDir, true);
+    roleFS.mkdirs(destDir);
+    // rename will fail in the delete phase
+    forbidden(readOnlyFile.toString(),
+        () -> roleFS.rename(readOnlyFile, destDir));
+
+    // and the source file is still there
+    assertIsFile(readOnlyFile);
+
+    // but so is the copied version, because there's no attempt
+    // at rollback, or preflight checking on the delete permissions
+    Path renamedFile = new Path(destDir, readOnlyFile.getName());
+
+    assertIsFile(renamedFile);
+
+    ContractTestUtils.assertDeleted(roleFS, renamedFile, true);
+    assertFileCount("Empty Dest Dir", roleFS,
+        destDir, 0);
+    // create a set of files
+    // this is done in parallel as it is 10x faster on a long-haul test run.
+    int range = 10;
+    touchFiles(fs, readOnlyDir, range);
+    // don't forget about that original file!
+    final long createdFiles = range + 1;
+    // are they all there?
+    assertFileCount("files ready to rename", roleFS,
+        readOnlyDir, createdFiles);
+
+    // try to rename the directory
+    LOG.info("Renaming readonly files {} to {}", readOnlyDir, destDir);
+    AccessDeniedException ex = forbidden("",
+        () -> roleFS.rename(readOnlyDir, destDir));
+    LOG.info("Result of renaming read-only files is AccessDeniedException", 
ex);
+    assertFileCount("files copied to the destination", roleFS,
+        destDir, createdFiles);
+    assertFileCount("files in the source directory", roleFS,
+        readOnlyDir, createdFiles);
+
+    // and finally (so as to avoid the delay of POSTing some more objects),
+    // delete that r/o source
+    forbidden("", () -> roleFS.delete(readOnlyDir, true));
+  }
+
+  /**
+   * Parallel-touch a set of files in the destination directory.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param range range 1..range inclusive of files to create.
+   */
+  public void touchFiles(final S3AFileSystem fs,
+      final Path destDir,
+      final int range) {
+    IntStream.rangeClosed(1, range).parallel().forEach(
+        (i) -> eval(() -> touch(fs, new Path(destDir, "file-" + i))));
+  }
+
+  @Test
+  public void testRestrictedCommitActions() throws Throwable {
+    describe("Attempt commit operations against a path with restricted 
rights");
+    Configuration conf = createAssumedRoleConfig();
+    conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
+    final int uploadPartSize = 5 * 1024 * 1024;
+
+    Path basePath = methodPath();
+    Path readOnlyDir = new Path(basePath, "readOnlyDir");
+    Path writeableDir = new Path(basePath, "writeableDir");
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(basePath, true);
+    fs.mkdirs(readOnlyDir);
+
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+        new Statement(Effects.Allow)
+            .addActions(S3_PATH_RW_OPERATIONS)
+            .addResources(directory(writeableDir))
+    );
+    roleFS = (S3AFileSystem) writeableDir.getFileSystem(conf);
+    CommitOperations fullOperations = new CommitOperations(fs);
+    CommitOperations operations = new CommitOperations(roleFS);
+
+    File localSrc = File.createTempFile("source", "");
+    writeCSVData(localSrc);
+    Path uploadDest = new Path(readOnlyDir, "restricted.csv");
+
+    forbidden("initiate MultiPartUpload",
+        () -> {
+          return operations.uploadFileToPendingCommit(localSrc,
+              uploadDest, "", uploadPartSize);
+        });
+    // delete the file
+    localSrc.delete();
+    // create a directory there
+    localSrc.mkdirs();
+
+    // create some local files and upload them with permissions
+
+    int range = 2;
+    IntStream.rangeClosed(1, range)
+        .parallel()
+        .forEach((i) -> eval(() -> {
+          String name = "part-000" + i;
+          File src = new File(localSrc, name);
+          Path dest = new Path(readOnlyDir, name);
+          writeCSVData(src);
+          SinglePendingCommit pending =
+              fullOperations.uploadFileToPendingCommit(src, dest, "",
+                  uploadPartSize);
+          pending.save(fs, new Path(readOnlyDir,
+              name + CommitConstants.PENDING_SUFFIX), true);
+          assertTrue(src.delete());
+        }));
+
+    try {
+      // we expect to be able to list all the files here
+      Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
+          pendingCommits = operations.loadSinglePendingCommits(readOnlyDir,
+          true);
+
+      // all those commits must fail
+      List<SinglePendingCommit> commits = pendingCommits.getLeft().getCommits();
+      assertEquals(range, commits.size());
+      commits.parallelStream().forEach(
+          (c) -> {
+            CommitOperations.MaybeIOE maybeIOE = operations.commit(c, "origin");
+            Path path = c.destinationPath();
+            assertCommitAccessDenied(path, maybeIOE);
+          });
+
+      // expect failure of the bulk list and abort of the .pending files.
+      LOG.info("abortAllSinglePendingCommits({})", readOnlyDir);
+      assertCommitAccessDenied(readOnlyDir,
+          operations.abortAllSinglePendingCommits(readOnlyDir, true));
+
+      // try writing a magic file
+      Path magicDestPath = new Path(readOnlyDir,
+          CommitConstants.MAGIC + "/" + "magic.txt");
+      forbidden("", () -> {
+        touch(roleFS, magicDestPath);
+        // shouldn't get here; if we do: return the existence of the 0-byte
+        // dest file.
+        return fs.getFileStatus(magicDestPath);
+      });
+
+      // a recursive list and abort is blocked.
+      forbidden("",
+          () -> operations.abortPendingUploadsUnderPath(readOnlyDir));
+    } finally {
+      LOG.info("Cleanup");
+      fullOperations.abortPendingUploadsUnderPath(readOnlyDir);
+    }
+  }
+
+  /**
+   * Verifies that an operation returning a "MaybeIOE" failed
+   * with an AccessDeniedException in the maybe instance.
+   * @param path path operated on
+   * @param maybeIOE result to inspect
+   */
+  public void assertCommitAccessDenied(final Path path,
+      final CommitOperations.MaybeIOE maybeIOE) {
+    IOException ex = maybeIOE.getException();
+    assertNotNull("no IOE in " + maybeIOE + " for " + path, ex);
+    if (!(ex instanceof AccessDeniedException)) {
+      ContractTestUtils.fail("Wrong exception class for commit to "
+          + path, ex);
+    }
+  }
+
+  /**
+   * Write some CSV data to a local file.
+   * @param localSrc local file
+   * @throws IOException failure
+   */
+  public void writeCSVData(final File localSrc) throws IOException {
+    try(FileOutputStream fo = new FileOutputStream(localSrc)) {
+      fo.write("1, true".getBytes());
+    }
+  }
+
+  @Test
+  public void testPartialDelete() throws Throwable {
+    describe("delete with part of the child tree read only; multidelete");
+    executePartialDelete(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testPartialDeleteSingleDelete() throws Throwable {
+    describe("delete with part of the child tree read only");
+    Configuration conf = createAssumedRoleConfig();
+    conf.setBoolean(ENABLE_MULTI_DELETE, false);
+    executePartialDelete(conf);
+  }
+
+  /**
+   * Have a directory with full R/W permissions, but then remove
+   * write access underneath, and try to delete it.
+   * @param conf FS configuration
+   */
+  public void executePartialDelete(final Configuration conf)
+      throws Exception {
+    Path destDir = methodPath();
+    Path readOnlyDir = new Path(destDir, "readonlyDir");
+
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(destDir, true);
+
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
+        new Statement(Effects.Deny)
+            .addActions(S3_PATH_WRITE_OPERATIONS)
+            .addResources(directory(readOnlyDir))
+    );
+    roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
+
+    int range = 10;
+    touchFiles(fs, readOnlyDir, range);
+    touchFiles(roleFS, destDir, range);
+    forbidden("", () -> roleFS.delete(readOnlyDir, true));
+    forbidden("", () -> roleFS.delete(destDir, true));
+
+    // and although you can't delete under the path, if the file doesn't
+    // exist, the delete call fails fast.
+    Path pathWhichDoesntExist = new Path(readOnlyDir, "no-such-path");
+    assertFalse("deleting " + pathWhichDoesntExist,
+        roleFS.delete(pathWhichDoesntExist, true));
+  }
+
+  /**
+   * Assert that the number of files in a destination matches that expected.
+   * @param text text to use in the message
+   * @param fs filesystem
+   * @param path path to list (recursively)
+   * @param expected expected count
+   * @throws IOException IO problem
+   */
+  private static void assertFileCount(String text, FileSystem fs,
+      Path path, long expected)
+      throws IOException {
+    List<String> files = new ArrayList<>();
+    applyLocatedFiles(fs.listFiles(path, true),
+        (status) -> files.add(status.getPath().toString()));
+    long actual = files.size();
+    if (actual != expected) {
+      String ls = files.stream().collect(Collectors.joining("\n"));
+      fail(text + ": expected " + expected + " files in " + path
+          + " but got " + actual + "\n" + ls);
+    }
+  }
+}

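The recurring pattern in the suite above: create a role-bound configuration with newAssumedRoleConfig(), narrow it with bindRolePolicyStatements(), instantiate a second S3AFileSystem against the same bucket, then assert which operations are forbidden. A condensed, untested sketch of that setup, assuming the RoleTestUtils/RoleModel signatures shown elsewhere in this patch; the class name, role ARN and writable directory are whatever the caller supplies.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;

public class RestrictedFSSketch {

  static S3AFileSystem restrictedFS(Configuration fullConf, String roleARN,
      Path writableDir) throws Exception {
    // Fresh config bound to the role; the helper also disables FS caching.
    Configuration conf = newAssumedRoleConfig(fullConf, roleARN);
    // Read access everywhere, read/write only under writableDir.
    bindRolePolicyStatements(conf,
        STATEMENT_ALL_DDB,
        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
        new Statement(Effects.Allow)
            .addActions(S3_PATH_RW_OPERATIONS)
            .addResources(directory(writableDir)));
    return (S3AFileSystem) writableDir.getFileSystem(conf);
  }
}
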
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
new file mode 100644
index 0000000..bb66268
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.commit.ITestCommitOperations;
+
+import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
+import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
+import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
+
+/**
+ * Verify that the commit operations work with a restricted set of operations.
+ * The superclass, {@link ITestCommitOperations}, turns on an inconsistent client
+ * to see how things work in the presence of inconsistency.
+ * These tests disable it, to remove that as a factor, as they are
+ * verifying that the policy settings needed for MPU list/commit/abort are all
+ * enabled properly.
+ */
+public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAssumedRoleCommitOperations.class);
+
+  /**
+   * The restricted directory.
+   */
+  private Path restrictedDir;
+
+  /**
+   * A role FS; if non-null it is closed in teardown.
+   */
+  private S3AFileSystem roleFS;
+
+  @Override
+  public boolean useInconsistentClient() {
+    return false;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assumeRoleTests();
+
+    restrictedDir = super.path("restricted");
+    Configuration conf = newAssumedRoleConfig(getConfiguration(),
+        getAssumedRoleARN());
+    bindRolePolicyStatements(conf,
+        STATEMENT_ALL_DDB,
+        statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
+        new RoleModel.Statement(RoleModel.Effects.Allow)
+            .addActions(S3_PATH_RW_OPERATIONS)
+            .addResources(directory(restrictedDir))
+    );
+    roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf);
+  }
+
+
+  @Override
+  public void teardown() throws Exception {
+    S3AUtils.closeAll(LOG, roleFS);
+    // switches getFileSystem() back to the full FS.
+    roleFS = null;
+    super.teardown();
+  }
+
+  private void assumeRoleTests() {
+    assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
+  }
+
+  /**
+   * The overridden operation returns the roleFS, so that test cases
+   * in the superclass run under restricted rights.
+   * There's special handling in startup to avoid NPEs.
+   * @return {@link #roleFS}
+   */
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return roleFS != null ? roleFS : getFullFileSystem();
+  }
+
+  /**
+   * Get the FS with full access rights.
+   * @return the FS created by the superclass.
+   */
+  public S3AFileSystem getFullFileSystem() {
+    return super.getFileSystem();
+  }
+
+  /**
+   * Map the test path under the restricted directory.
+   * {@inheritDoc}
+   */
+  @Override
+  protected Path path(String filepath) throws IOException {
+    return new Path(restrictedDir, filepath);
+  }
+
+
+  private String getAssumedRoleARN() {
+    return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
+  }
+
+}

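Both of the suites above lean on the small verification helpers in RoleTestUtils, listed next. A usage sketch under stated assumptions: the forbidden() signature is inferred from its call sites above (it appears to return the caught AccessDeniedException), and the class name is a placeholder.

import java.nio.file.AccessDeniedException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;

public class ForbiddenChecksSketch {

  void verifyReadOnly(S3AFileSystem roleFS, Path readOnlyDir) throws Exception {
    // Creating or deleting files under the read-only tree must be rejected.
    assertTouchForbidden(roleFS, new Path(readOnlyDir, "child"));
    assertDeleteForbidden(roleFS, readOnlyDir);
    // forbidden() hands back the AccessDeniedException for further checks.
    AccessDeniedException ex = forbidden("",
        () -> roleFS.delete(readOnlyDir, true));
  }
}
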
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
new file mode 100644
index 0000000..9fa2600
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.nio.file.AccessDeniedException;
+import java.util.concurrent.Callable;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
+import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Helper class for testing roles.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class RoleTestUtils {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RoleTestUtils.class);
+
+  private static final RoleModel MODEL = new RoleModel();
+
+
+  /** Example ARN of a role. */
+  public static final String ROLE_ARN_EXAMPLE
+      = "arn:aws:iam::9878543210123:role/role-s3-restricted";
+
+
+  /** Deny GET requests to all buckets. */
+  public static final Statement DENY_GET_ALL =
+      statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT);
+
+  /**
+   * An AWS policy which removes read access.
+   */
+  public static final Policy RESTRICTED_POLICY = policy(DENY_GET_ALL);
+
+
+  /**
+   * Error message from the AWS SDK when the role cannot be assumed.
+   */
+  public static final String E_BAD_ROLE
+      = "Not authorized to perform sts:AssumeRole";
+
+  private RoleTestUtils() {
+  }
+
+  /**
+   * Bind the configuration's {@code ASSUMED_ROLE_POLICY} option to
+   * the given policy.
+   * @param conf configuration to patch
+   * @param policy policy to apply
+   * @return the modified configuration
+   * @throws JsonProcessingException JSON marshalling error
+   */
+  public static Configuration bindRolePolicy(final Configuration conf,
+      final Policy policy) throws JsonProcessingException {
+    String p = MODEL.toJson(policy);
+    LOG.info("Setting role policy to policy of size {}:\n{}", p.length(), p);
+    conf.set(ASSUMED_ROLE_POLICY, p);
+    return conf;
+  }
+
+  /**
+   * Wrap a set of statements with a policy and bind the configuration's
+   * {@code ASSUMED_ROLE_POLICY} option to it.
+   * @param conf configuration to patch
+   * @param statements statements to aggregate
+   * @return the modified configuration
+   * @throws JsonProcessingException JSON marshalling error
+   */
+  public static Configuration bindRolePolicyStatements(
+      final Configuration conf,
+      final Statement... statements) throws JsonProcessingException {
+    return bindRolePolicy(conf, policy(statements));
+  }
+
+
+  /**
+   * Try to delete a file, verify that it is not allowed.
+   * @param fs filesystem
+   * @param path path
+   */
+  public static void assertDeleteForbidden(final FileSystem fs, final Path path)
+      throws Exception {
+    intercept(AccessDeniedException.class, "",
+        () -> fs.delete(path, true));
+  }
+
+  /**
+   * Try to touch a file, verify that it is not allowed.
+   * @param fs filesystem
+   * @param path path
+   */
+  public static void assertTouchForbidden(final FileSystem fs, final Path path)
+      throws Exception {
+    intercept(AccessDeniedException.class, "",
+        "Caller could create file at " + path,
+        () -> {
+          touch(fs, path);
+          return fs.getFileStatus(path);
+        });
+  }
+
+  /**
+   * Create a config for an assumed role; it also disables FS caching.
+   * @param srcConf source config: this is not modified
+   * @param roleARN ARN of role
+   * @return the new configuration
+   */
+  public static Configuration newAssumedRoleConfig(
+      final Configuration srcConf,
+      final String roleARN) {
+    Configuration conf = new Configuration(srcConf);
+    conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
+    conf.set(ASSUMED_ROLE_ARN, roleARN);
+    conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
+    conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
+    disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  /**
+   * Assert that an operation is forbidden.
+   * @param contained contained text, may be null
+   * @param eval closure to evaluate
+   * @param <T> type of closure
+   * @return the access denied exception
+   * @throws Exception any other exception
+   */
+  public static <T> AccessDeniedException forbidden(
+      String contained,
+      Callable<T> eval)
+      throws Exception {
+    AccessDeniedException ex = intercept(AccessDeniedException.class, eval);
+    GenericTestUtils.assertExceptionContains(contained, ex);
+    return ex;
+  }
+
+}
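Taken together, these helpers let a test spin up a second, rights-restricted filesystem and assert that blocked calls fail. A rough usage sketch follows, assuming the surrounding test supplies baseConf, roleARN and target; only the RoleTestUtils calls themselves come from this patch.

  // Hypothetical helper: baseConf, roleARN and target are assumed inputs.
  public static void expectReadForbidden(Configuration baseConf,
      String roleARN, Path target) throws Exception {
    // Point a fresh configuration at the assumed role; FS caching is disabled.
    Configuration conf = newAssumedRoleConfig(baseConf, roleARN);
    // Attach the restricted session policy, which denies s3:GetObject.
    bindRolePolicy(conf, RESTRICTED_POLICY);
    try (FileSystem restrictedFS = target.getFileSystem(conf)) {
      // Any attempt to read the object back should be rejected.
      AccessDeniedException ex =
          forbidden("", () -> restrictedFS.open(target).read());
      LOG.info("Expected failure: {}", ex.toString());
    }
  }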

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 04676db..4730a90 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -208,7 +208,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
    * @param p probability of a throttling occurring: 0-1.0
    */
   protected void setThrottling(float p) {
-    inconsistentClient.setThrottleProbability(p);
+    if (inconsistentClient != null) {
+      inconsistentClient.setThrottleProbability(p);
+    }
   }
 
   /**
@@ -217,7 +219,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
    * @param limit limit to number of calls which fail
    */
   protected void setThrottling(float p, int limit) {
-    inconsistentClient.setThrottleProbability(p);
+    if (inconsistentClient != null) {
+      inconsistentClient.setThrottleProbability(p);
+    }
     setFailureLimit(limit);
   }
 
@@ -235,7 +239,9 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
    * @param limit limit to number of calls which fail
    */
   private void setFailureLimit(int limit) {
-    inconsistentClient.setFailureLimit(limit);
+    if (inconsistentClient != null) {
+      inconsistentClient.setFailureLimit(limit);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a013b25/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 2a98382..2886a99 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -528,7 +528,9 @@ public class ITestCommitOperations extends AbstractCommitITest {
   @Test
   public void testWriteNormalStream() throws Throwable {
     S3AFileSystem fs = getFileSystem();
-    Assume.assumeTrue(fs.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
+    Assume.assumeTrue(
+        "Filesystem does not have magic support enabled: " + fs,
+        fs.hasCapability(STORE_CAPABILITY_MAGIC_COMMITTER));
 
     Path destFile = path("normal");
     try (FSDataOutputStream out = fs.create(destFile, true)) {

