HADOOP-13761. S3Guard: implement retries for DDB failures and throttling; translate exceptions.
Contributed by Aaron Fabbri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8110d6a0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8110d6a0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8110d6a0

Branch: refs/heads/HDFS-7240
Commit: 8110d6a0d59e7dc2ddb25fa424fab188c5e9ce35
Parents: e8c5be6
Author: Steve Loughran <ste...@apache.org>
Authored: Mon Mar 5 14:06:20 2018 +0000
Committer: Steve Loughran <ste...@apache.org>
Committed: Mon Mar 5 14:06:20 2018 +0000

----------------------------------------------------------------------
 .../hadoop-aws/dev-support/findbugs-exclude.xml |  36 +++
 .../hadoop/fs/s3a/FailureInjectionPolicy.java   | 163 +++++++++++++
 .../fs/s3a/InconsistentAmazonS3Client.java      | 177 ++++++--------
 .../hadoop/fs/s3a/InconsistentS3Object.java     | 232 +++++++++++++++++++
 .../java/org/apache/hadoop/fs/s3a/Invoker.java  |   5 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  26 ++-
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 122 ++++++----
 .../org/apache/hadoop/fs/s3a/S3AOpContext.java  |  87 +++++++
 .../apache/hadoop/fs/s3a/S3AReadOpContext.java  |  56 +++++
 .../apache/hadoop/fs/s3a/S3ARetryPolicy.java    |  45 +++-
 .../hadoop/fs/s3a/S3GuardExistsRetryPolicy.java |  47 ++++
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   |  20 +-
 .../hadoop/fs/s3a/s3guard/MetadataStore.java    |   3 +
 .../site/markdown/tools/hadoop-aws/testing.md   |   5 +
 .../hadoop/fs/s3a/ITestS3AInconsistency.java    | 116 +++++++++-
 .../fs/s3a/ITestS3GuardListConsistency.java     |  13 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |   4 +-
 .../fs/s3a/commit/AbstractCommitITest.java      |   3 +-
 .../ITestDynamoDBMetadataStoreScale.java        | 174 ++++++++++++++
 .../fs/s3a/s3guard/MetadataStoreTestBase.java   |   2 +-
 .../scale/ITestDynamoDBMetadataStoreScale.java  |  48 ----
 21 files changed, 1131 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml 
b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 2615566..855aac9 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -28,4 +28,40 @@
     <Method name="s3Exists" />
     <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
   </Match>
+
+  <!--
+    This extends the serializable S3Object, so findbugs checks
+    serializability. It is never serialized, however, so these
+    warnings are false positives.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.InconsistentS3Object" />
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
+  </Match>
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.InconsistentS3Object" />
+    <Bug pattern="SE_NO_SERIALVERSIONID" />
+  </Match>
+
+  <!--
+   findbugs gets confused by lambda expressions in synchronized methods
+   and considers references to fields to be unsynchronized.
+   As the check can't be disabled for individual methods, it has to be
+   disabled for the entire class.
+    -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AInputStream"/>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+  </Match>
+  <!--
+    findbugs falsely reports the return value as ignored:
+    "Return value of S3AReadOpContext.getReadInvoker() ignored,
+    but method has no side effect"
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AInputStream"/>
+    <Method name="reopen"/>
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java
new file mode 100644
index 0000000..8cd6036
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/FailureInjectionPolicy.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Simple object which stores current failure injection settings.
+ * "Delaying a key" can mean:
+ *    - Removing it from the S3 client's listings while delay is in effect.
+ *    - Causing input stream reads to fail.
+ *    - Causing the S3 side of getFileStatus(), i.e.
+ *      AmazonS3#getObjectMetadata(), to throw FileNotFound.
+ */
+public class FailureInjectionPolicy {
+  /**
+   * Keys containing this substring will be subject to delayed visibility.
+   */
+  public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
+
+  /**
+   * How many milliseconds affected keys will have delayed visibility.
+   * This should probably be a config value.
+   */
+  public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
+
+  public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
+
+  /** Special config value since we can't store empty strings in XML. */
+  public static final String MATCH_ALL_KEYS = "*";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
+
+  /** Empty string matches all keys. */
+  private String delayKeySubstring;
+
+  /** Probability to delay visibility of a matching key. */
+  private float delayKeyProbability;
+
+  /** Time in milliseconds to delay visibility of newly modified object. */
+  private long delayKeyMsec;
+
+  /**
+   * Probability of throttling a request.
+   */
+  private float throttleProbability;
+
+  /**
+   * limit for failures before operations succeed; if 0 then "no limit".
+   */
+  private int failureLimit = 0;
+
+  public FailureInjectionPolicy(Configuration conf) {
+
+    this.delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
+        DEFAULT_DELAY_KEY_SUBSTRING);
+    // "" is a substring of all strings, use it to match all keys.
+    if (this.delayKeySubstring.equals(MATCH_ALL_KEYS)) {
+      this.delayKeySubstring = "";
+    }
+    this.delayKeyProbability = validProbability(
+        conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
+            DEFAULT_DELAY_KEY_PROBABILITY));
+    this.delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
+        DEFAULT_DELAY_KEY_MSEC);
+    this.setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY,
+        0.0f));
+  }
+
+  public String getDelayKeySubstring() {
+    return delayKeySubstring;
+  }
+
+  public float getDelayKeyProbability() {
+    return delayKeyProbability;
+  }
+
+  public long getDelayKeyMsec() {
+    return delayKeyMsec;
+  }
+
+  public float getThrottleProbability() {
+    return throttleProbability;
+  }
+
+  public int getFailureLimit() {
+    return failureLimit;
+  }
+
+  public void setFailureLimit(int failureLimit) {
+    this.failureLimit = failureLimit;
+  }
+
+  /**
+   * Set the probability of throttling a request.
+   * @param throttleProbability the probability of a request being throttled.
+   */
+  public void setThrottleProbability(float throttleProbability) {
+    this.throttleProbability = validProbability(throttleProbability);
+  }
+
+  public static boolean trueWithProbability(float p) {
+    return Math.random() < p;
+  }
+
+  /**
+   * Should we delay listing visibility for this key?
+   * @param key key which is being put
+   * @return true if we should delay
+   */
+  public boolean shouldDelay(String key) {
+    float p = getDelayKeyProbability();
+    boolean delay = key.contains(getDelayKeySubstring());
+    delay = delay && trueWithProbability(p);
+    LOG.debug("{}, p={} -> {}", key, p, delay);
+    return delay;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FailureInjectionPolicy:" +
+            " %s msec delay, substring %s, delay probability %s;" +
+            " throttle probability %s" + "; failure limit %d",
+        delayKeyMsec, delayKeySubstring, delayKeyProbability,
+        throttleProbability, failureLimit);
+  }
+
+  /**
+   * Validate a probability option.
+   * @param p probability
+   * @return the probability, if valid
+   * @throws IllegalArgumentException if the probability is out of range.
+   */
+  private static float validProbability(float p) {
+    Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
+        "Probability out of range 0 to 1 %s", p);
+    return p;
+  }
+
+}
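
Usage illustration (not part of the patch): a minimal, self-contained sketch of
driving the new FailureInjectionPolicy from a Configuration. The demo class name
is hypothetical; the config keys are the existing constants in
org.apache.hadoop.fs.s3a.Constants, and the probabilities chosen are arbitrary.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import static org.apache.hadoop.fs.s3a.Constants.FAIL_INJECT_INCONSISTENCY_PROBABILITY;
import static org.apache.hadoop.fs.s3a.Constants.FAIL_INJECT_THROTTLE_PROBABILITY;

public class FailureInjectionPolicyDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Delay visibility of every matching key (probability 1.0)...
    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
    // ...and throttle roughly one request in five with an injected 503.
    conf.setFloat(FAIL_INJECT_THROTTLE_PROBABILITY, 0.2f);

    FailureInjectionPolicy policy = new FailureInjectionPolicy(conf);
    // Only keys containing the default substring "DELAY_LISTING_ME" match.
    System.out.println(policy + "; shouldDelay="
        + policy.shouldDelay("data/DELAY_LISTING_ME/part-0000"));
  }
}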

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index d158061..99ed87d 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -38,6 +38,7 @@ import 
com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
@@ -48,6 +49,7 @@ import com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
@@ -60,8 +62,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
-
 /**
  * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
  * inconsistency and/or errors.  Used for testing S3Guard.
@@ -71,38 +71,10 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 @InterfaceStability.Unstable
 public class InconsistentAmazonS3Client extends AmazonS3Client {
 
-  /**
-   * Keys containing this substring will be subject to delayed visibility.
-   */
-  public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
-
-  /**
-   * How many seconds affected keys will be delayed from appearing in listing.
-   * This should probably be a config value.
-   */
-  public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
-
-  public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
-
-  /** Special config value since we can't store empty strings in XML. */
-  public static final String MATCH_ALL_KEYS = "*";
-
   private static final Logger LOG =
       LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
 
-  /** Empty string matches all keys. */
-  private String delayKeySubstring;
-
-  /** Probability to delay visibility of a matching key. */
-  private float delayKeyProbability;
-
-  /** Time in milliseconds to delay visibility of newly modified object. */
-  private long delayKeyMsec;
-
-  /**
-   * Probability of throttling a request.
-   */
-  private float throttleProbability;
+  private FailureInjectionPolicy policy;
 
   /**
    * Counter of failures since last reset.
@@ -110,11 +82,6 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
   private final AtomicLong failureCounter = new AtomicLong(0);
 
   /**
-   * limit for failures before operations succeed; if 0 then "no limit".
-   */
-  private int failureLimit = 0;
-
-  /**
    * Composite of data we need to track about recently deleted objects:
    * when it was deleted (same as with recently put objects) and the object
    * summary (since we should keep returning it for some time after its
@@ -150,36 +117,42 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
   public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
       ClientConfiguration clientConfiguration, Configuration conf) {
     super(credentials, clientConfiguration);
-    setupConfig(conf);
+    policy = new FailureInjectionPolicy(conf);
   }
 
-  protected void setupConfig(Configuration conf) {
 
-    delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
-        DEFAULT_DELAY_KEY_SUBSTRING);
-    // "" is a substring of all strings, use it to match all keys.
-    if (delayKeySubstring.equals(MATCH_ALL_KEYS)) {
-      delayKeySubstring = "";
-    }
-    delayKeyProbability = validProbability(
-        conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
-            DEFAULT_DELAY_KEY_PROBABILITY));
-    delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
-        DEFAULT_DELAY_KEY_MSEC);
-    setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY,
-        0.0f));
-    LOG.info("{}", this);
+  /**
+   * Clear any accumulated inconsistency state. Used by tests to make paths
+   * visible again.
+   * @param fs S3AFileSystem under test
+   * @throws Exception on failure
+   */
+  public static void clearInconsistency(S3AFileSystem fs) throws Exception {
+    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
+    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
+    ic.clearInconsistency();
+  }
+
+  /**
+   * A way for tests to patch in a different fault injection policy at runtime.
+   * @param fs filesystem under test
+   * @param policy failure injection settings to apply
+   * @throws Exception on failure
+   */
+  public static void setFailureInjectionPolicy(S3AFileSystem fs,
+      FailureInjectionPolicy policy) throws Exception {
+    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
+    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
+    ic.replacePolicy(policy);
+  }
+
+  private void replacePolicy(FailureInjectionPolicy pol) {
+    this.policy = pol;
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "Inconsistent S3 Client with"
-            + " %s msec delay, substring %s, delay probability %s;"
-            + " throttle probability %s"
-            + "; failure limit %d, failure count %d",
-        delayKeyMsec, delayKeySubstring, delayKeyProbability,
-        throttleProbability, failureLimit, failureCounter.get());
+    return String.format("Inconsistent S3 Client: %s; failure count %d",
+        policy, failureCounter.get());
   }
 
   /**
@@ -470,7 +443,7 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
       return false;
     }
     long currentTime = System.currentTimeMillis();
-    long deadline = enqueueTime + delayKeyMsec;
+    long deadline = enqueueTime + policy.getDelayKeyMsec();
     if (currentTime >= deadline) {
       delayedDeletes.remove(key);
       LOG.debug("no longer delaying {}", key);
@@ -482,7 +455,7 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
   }
 
   private void registerDeleteObject(String key, String bucket) {
-    if (shouldDelay(key)) {
+    if (policy.shouldDelay(key)) {
       // Record summary so we can add it back for some time post-deletion
       ListObjectsRequest request = new ListObjectsRequest()
               .withBucketName(bucket)
@@ -498,29 +471,12 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
 
   private void registerPutObject(PutObjectRequest req) {
     String key = req.getKey();
-    if (shouldDelay(key)) {
+    if (policy.shouldDelay(key)) {
       enqueueDelayedPut(key);
     }
   }
 
   /**
-   * Should we delay listing visibility for this key?
-   * @param key key which is being put
-   * @return true if we should delay
-   */
-  private boolean shouldDelay(String key) {
-    boolean delay = key.contains(delayKeySubstring);
-    delay = delay && trueWithProbability(delayKeyProbability);
-    LOG.debug("{} -> {}", key, delay);
-    return delay;
-  }
-
-
-  private boolean trueWithProbability(float p) {
-    return Math.random() < p;
-  }
-
-  /**
    * Record this key as something that should not become visible in
    * listObject replies for a while, to simulate eventual list consistency.
    * @param key key to delay visibility of
@@ -561,20 +517,8 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
     return super.listMultipartUploads(listMultipartUploadsRequest);
   }
 
-  public float getDelayKeyProbability() {
-    return delayKeyProbability;
-  }
-
   public long getDelayKeyMsec() {
-    return delayKeyMsec;
-  }
-
-  /**
-   * Get the probability of the request being throttled.
-   * @return a value 0 - 1.0f.
-   */
-  public float getThrottleProbability() {
-    return throttleProbability;
+    return policy.getDelayKeyMsec();
   }
 
   /**
@@ -582,36 +526,28 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
    * @param throttleProbability the probability of a request being throttled.
    */
   public void setThrottleProbability(float throttleProbability) {
-    this.throttleProbability = validProbability(throttleProbability);
-  }
-
-  /**
-   * Validate a probability option.
-   * @param p probability
-   * @return the probability, if valid
-   * @throws IllegalArgumentException if the probability is out of range.
-   */
-  private float validProbability(float p) {
-    Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
-        "Probability out of range 0 to 1 %s", p);
-    return p;
+    policy.setThrottleProbability(throttleProbability);
   }
 
   /**
    * Conditionally fail the operation.
+   * @param errorMsg description of failure
+   * @param statusCode http status code for error
    * @throws AmazonClientException if the client chooses to fail
    * the request.
    */
-  private void maybeFail() throws AmazonClientException {
+  private void maybeFail(String errorMsg, int statusCode)
+      throws AmazonClientException {
     // code structure here is to line up for more failures later
     AmazonServiceException ex = null;
-    if (trueWithProbability(throttleProbability)) {
+    if (policy.trueWithProbability(policy.getThrottleProbability())) {
       // throttle the request
-      ex = new AmazonServiceException("throttled"
+      ex = new AmazonServiceException(errorMsg
           + " count = " + (failureCounter.get() + 1), null);
-      ex.setStatusCode(503);
+      ex.setStatusCode(statusCode);
     }
 
+    int failureLimit = policy.getFailureLimit();
     if (ex != null) {
       long count = failureCounter.incrementAndGet();
       if (failureLimit == 0
@@ -621,16 +557,37 @@ public class InconsistentAmazonS3Client extends 
AmazonS3Client {
     }
   }
 
+  private void maybeFail() {
+    maybeFail("throttled", 503);
+  }
+
   /**
    * Set the limit on failures before all operations pass through.
    * This resets the failure count.
    * @param limit limit; "0" means "no limit"
    */
   public void setFailureLimit(int limit) {
-    this.failureLimit = limit;
+    policy.setFailureLimit(limit);
     failureCounter.set(0);
   }
 
+  @Override
+  public S3Object getObject(GetObjectRequest var1) throws SdkClientException,
+      AmazonServiceException {
+    maybeFail("file not found", 404);
+    S3Object o = super.getObject(var1);
+    LOG.debug("Wrapping in InconsistentS3Object for key {}", var1.getKey());
+    return new InconsistentS3Object(o, policy);
+  }
+
+  @Override
+  public S3Object getObject(String bucketName, String key)
+      throws SdkClientException, AmazonServiceException {
+    S3Object o = super.getObject(bucketName, key);
+    LOG.debug("Wrapping in InconsistentS3Object for key {}", key);
+    return new InconsistentS3Object(o, policy);
+  }
+
   /** Since ObjectListing is immutable, we just override it with a wrapper. */
   @SuppressWarnings("serial")
   private static class CustomObjectListing extends ObjectListing {
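
Hypothetical test-method fragment (not part of the patch) combining the two new
static helpers. It assumes "fs" is an S3AFileSystem whose client was built by
the inconsistent client factory, so castFrom() succeeds, and that the enclosing
test method declares throws Exception.

// Swap in a more aggressive fault-injection policy at runtime.
Configuration conf = new Configuration();
conf.setFloat(FAIL_INJECT_THROTTLE_PROBABILITY, 0.5f);
InconsistentAmazonS3Client.setFailureInjectionPolicy(fs,
    new FailureInjectionPolicy(conf));
try {
  // Exercise read/list code paths here; roughly half of the requests
  // will now receive an injected 503 and must be retried.
} finally {
  // Make all delayed keys visible again for subsequent tests.
  InconsistentAmazonS3Client.clearInconsistency(fs);
}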

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java
new file mode 100644
index 0000000..496ca1b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3Object.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.amazonaws.services.s3.internal.AmazonS3ExceptionBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around S3Object so we can do failure injection on
+ * getObjectContent() and S3ObjectInputStream.
+ * See also {@link InconsistentAmazonS3Client}.
+ */
+@SuppressWarnings({"NonSerializableFieldInSerializableClass", "serial"})
+public class InconsistentS3Object extends S3Object {
+
+  // This should be configurable, probably.
+  public static final int MAX_READ_FAILURES = 100;
+
+  private static int readFailureCounter = 0;
+  private transient S3Object wrapped;
+  private transient FailureInjectionPolicy policy;
+  private final static transient Logger LOG = LoggerFactory.getLogger(
+      InconsistentS3Object.class);
+
+  public InconsistentS3Object(S3Object wrapped, FailureInjectionPolicy policy) {
+    this.wrapped = wrapped;
+    this.policy = policy;
+  }
+
+  @Override
+  public S3ObjectInputStream getObjectContent() {
+    return new InconsistentS3InputStream(wrapped.getObjectContent());
+  }
+
+  @Override
+  public String toString() {
+    return "InconsistentS3Object wrapping: " + wrapped.toString();
+  }
+
+  @Override
+  public ObjectMetadata getObjectMetadata() {
+    return wrapped.getObjectMetadata();
+  }
+
+  @Override
+  public void setObjectMetadata(ObjectMetadata metadata) {
+    wrapped.setObjectMetadata(metadata);
+  }
+
+  @Override
+  public void setObjectContent(S3ObjectInputStream objectContent) {
+    wrapped.setObjectContent(objectContent);
+  }
+
+  @Override
+  public void setObjectContent(InputStream objectContent) {
+    wrapped.setObjectContent(objectContent);
+  }
+
+  @Override
+  public String getBucketName() {
+    return wrapped.getBucketName();
+  }
+
+  @Override
+  public void setBucketName(String bucketName) {
+    wrapped.setBucketName(bucketName);
+  }
+
+  @Override
+  public String getKey() {
+    return wrapped.getKey();
+  }
+
+  @Override
+  public void setKey(String key) {
+    wrapped.setKey(key);
+  }
+
+  @Override
+  public String getRedirectLocation() {
+    return wrapped.getRedirectLocation();
+  }
+
+  @Override
+  public void setRedirectLocation(String redirectLocation) {
+    wrapped.setRedirectLocation(redirectLocation);
+  }
+
+  @Override
+  public Integer getTaggingCount() {
+    return wrapped.getTaggingCount();
+  }
+
+  @Override
+  public void setTaggingCount(Integer taggingCount) {
+    wrapped.setTaggingCount(taggingCount);
+  }
+
+  @Override
+  public void close() throws IOException {
+    wrapped.close();
+  }
+
+  @Override
+  public boolean isRequesterCharged() {
+    return wrapped.isRequesterCharged();
+  }
+
+  @Override
+  public void setRequesterCharged(boolean isRequesterCharged) {
+    wrapped.setRequesterCharged(isRequesterCharged);
+  }
+
+  private AmazonS3Exception mockException(String msg, int httpResponse) {
+    AmazonS3ExceptionBuilder builder = new AmazonS3ExceptionBuilder();
+    builder.setErrorMessage(msg);
+    builder.setStatusCode(httpResponse); // this is the important part
+    builder.setErrorCode(String.valueOf(httpResponse));
+    return builder.build();
+  }
+
+  /**
+   * Insert a failure injection point for a read call.
+   * @throws IOException as the codepath is on an InputStream, not another SDK call.
+   */
+  private void readFailpoint(int off, int len) throws IOException {
+    if (shouldInjectFailure(getKey())) {
+      String error = String.format(
+          "read(b, %d, %d) on key %s failed: injecting error %d/%d" +
+              " for test.", off, len, getKey(), readFailureCounter,
+          MAX_READ_FAILURES);
+      throw new FileNotFoundException(error);
+    }
+  }
+
+  /**
+   * Insert a failure injection point for an InputStream skip() call.
+   * @throws IOException as the codepath is on an InputStream, not another SDK call.
+   */
+  private void skipFailpoint(long len) throws IOException {
+    if (shouldInjectFailure(getKey())) {
+      String error = String.format(
+          "skip(%d) on key %s failed: injecting error %d/%d for test.",
+          len, getKey(), readFailureCounter, MAX_READ_FAILURES);
+      throw new FileNotFoundException(error);
+    }
+  }
+
+  private boolean shouldInjectFailure(String key) {
+    if (policy.shouldDelay(key) &&
+        readFailureCounter < MAX_READ_FAILURES) {
+      readFailureCounter++;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Wraps S3ObjectInputStream and implements failure injection.
+   */
+  protected class InconsistentS3InputStream extends S3ObjectInputStream {
+    private S3ObjectInputStream wrapped;
+
+    public InconsistentS3InputStream(S3ObjectInputStream wrapped) {
+      // seems awkward to have the stream wrap itself.
+      super(wrapped, wrapped.getHttpRequest());
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public void abort() {
+      wrapped.abort();
+    }
+
+    @Override
+    public int available() throws IOException {
+      return wrapped.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      wrapped.close();
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      skipFailpoint(n);
+      return wrapped.skip(n);
+    }
+
+    @Override
+    public int read() throws IOException {
+      LOG.debug("read() for key {}", getKey());
+      readFailpoint(0, 1);
+      return wrapped.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      LOG.debug("read(b, {}, {}) for key {}", off, len, getKey());
+      readFailpoint(off, len);
+      return wrapped.read(b, off, len);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index 875948e..a007ba1 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -310,6 +310,9 @@ public class Invoker {
     boolean shouldRetry;
     do {
       try {
+        if (retryCount > 0) {
+          LOG.debug("retry #{}", retryCount);
+        }
         // execute the operation, returning if successful
         return operation.execute();
       } catch (IOException | SdkBaseException e) {
@@ -327,8 +330,6 @@ public class Invoker {
             (SdkBaseException)caught);
       }
 
-
-      int attempts = retryCount + 1;
       try {
         // decide action base on operation, invocation count, etc
         retryAction = retryPolicy.shouldRetry(translated, retryCount, 0,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 99901ba..4424eba 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -166,6 +166,10 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
   // APIs on an uninitialized filesystem.
   private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
       Invoker.LOG_EVENT);
+  // Only used for very specific code paths which behave differently for
+  // S3Guard. Retries FileNotFound, so be careful if you use this.
+  private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.LOG_EVENT);
   private final Retried onRetry = this::operationRetried;
   private String bucket;
   private int maxKeys;
@@ -251,6 +255,8 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
       s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
           .createS3Client(name);
       invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
+      s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()),
+          onRetry);
       writeHelper = new WriteOperationHelper(this, getConf());
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
@@ -697,18 +703,20 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities {
     }
 
     return new FSDataInputStream(
-        new S3AInputStream(new S3ObjectAttributes(
-          bucket,
-          pathToKey(f),
-          serverSideEncryptionAlgorithm,
-          getServerSideEncryptionKey(bucket, getConf())),
-            fileStatus.getLen(),
-            s3,
+        new S3AInputStream(new S3AReadOpContext(hasMetadataStore(),
+            invoker,
+            s3guardInvoker,
             statistics,
             instrumentation,
+            fileStatus),
+            new S3ObjectAttributes(bucket,
+                pathToKey(f),
+                serverSideEncryptionAlgorithm,
+                getServerSideEncryptionKey(bucket, getConf())),
+            fileStatus.getLen(),
+            s3,
             readAhead,
-            inputPolicy,
-            invoker));
+            inputPolicy));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 0074143..c54d3e26 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,10 +71,11 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    */
   private volatile boolean closed;
   private S3ObjectInputStream wrappedStream;
-  private final FileSystem.Statistics stats;
+  private final S3AReadOpContext context;
   private final AmazonS3 client;
   private final String bucket;
   private final String key;
+  private final String pathStr;
   private final long contentLength;
   private final String uri;
   private static final Logger LOG =
@@ -85,7 +85,6 @@ public class S3AInputStream extends FSInputStream implements 
CanSetReadahead {
   private String serverSideEncryptionKey;
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
-  private final Invoker invoker;
 
   /**
    * This is the actual position within the object, used by
@@ -108,40 +107,33 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * Create the stream.
    * This does not attempt to open it; that is only done on the first
    * actual read() operation.
+   * @param ctx operation context
    * @param s3Attributes object attributes from a HEAD request
    * @param contentLength length of content
    * @param client S3 client to use
-   * @param stats statistics to update
-   * @param instrumentation instrumentation to update
    * @param readahead readahead bytes
    * @param inputPolicy IO policy
-   * @param invoker preconfigured invoker
    */
-  public S3AInputStream(S3ObjectAttributes s3Attributes,
-      long contentLength,
-      AmazonS3 client,
-      FileSystem.Statistics stats,
-      S3AInstrumentation instrumentation,
-      long readahead,
-      S3AInputPolicy inputPolicy,
-      Invoker invoker) {
+  public S3AInputStream(S3AReadOpContext ctx, S3ObjectAttributes s3Attributes,
+      long contentLength, AmazonS3 client, long readahead,
+      S3AInputPolicy inputPolicy) {
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
     Preconditions.checkArgument(contentLength >= 0, "Negative content length");
+    this.context = ctx;
     this.bucket = s3Attributes.getBucket();
     this.key = s3Attributes.getKey();
+    this.pathStr = ctx.dstFileStatus.getPath().toString();
     this.contentLength = contentLength;
     this.client = client;
-    this.stats = stats;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
-    this.streamStatistics = instrumentation.newInputStreamStatistics();
+    this.streamStatistics = ctx.instrumentation.newInputStreamStatistics();
     this.serverSideEncryptionAlgorithm =
         s3Attributes.getServerSideEncryptionAlgorithm();
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     setInputPolicy(inputPolicy);
     setReadahead(readahead);
-    this.invoker = invoker;
   }
 
   /**
@@ -162,6 +154,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @param length length requested
    * @throws IOException on any failure to open the object
    */
+  @Retries.OnceTranslated
   private synchronized void reopen(String reason, long targetPos, long length)
       throws IOException {
 
@@ -185,7 +178,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
     }
     String text = String.format("Failed to %s %s at %d",
         (opencount == 0 ? "open" : "re-open"), uri, targetPos);
-    S3Object object = invoker.retry(text, uri, true,
+    S3Object object = context.getReadInvoker().once(text, uri,
         () -> client.getObject(request));
     wrappedStream = object.getObjectContent();
     contentRangeStart = targetPos;
@@ -241,6 +234,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @param length length of content that needs to be read from targetPos
    * @throws IOException
    */
+  @Retries.OnceTranslated
   private void seekInStream(long targetPos, long length) throws IOException {
     checkNotClosed();
     if (wrappedStream == null) {
@@ -317,14 +311,22 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @param targetPos position from where data should be read
    * @param len length of the content that needs to be read
    */
+  @Retries.RetryTranslated
   private void lazySeek(long targetPos, long len) throws IOException {
-    //For lazy seek
-    seekInStream(targetPos, len);
 
-    //re-open at specific location if needed
-    if (wrappedStream == null) {
-      reopen("read from new offset", targetPos, len);
-    }
+    // With S3Guard, the metadatastore gave us metadata for the file in
+    // open(), so we use a slightly different retry policy.
+    Invoker invoker = context.getReadInvoker();
+    invoker.retry("lazySeek", pathStr, true,
+        () -> {
+          //For lazy seek
+          seekInStream(targetPos, len);
+
+          //re-open at specific location if needed
+          if (wrappedStream == null) {
+            reopen("read from new offset", targetPos, len);
+          }
+        });
   }
 
   /**
@@ -334,29 +336,44 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    */
   private void incrementBytesRead(long bytesRead) {
     streamStatistics.bytesRead(bytesRead);
-    if (stats != null && bytesRead > 0) {
-      stats.incrementBytesRead(bytesRead);
+    if (context.stats != null && bytesRead > 0) {
+      context.stats.incrementBytesRead(bytesRead);
     }
   }
 
   @Override
+  @Retries.RetryTranslated  // Some retries only happen w/ S3Guard, as intended.
   public synchronized int read() throws IOException {
     checkNotClosed();
     if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
       return -1;
     }
 
-    int byteRead;
     try {
       lazySeek(nextReadPos, 1);
-      byteRead = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
-    } catch (IOException e) {
-      onReadFailure(e, 1);
-      byteRead = wrappedStream.read();
     }
 
+    // With S3Guard, the metadatastore gave us metadata for the file in
+    // open(), so we use a slightly different retry policy.
+    // read() may not be likely to fail, but reopen() does a GET which
+    // certainly could.
+    Invoker invoker = context.getReadInvoker();
+    int byteRead = invoker.retry("read", pathStr, true,
+        () -> {
+          int b;
+          try {
+            b = wrappedStream.read();
+          } catch (EOFException e) {
+            return -1;
+          } catch (IOException e) {
+            onReadFailure(e, 1);
+            b = wrappedStream.read();
+          }
+          return b;
+        });
+
     if (byteRead >= 0) {
       pos++;
       nextReadPos++;
@@ -375,10 +392,11 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @param length length of data being attempted to read
    * @throws IOException any exception thrown on the re-open attempt.
    */
+  @Retries.OnceTranslated
   private void onReadFailure(IOException ioe, int length) throws IOException {
-    LOG.info("Got exception while trying to read from stream {}"
-        + " trying to recover: "+ ioe, uri);
-    LOG.debug("While trying to read from stream {}", uri, ioe);
+
+    LOG.info("Got exception while trying to read from stream {}" +
+        " trying to recover: " + ioe, uri);
     streamStatistics.readException();
     reopen("failure recovery", pos, length);
   }
@@ -392,6 +410,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @throws IOException if there are other problems
    */
   @Override
+  @Retries.RetryTranslated  // Some retries only happen w/ S3Guard, as intended.
   public synchronized int read(byte[] buf, int off, int len)
       throws IOException {
     checkNotClosed();
@@ -412,18 +431,27 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
       return -1;
     }
 
-    int bytesRead;
-    try {
-      streamStatistics.readOperationStarted(nextReadPos, len);
-      bytesRead = wrappedStream.read(buf, off, len);
-    } catch (EOFException e) {
-      onReadFailure(e, len);
-      // the base implementation swallows EOFs.
-      return -1;
-    } catch (IOException e) {
-      onReadFailure(e, len);
-      bytesRead = wrappedStream.read(buf, off, len);
-    }
+    // With S3Guard, the metadatastore gave us metadata for the file in
+    // open(), so we use a slightly different retry policy.
+    // read() may not be likely to fail, but reopen() does a GET which
+    // certainly could.
+    Invoker invoker = context.getReadInvoker();
+
+    streamStatistics.readOperationStarted(nextReadPos, len);
+    int bytesRead = invoker.retry("read", pathStr, true,
+        () -> {
+          int bytes;
+          try {
+            bytes = wrappedStream.read(buf, off, len);
+          } catch (EOFException e) {
+            // the base implementation swallows EOFs.
+            return -1;
+          } catch (IOException e) {
+            onReadFailure(e, len);
+            bytes = wrappedStream.read(buf, off, len);
+          }
+          return bytes;
+        });
 
     if (bytesRead > 0) {
       pos += bytesRead;
@@ -481,6 +509,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    * @param length length of the stream.
    * @param forceAbort force an abort; used if explicitly requested.
    */
+  @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
     if (wrappedStream != null) {
 
@@ -645,6 +674,7 @@ public class S3AInputStream extends FSInputStream 
implements CanSetReadahead {
    *
    */
   @Override
+  @Retries.RetryTranslated  // Some retries only happen w/ S3Guard, as intended.
   public void readFully(long position, byte[] buffer, int offset, int length)
       throws IOException {
     checkNotClosed();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java
new file mode 100644
index 0000000..fba39b9
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOpContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * Base class for operation context struct passed through codepaths for main
+ * S3AFileSystem operations.
+ * Anything op-specific should be moved to a subclass of this.
+ */
+@SuppressWarnings("visibilitymodifier")  // I want a struct of finals, for real.
+public class S3AOpContext {
+
+  final boolean isS3GuardEnabled;
+  final Invoker invoker;
+  @Nullable final FileSystem.Statistics stats;
+  final S3AInstrumentation instrumentation;
+  @Nullable final Invoker s3guardInvoker;
+
+  /** FileStatus for "destination" path being operated on. */
+  protected final FileStatus dstFileStatus;
+
+  /**
+   * Alternate constructor that allows passing in two invokers, the common
+   * one, and another with the S3Guard Retry Policy.
+   * @param isS3GuardEnabled true if s3Guard is active
+   * @param invoker invoker, which contains retry policy
+   * @param s3guardInvoker s3guard-specific retry policy invoker
+   * @param stats optional stats object
+   * @param instrumentation instrumentation to use
+   * @param dstFileStatus file status from existence check
+   */
+  public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker,
+      Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats,
+      S3AInstrumentation instrumentation, FileStatus dstFileStatus) {
+
+    Preconditions.checkNotNull(invoker, "Null invoker arg");
+    Preconditions.checkNotNull(instrumentation, "Null instrumentation arg");
+    Preconditions.checkNotNull(dstFileStatus, "Null dstFileStatus arg");
+    this.isS3GuardEnabled = isS3GuardEnabled;
+    Preconditions.checkArgument(!isS3GuardEnabled || s3guardInvoker != null,
+        "S3Guard invoker required: S3Guard is enabled.");
+    this.invoker = invoker;
+    this.s3guardInvoker = s3guardInvoker;
+    this.stats = stats;
+    this.instrumentation = instrumentation;
+    this.dstFileStatus = dstFileStatus;
+  }
+
+  /**
+   * Constructor using common invoker and retry policy.
+   * @param isS3GuardEnabled true if s3Guard is active
+   * @param invoker invoker, which contains retry policy
+   * @param stats optional stats object
+   * @param instrumentation instrumentation to use
+   * @param dstFileStatus file status from existence check
+   */
+  public S3AOpContext(boolean isS3GuardEnabled, Invoker invoker,
+      @Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation,
+      FileStatus dstFileStatus) {
+    this(isS3GuardEnabled, invoker, null, stats, instrumentation,
+        dstFileStatus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
new file mode 100644
index 0000000..220cd0d
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+import javax.annotation.Nullable;
+
+/**
+ * Read-specific operation context struct.
+ */
+public class S3AReadOpContext extends S3AOpContext {
+  public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker,
+      Invoker s3guardInvoker, @Nullable FileSystem.Statistics stats,
+      S3AInstrumentation instrumentation, FileStatus dstFileStatus) {
+    super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation,
+        dstFileStatus);
+  }
+
+  public S3AReadOpContext(boolean isS3GuardEnabled, Invoker invoker,
+      @Nullable FileSystem.Statistics stats, S3AInstrumentation instrumentation,
+      FileStatus dstFileStatus) {
+    super(isS3GuardEnabled, invoker, stats, instrumentation, dstFileStatus);
+  }
+
+  /**
+   * Get invoker to use for read operations.  When S3Guard is enabled we use
+   * the S3Guard invoker, which deals with things like FileNotFoundException
+   * differently.
+   * @return invoker to use for read codepaths
+   */
+  public Invoker getReadInvoker() {
+    if (isS3GuardEnabled) {
+      return s3guardInvoker;
+    } else {
+      return invoker;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index e37a554..d857330 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -76,10 +76,30 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
  * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html">Amazon S3 Error Best Practices</a>
  * @see <a href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html">Dynamo DB Common errors</a>
  */
+@SuppressWarnings("visibilitymodifier")  // I want a struct of finals, for real.
 public class S3ARetryPolicy implements RetryPolicy {
 
+  /** Final retry policy we end up with. */
   private final RetryPolicy retryPolicy;
 
+  // Retry policies which exceptions are mapped to.
+
+  /** Base policy from configuration. */
+  protected final RetryPolicy fixedRetries;
+
+  /** Rejection of all non-idempotent calls except specific failures. */
+  protected final RetryPolicy retryIdempotentCalls;
+
+  /** Policy for throttle requests, which are considered repeatable, even for
+   * non-idempotent calls, as the service rejected the call entirely. */
+  protected final RetryPolicy throttlePolicy;
+
+  /** No retry on network and tangible API issues. */
+  protected final RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
+
+  /** Client connectivity: fixed retries without care for idempotency. */
+  protected final RetryPolicy connectivityFailure;
+
   /**
    * Instantiate.
    * @param conf configuration to read.
@@ -88,7 +108,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     Preconditions.checkArgument(conf != null, "Null configuration");
 
     // base policy from configuration
-    RetryPolicy fixedRetries = retryUpToMaximumCountWithFixedSleep(
+    fixedRetries = retryUpToMaximumCountWithFixedSleep(
         conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
         conf.getTimeDuration(RETRY_INTERVAL,
             RETRY_INTERVAL_DEFAULT,
@@ -97,25 +117,33 @@ public class S3ARetryPolicy implements RetryPolicy {
 
     // which is wrapped by a rejection of all non-idempotent calls except
     // for specific failures.
-    RetryPolicy retryIdempotentCalls = new FailNonIOEs(
+    retryIdempotentCalls = new FailNonIOEs(
         new IdempotencyRetryFilter(fixedRetries));
 
     // and a separate policy for throttle requests, which are considered
     // repeatable, even for non-idempotent calls, as the service
     // rejected the call entirely
-    RetryPolicy throttlePolicy = exponentialBackoffRetry(
+    throttlePolicy = exponentialBackoffRetry(
         conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT),
         conf.getTimeDuration(RETRY_THROTTLE_INTERVAL,
             RETRY_THROTTLE_INTERVAL_DEFAULT,
             TimeUnit.MILLISECONDS),
         TimeUnit.MILLISECONDS);
 
-    // no retry on network and tangible API issues
-    RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
-
     // client connectivity: fixed retries without care for idempotency
-    RetryPolicy connectivityFailure = fixedRetries;
+    connectivityFailure = fixedRetries;
 
+    Map<Class<? extends Exception>, RetryPolicy> policyMap =
+        createExceptionMap();
+    retryPolicy = retryByException(retryIdempotentCalls, policyMap);
+  }
+
+  /**
+   * Subclasses can override this, like a constructor, to change behavior:
+   * call the superclass method, modify the returned map as needed, and return it.
+   * @return Map from exception type to RetryPolicy
+   */
+  protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     // the policy map maps the exact classname; subclasses do not
     // inherit policies.
     Map<Class<? extends Exception>, RetryPolicy> policyMap = new HashMap<>();
@@ -126,7 +154,6 @@ public class S3ARetryPolicy implements RetryPolicy {
     policyMap.put(InterruptedException.class, fail);
     // note this does not pick up subclasses (like socket timeout)
     policyMap.put(InterruptedIOException.class, fail);
-    policyMap.put(AWSRedirectException.class, fail);
     // interesting question: should this be retried ever?
     policyMap.put(AccessDeniedException.class, fail);
     policyMap.put(FileNotFoundException.class, fail);
@@ -169,7 +196,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     // trigger sleep
     policyMap.put(ProvisionedThroughputExceededException.class, throttlePolicy);
 
-    retryPolicy = retryByException(retryIdempotentCalls, policyMap);
+    return policyMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
new file mode 100644
index 0000000..023d0c3
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.FileNotFoundException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+
+/**
+ * Slightly-modified retry policy for cases when the file is present in the
+ * MetadataStore, but may still be throwing FileNotFoundException from S3.
+ */
+public class S3GuardExistsRetryPolicy extends S3ARetryPolicy {
+  /**
+   * Instantiate.
+   * @param conf configuration to read.
+   */
+  public S3GuardExistsRetryPolicy(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
+    Map<Class<? extends Exception>, RetryPolicy> b =
+        super.createExceptionMap();
+    b.put(FileNotFoundException.class, retryIdempotentCalls);
+    return b;
+  }
+}
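As a usage sketch (not part of this patch), such a policy would typically be
wired into the s3a Invoker helper so that a read of a file which the
MetadataStore says exists is retried if S3 briefly returns a 404. The retry
callback, readFromS3(), getConf() and path below are illustrative assumptions,
and the Invoker signatures follow its use elsewhere in this module:

  // Sketch only: retry FileNotFoundException when S3Guard asserts existence.
  Invoker s3guardInvoker = new Invoker(
      new S3GuardExistsRetryPolicy(getConf()),
      (text, e, retries, idempotent) -> { /* count or log the retry */ });

  byte[] data = s3guardInvoker.retry("open", path.toString(), true,
      () -> readFromS3(path));   // hypothetical raw S3 read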

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 4dfbdc8..4c4043e 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -688,9 +688,11 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   @Override
   @Retries.OnceRaw
   public void put(Collection<PathMetadata> metas) throws IOException {
-    LOG.debug("Saving batch to table {} in region {}", tableName, region);
 
-    processBatchWriteRequest(null,
-        pathMetadataToItem(completeAncestry(metas)));
+    Item[] items = pathMetadataToItem(completeAncestry(metas));
+    LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
+        tableName, region);
+    processBatchWriteRequest(null, items);
   }
 
   /**
@@ -1076,6 +1078,15 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
         });
   }
 
+  @Retries.RetryTranslated
+  @VisibleForTesting
+  void provisionTableBlocking(Long readCapacity, Long writeCapacity)
+      throws IOException {
+    provisionTable(readCapacity, writeCapacity);
+    waitForTableActive(table);
+  }
+
+  @VisibleForTesting
   Table getTable() {
     return table;
   }
@@ -1173,15 +1184,12 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
             S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
             currentWrite);
 
-    ProvisionedThroughput throughput = new ProvisionedThroughput()
-        .withReadCapacityUnits(newRead)
-        .withWriteCapacityUnits(newWrite);
     if (newRead != currentRead || newWrite != currentWrite) {
       LOG.info("Current table capacity is read: {}, write: {}",
           currentRead, currentWrite);
       LOG.info("Changing capacity of table to read: {}, write: {}",
           newRead, newWrite);
-      table.updateTable(throughput);
+      provisionTableBlocking(newRead, newWrite);
     } else {
       LOG.info("Table capacity unchanged at read: {}, write: {}",
           newRead, newWrite);
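For context, a blocking provisioning helper such as provisionTableBlocking()
ultimately rests on the AWS SDK document API: an asynchronous updateTable()
followed by waiting for the table to return to ACTIVE. A minimal sketch, with
the store's retry and exception-translation wrapping omitted:

  // Sketch only. Table is from com.amazonaws.services.dynamodbv2.document,
  // ProvisionedThroughput from com.amazonaws.services.dynamodbv2.model.
  void updateAndWait(Table table, long read, long write)
      throws InterruptedException {
    table.updateTable(new ProvisionedThroughput()
        .withReadCapacityUnits(read)
        .withWriteCapacityUnits(write));   // capacity change is asynchronous
    table.waitForActive();                 // block until the table is ACTIVE
  }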

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index bdab7b7..69d181e 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.fs.Path;
  * {@code MetadataStore} defines the set of operations that any metadata store
  * implementation must provide.  Note that all {@link Path} objects provided
  * to methods must be absolute, not relative paths.
+ * Implementations must perform any retries they need internally, so that
+ * transient errors are generally recovered from without surfacing
+ * exceptions through this API.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
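A minimal sketch of what the internal-retry requirement means for an
implementation: each store operation wraps its raw calls in a retry helper so
throttling and other transient failures are absorbed before they reach this
API. The invoker field and lookupItem() below are illustrative assumptions;
the Retries annotation mirrors its use elsewhere in s3a:

  // Sketch only: retries and exception translation stay inside the store.
  @Retries.RetryTranslated
  public PathMetadata get(Path path) throws IOException {
    return invoker.retry("get", path.toString(), true,
        () -> lookupItem(path));   // hypothetical raw DynamoDB read
  }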

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 4924b45..4454d5c 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -300,6 +300,11 @@ By their very nature they are slow. And, as their 
execution time is often
 limited by bandwidth between the computer running the tests and the S3 
endpoint,
 parallel execution does not speed these tests up.
 
+***Note: Running scale tests with -Ds3guard and -Ddynamo requires that
+you use a private, testing-only DynamoDB table.*** The tests do disruptive
+things such as deleting metadata and setting the provisioned throughput
+to very low values.
+
 ### <a name="enabling-scale"></a> Enabling the Scale Tests
 
 The tests are enabled if the `scale` property is set in the maven build

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
index eb4f70b..6ac803e 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
@@ -22,16 +22,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
-import java.util.concurrent.Callable;
+import java.io.InputStream;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
+import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Tests S3A behavior under forced inconsistency via {@link
@@ -43,6 +49,8 @@ import static 
org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
  */
 public class ITestS3AInconsistency extends AbstractS3ATestBase {
 
+  private static final int OPEN_READ_ITERATIONS = 20;
+
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
@@ -86,15 +94,103 @@ public class ITestS3AInconsistency extends 
AbstractS3ATestBase {
     }
   }
 
+
+  /**
+   * Ensure that deleting a file with an open read stream does eventually cause
+   * readers to get a FNFE, even with S3Guard and its retries enabled.
+   * In real usage, S3Guard should be enabled for all clients that modify the
+   * file, so the delete would be immediately recorded in the MetadataStore.
+   * Here, however, we test deletion from under S3Guard to make sure it still
+   * eventually propagates the FNFE after any retry policies are exhausted.
+   */
+  @Test
+  public void testOpenDeleteRead() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path p = path("testOpenDeleteRead.txt");
+    writeTextFile(fs, p, "1337c0d3z", true);
+    try (InputStream s = fs.open(p)) {
+      // Disable s3guard, delete file underneath it, re-enable s3guard
+      MetadataStore metadataStore = fs.getMetadataStore();
+      fs.setMetadataStore(new NullMetadataStore());
+      fs.delete(p, false);
+      fs.setMetadataStore(metadataStore);
+      eventually(1000, 200, () -> {
+        intercept(FileNotFoundException.class, () -> s.read());
+      });
+    }
+  }
+
+  /**
+   * Test read() path behavior when getFileStatus() succeeds but subsequent
+   * read() on the input stream fails due to eventual consistency.
+   * There are many points in the InputStream codepaths that can fail. We set
+   * a probability of failure and repeat the test multiple times to achieve
+   * decent coverage.
+   */
+  @Test
+  public void testOpenFailOnRead() throws Exception {
+
+    S3AFileSystem fs = getFileSystem();
+
+    // 1. Patch in a different failure injection policy with <1.0 probability
+    Configuration conf = fs.getConf();
+    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 0.5f);
+    InconsistentAmazonS3Client.setFailureInjectionPolicy(fs,
+        new FailureInjectionPolicy(conf));
+
+    // 2. Make sure no ancestor dirs exist
+    Path dir = path("ancestor");
+    fs.delete(dir, true);
+    waitUntilDeleted(dir);
+
+    // 3. Create a descendant file, which implicitly creates ancestors
+    // This file has delayed visibility.
+    describe("creating test file");
+    Path path = path("ancestor/file-to-read-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    writeTextFile(getFileSystem(), path, "Reading is fun", false);
+
+    // 4. Clear inconsistency so the first getFileStatus() can succeed, if we
+    // are not using S3Guard. If we are using S3Guard, it should tolerate the
+    // delayed visibility.
+    if (!fs.hasMetadataStore()) {
+      InconsistentAmazonS3Client.clearInconsistency(fs);
+    }
+
+    // It is unclear whether multiple iterations are needed when S3Guard is
+    // disabled; keep them for now so both read() variants get exercised.
+    for (int i = 0; i < OPEN_READ_ITERATIONS; i++) {
+      doOpenFailOnReadTest(fs, path, i);
+    }
+  }
+
+  private void doOpenFailOnReadTest(S3AFileSystem fs, Path path, int iteration)
+      throws Exception {
+
+    // 4. Open the file
+    describe(String.format("i=%d: opening test file", iteration));
+    try(InputStream in = fs.open(path)) {
+      // 5. Assert expected behavior on read() failure.
+      int l = 4;
+      byte[] buf = new byte[l];
+      describe("reading test file");
+      // Use both read() variants
+      if ((iteration % 2) == 0) {
+        assertEquals(l, in.read(buf, 0, l));
+      } else {
+        in.read();
+      }
+    } catch (FileNotFoundException e) {
+      if (fs.hasMetadataStore()) {
+        LOG.error("Error:", e);
+        ContractTestUtils.fail("S3Guard failed to handle fail-on-read", e);
+      } else {
+        LOG.info("File not found on read(), as expected.");
+      }
+    }
+  }
+
   private void waitUntilDeleted(final Path p) throws Exception {
     LambdaTestUtils.eventually(30 * 1000, 1000,
-        new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            assertPathDoesNotExist("Dir should be deleted", p);
-            return null;
-          }
-        }
-    );
+        () -> assertPathDoesNotExist("Dir should be deleted", p));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 4a81374..763819b 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.AmazonS3;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +32,7 @@ import org.junit.Assume;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -41,6 +41,7 @@ import java.util.List;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
 import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
 
 /**
@@ -552,11 +553,10 @@ public class ITestS3GuardListConsistency extends 
AbstractS3ATestBase {
    * @param key
    * @param delimiter
    * @return
-   * @throws IOException
+   * @throws IOException on error
    */
-
   private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
-      String key, String delimiter) throws java.io.IOException {
+      String key, String delimiter) throws IOException {
     ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)
         .getV2();
     return invoker.retryUntranslated("list", true,
@@ -565,9 +565,4 @@ public class ITestS3GuardListConsistency extends 
AbstractS3ATestBase {
         });
   }
 
-  private static void clearInconsistency(S3AFileSystem fs) throws Exception {
-    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
-    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
-    ic.clearInconsistency();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index da0060e..4414746 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -51,7 +51,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
+import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
@@ -819,7 +819,7 @@ public final class S3ATestUtils {
    * Turn on the inconsistent S3A FS client in a configuration,
    * with 100% probability of inconsistency, default delays.
    * For this to go live, the paths must include the element
-   * {@link InconsistentAmazonS3Client#DEFAULT_DELAY_KEY_SUBSTRING}.
+   * {@link FailureInjectionPolicy#DEFAULT_DELAY_KEY_SUBSTRING}.
    * @param conf configuration to patch
    * @param delay delay in millis
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 4730a90..b8610d6 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
 import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
@@ -90,7 +91,7 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
   @Override
   protected Path path(String filepath) throws IOException {
     return useInconsistentClient() ?
-           super.path(InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING
+           super.path(FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING
                + "/" + filepath)
            : super.path(filepath);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
new file mode 100644
index 0000000..02a8966
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
+
+import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.junit.Assume.*;
+
+/**
+ * Scale test for DynamoDBMetadataStore.
+ */
+public class ITestDynamoDBMetadataStoreScale
+    extends AbstractITestS3AMetadataStoreScale {
+
+  private static final long BATCH_SIZE = 25;
+  private static final long SMALL_IO_UNITS = BATCH_SIZE / 4;
+
+  @Override
+  public MetadataStore createMetadataStore() throws IOException {
+    Configuration conf = getFileSystem().getConf();
+    String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
+    assumeNotNull("DynamoDB table is configured", ddbTable);
+    String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
+    assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
+
+    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    ms.initialize(getFileSystem().getConf());
+    return ms;
+  }
+
+
+  /**
+   * Although the AWS SDK documentation claims to handle retries and
+   * exponential backoff, we have witnessed
+   * com.amazonaws...dynamodbv2.model.ProvisionedThroughputExceededException
+   * (Status Code: 400; Error Code: ProvisionedThroughputExceededException).
+   * Hypothesis: this happens when the size of a batched write is bigger than
+   * the number of provisioned write units.  This test ensures we handle the
+   * case correctly, retrying with a smaller batch instead of surfacing
+   * exceptions.
+   */
+  @Test
+  public void testBatchedWriteExceedsProvisioned() throws Exception {
+
+    final long iterations = 5;
+    boolean isProvisionedChanged;
+    List<PathMetadata> toCleanup = new ArrayList<>();
+
+    // Fail if someone changes a constant we depend on
+    assertTrue("Maximum batch size must be big enough to run this test",
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT >= BATCH_SIZE);
+
+    try (DynamoDBMetadataStore ddbms =
+         (DynamoDBMetadataStore)createMetadataStore()) {
+
+      DynamoDB ddb = ddbms.getDynamoDB();
+      String tableName = ddbms.getTable().getTableName();
+      final ProvisionedThroughputDescription existing =
+          ddb.getTable(tableName).describe().getProvisionedThroughput();
+
+      // Setting the provisioned I/O to the value it already has throws an
+      // exception, so avoid doing that.
+      isProvisionedChanged = (existing.getReadCapacityUnits() != SMALL_IO_UNITS
+          || existing.getWriteCapacityUnits() != SMALL_IO_UNITS);
+
+      if (isProvisionedChanged) {
+        // Set low provisioned I/O for dynamodb
+        describe("Provisioning dynamo tbl %s read/write -> %d/%d", tableName,
+            SMALL_IO_UNITS, SMALL_IO_UNITS);
+        // Blocks to ensure table is back to ready state before we proceed
+        ddbms.provisionTableBlocking(SMALL_IO_UNITS, SMALL_IO_UNITS);
+      } else {
+        describe("Skipping provisioning table I/O, already %d/%d",
+            SMALL_IO_UNITS, SMALL_IO_UNITS);
+      }
+
+      try {
+        // We know the dynamodb metadata store will expand a put of a path
+        // of depth N into a batch of N writes (all ancestors are written
+        // separately up to the root).  (Ab)use this for an easy way to write
+        // a batch of stuff that is bigger than the provisioned write units
+        try {
+          describe("Running %d iterations of batched put, size %d", iterations,
+              BATCH_SIZE);
+          long pruneItems = 0;
+          for (long i = 0; i < iterations; i++) {
+            Path longPath = pathOfDepth(BATCH_SIZE, String.valueOf(i));
+            FileStatus status = basicFileStatus(longPath, 0, false, 12345,
+                12345);
+            PathMetadata pm = new PathMetadata(status);
+
+            ddbms.put(pm);
+            toCleanup.add(pm);
+            pruneItems++;
+            // The Exceeded exception is hard to reproduce with put() alone,
+            // so also issue an occasional prune(), which is the operation in
+            // the only stack trace seen so far (on the JIRA).
+            if (pruneItems == BATCH_SIZE) {
+              describe("pruning files");
+              ddbms.prune(Long.MAX_VALUE /* all files */);
+              pruneItems = 0;
+            }
+          }
+        } finally {
+          describe("Cleaning up table %s", tableName);
+          for (PathMetadata pm : toCleanup) {
+            cleanupMetadata(ddbms, pm);
+          }
+        }
+      } finally {
+        if (isProvisionedChanged) {
+          long write = existing.getWriteCapacityUnits();
+          long read = existing.getReadCapacityUnits();
+          describe("Restoring dynamo tbl %s read/write -> %d/%d", tableName,
+              read, write);
+          ddbms.provisionTableBlocking(existing.getReadCapacityUnits(),
+              existing.getWriteCapacityUnits());
+        }
+      }
+    }
+  }
+
+  // Attempt to delete metadata, suppressing any errors.
+  private void cleanupMetadata(MetadataStore ms, PathMetadata pm) {
+    try {
+      ms.forgetMetadata(pm.getFileStatus().getPath());
+    } catch (IOException ioe) {
+      // Ignore.
+    }
+  }
+
+  private Path pathOfDepth(long n, @Nullable String fileSuffix) {
+    StringBuilder sb = new StringBuilder();
+    for (long i = 0; i < n; i++) {
+      sb.append(i == 0 ? "/" + this.getClass().getSimpleName() : "lvl");
+      sb.append(i);
+      if (i == n-1 && fileSuffix != null) {
+        sb.append(fileSuffix);
+      }
+      sb.append("/");
+    }
+    return new Path(getFileSystem().getUri().toString(), sb.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index c19ae91..e463ce4 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -839,7 +839,7 @@ public abstract class MetadataStoreTestBase extends Assert {
     return basicFileStatus(path, size, isDir, modTime, accessTime);
   }
 
-  FileStatus basicFileStatus(Path path, int size, boolean isDir,
+  public static FileStatus basicFileStatus(Path path, int size, boolean isDir,
       long newModTime, long newAccessTime) throws IOException {
     return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, newModTime,
         newAccessTime, PERMISSION, OWNER, GROUP, path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8110d6a0/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java
deleted file mode 100644
index 3de1935..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestDynamoDBMetadataStoreScale.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.scale;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
-
-import java.io.IOException;
-
-import static org.junit.Assume.*;
-import static org.apache.hadoop.fs.s3a.Constants.*;
-
-/**
- * Scale test for DynamoDBMetadataStore.
- */
-public class ITestDynamoDBMetadataStoreScale
-    extends AbstractITestS3AMetadataStoreScale {
-
-  @Override
-  public MetadataStore createMetadataStore() throws IOException {
-    Configuration conf = getFileSystem().getConf();
-    String ddbTable = conf.get(S3GUARD_DDB_TABLE_NAME_KEY);
-    assumeNotNull("DynamoDB table is configured", ddbTable);
-    String ddbEndpoint = conf.get(S3GUARD_DDB_REGION_KEY);
-    assumeNotNull("DynamoDB endpoint is configured", ddbEndpoint);
-
-    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
-    ms.initialize(getFileSystem().getConf());
-    return ms;
-  }
-}

