This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9221704  HADOOP-16490. Avoid/handle cached 404s during S3A file creation.
9221704 is described below

commit 9221704f857e33a5f9e00c19d3705e46e94f427b
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Wed Sep 11 16:46:25 2019 +0100

    HADOOP-16490. Avoid/handle cached 404s during S3A file creation.
    
    Contributed by Steve Loughran.
    
    This patch avoids issuing any HEAD path request when creating a file with
    overwrite=true, so 404s will not end up in the S3 load balancers unless
    someone calls getFileStatus/exists/isFile in their own code.
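    
    In outline, the probe selection in create() becomes (a simplified sketch of
    the change, not the full method):
    
        // When overwriting, skip the HEAD probe of the path itself and only
        // look for a directory (dir marker + LIST); the missing file is never
        // probed, so no 404 for it can get cached in the load balancers.
        status = innerGetFileStatus(path, false,
            overwrite
                ? StatusProbeEnum.DIRECTORIES
                : StatusProbeEnum.ALL);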
    
    The Hadoop FsShell CommandWithDestination class is modified to not register
    uncreated files for deleteOnExit(), because that calls exists() and so can
    place the 404 in the cache, even after S3A is patched to not do it itself.
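    
    In outline, the shell copy path no longer registers the not-yet-created
    temporary file for deleteOnExit(); instead it cleans up explicitly if the
    write fails (a simplified sketch of the new CommandWithDestination flow):
    
        out = create(target, lazyPersist, direct);   // no deleteOnExit() probe
        try {
          IOUtils.copyBytes(in, out, getConf(), true);
        } catch (IOException e) {
          // failure: clean up if we got as far as creating the file
          if (!direct && out != null) {
            fs.delete(target.path, false);
          }
          throw e;
        } finally {
          IOUtils.closeStream(out);
        }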
    
    Because S3Guard knows when a file should be present, it adds a special
    FileNotFound retry policy, independently configurable from other retry
    policies; it is also exponential, but with different parameters. This is
    because every HEAD request will refresh any 404 cached in the S3 Load
    Balancers. It's not enough to retry: we have to have a suitable gap between
    attempts to (hopefully) ensure any cached entry will be gone.
    
    The options and values are:
    
    fs.s3a.s3guard.consistency.retry.interval: 2s
    fs.s3a.s3guard.consistency.retry.limit: 7
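    
    Both can be tuned in the usual way, e.g. in core-site.xml (illustrative
    values, not a recommendation):
    
        <property>
          <name>fs.s3a.s3guard.consistency.retry.interval</name>
          <value>5s</value>
        </property>
        <property>
          <name>fs.s3a.s3guard.consistency.retry.limit</name>
          <value>10</value>
        </property>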
    
    The S3A copy() method used during rename() raises a RemoteFileChangedException
    which is not caught, so is not downgraded to a false return value. Thus: when
    a rename is unrecoverable, this fact is propagated.
    
    Copy operations without S3Guard lack the confidence that the file exists, so
    they don't retry the same way: they fail fast with a different error message.
    However, because create(path, overwrite=true) no longer does a HEAD on the
    path, we can at least be confident that S3A itself is not creating those
    cached 404 markers.
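    
    A caller which wants to react to this, rather than treating it as a generic
    IOException, can catch the exception explicitly; a hypothetical sketch:
    
        try {
          fs.rename(source, dest);
        } catch (RemoteFileChangedException e) {
          // the source never became visible in S3: unrecoverable here,
          // so surface the failure rather than retrying the rename.
          throw e;
        }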
    
    Change-Id: Ia7807faad8b9a8546836cb19f816cccf17cca26d
---
 .../java/org/apache/hadoop/fs/shell/Command.java   |   2 +-
 .../hadoop/fs/shell/CommandWithDestination.java    |  74 +++++----
 .../src/main/resources/core-default.xml            |  25 ++-
 .../hadoop/fs/contract/ContractTestUtils.java      |   3 +-
 .../java/org/apache/hadoop/fs/shell/TestCopy.java  |  52 ++++++-
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  28 ++++
 .../hadoop/fs/s3a/RemoteFileChangedException.java  |  16 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 171 ++++++++++++++-------
 .../org/apache/hadoop/fs/s3a/S3ARetryPolicy.java   |  52 +++++--
 .../hadoop/fs/s3a/S3GuardExistsRetryPolicy.java    |  32 +++-
 .../hadoop/fs/s3a/impl/ChangeDetectionPolicy.java  |  23 ++-
 .../apache/hadoop/fs/s3a/impl/ChangeTracker.java   |   5 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |   2 +
 .../apache/hadoop/fs/s3a/impl/StatusProbeEnum.java |  44 ++++++
 .../src/site/markdown/tools/hadoop-aws/index.md    |   6 +-
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  87 +++++++++++
 .../hadoop/fs/s3a/ITestS3AEmptyDirectory.java      |   5 +-
 .../hadoop/fs/s3a/ITestS3AFileOperationCost.java   |  20 ++-
 .../hadoop/fs/s3a/ITestS3ARemoteFileChanged.java   | 149 +++++++++++++++++-
 .../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java       |  10 +-
 .../fs/s3a/ITestS3GuardOutOfBandOperations.java    |   7 +-
 .../org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java  |  16 +-
 .../hadoop/fs/s3a/ITestS3GuardWriteBack.java       |   1 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |   4 +-
 .../fs/s3a/commit/AbstractITCommitMRJob.java       |  25 ++-
 25 files changed, 710 insertions(+), 149 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
index 3eef278..c818257 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
@@ -458,7 +458,7 @@ abstract public class Command extends Configured {
     if (e instanceof InterruptedIOException) {
       throw new CommandInterruptException();
     }
-    
+    LOG.debug("{} failure", getName(), e);
     String errorMessage = e.getLocalizedMessage();
     if (errorMessage == null) {
       // this is an unexpected condition, so dump the whole exception since
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
index 2421f06..0802a00 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
@@ -30,6 +30,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +61,11 @@ import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
  * a source and resolved target.  Sources are resolved as children of
  * a destination directory.
  */
-abstract class CommandWithDestination extends FsCommand {  
+abstract class CommandWithDestination extends FsCommand {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(
+      CommandWithDestination.class);
+
   protected PathData dst;
   private boolean overwrite = false;
   private boolean verifyChecksum = true;
@@ -220,6 +227,7 @@ abstract class CommandWithDestination extends FsCommand {
       }
     } else if (dst.exists) {
       if (!dst.stat.isDirectory() && !overwrite) {
+        LOG.debug("Destination file exists: {}", dst.stat);
         throw new PathExistsException(dst.toString());
       }
     } else if (!dst.parentExists()) {
@@ -407,6 +415,7 @@ abstract class CommandWithDestination extends FsCommand {
       targetFs.setWriteChecksum(writeChecksum);
       targetFs.writeStreamToFile(in, tempTarget, lazyPersist, direct);
       if (!direct) {
+        targetFs.deleteOnExit(tempTarget.path);
         targetFs.rename(tempTarget, target);
       }
     } finally {
@@ -484,6 +493,15 @@ abstract class CommandWithDestination extends FsCommand {
       try {
         out = create(target, lazyPersist, direct);
         IOUtils.copyBytes(in, out, getConf(), true);
+      } catch (IOException e) {
+        // failure: clean up if we got as far as creating the file
+        if (!direct && out != null) {
+          try {
+            fs.delete(target.path, false);
+          } catch (IOException ignored) {
+          }
+        }
+        throw e;
       } finally {
         IOUtils.closeStream(out); // just in case copyBytes didn't
       }
@@ -493,37 +511,31 @@ abstract class CommandWithDestination extends FsCommand {
     FSDataOutputStream create(PathData item, boolean lazyPersist,
         boolean direct)
         throws IOException {
-      try {
-        if (lazyPersist) {
-          long defaultBlockSize;
-          try {
-            defaultBlockSize = getDefaultBlockSize();
-          } catch (NotInMountpointException ex) {
-            // ViewFileSystem#getDefaultBlockSize() throws an exception as it
-            // needs a target FS to retrive the default block size from.
-            // Hence, for ViewFs, we should call getDefaultBlockSize with the
-            // target path.
-            defaultBlockSize = getDefaultBlockSize(item.path);
-          }
-
-          EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
-          return create(item.path,
-                        FsPermission.getFileDefault().applyUMask(
-                            FsPermission.getUMask(getConf())),
-                        createFlags,
-                        getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
-                            IO_FILE_BUFFER_SIZE_DEFAULT),
-                        (short) 1,
-                        defaultBlockSize,
-                        null,
-                        null);
-        } else {
-          return create(item.path, true);
-        }
-      } finally { // might have been created but stream was interrupted
-        if (!direct) {
-          deleteOnExit(item.path);
+      if (lazyPersist) {
+        long defaultBlockSize;
+        try {
+          defaultBlockSize = getDefaultBlockSize();
+        } catch (NotInMountpointException ex) {
+          // ViewFileSystem#getDefaultBlockSize() throws an exception as it
+          // needs a target FS to retrive the default block size from.
+          // Hence, for ViewFs, we should call getDefaultBlockSize with the
+          // target path.
+          defaultBlockSize = getDefaultBlockSize(item.path);
         }
+
+        EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
+        return create(item.path,
+                      FsPermission.getFileDefault().applyUMask(
+                          FsPermission.getUMask(getConf())),
+                      createFlags,
+                      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+                          IO_FILE_BUFFER_SIZE_DEFAULT),
+                      (short) 1,
+                      defaultBlockSize,
+                      null,
+                      null);
+      } else {
+        return create(item.path, true);
       }
     }
 
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1842171..c08cfd2 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1663,7 +1663,7 @@
   <value>7</value>
   <description>
     Number of times to retry any repeatable S3 client request on failure,
-    excluding throttling requests.
+    excluding throttling requests and S3Guard inconsistency resolution.
   </description>
 </property>
 
@@ -1672,7 +1672,7 @@
   <value>500ms</value>
   <description>
     Initial retry interval when retrying operations for any reason other
-    than S3 throttle errors.
+    than S3 throttle errors and S3Guard inconsistency resolution.
   </description>
 </property>
 
@@ -1693,6 +1693,27 @@
 </property>
 
 <property>
+  <name>fs.s3a.s3guard.consistency.retry.limit</name>
+  <value>7</value>
+  <description>
+    Number of times to retry attempts to read/open/copy files when
+    S3Guard believes a specific version of the file to be available,
+    but the S3 request does not find any version of a file, or a different
+    version.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.consistency.retry.interval</name>
+  <value>2s</value>
+  <description>
+    Initial interval between attempts to retry operations while waiting for S3
+    to become consistent with the S3Guard data.
+    An exponential back-off is used here: every failure doubles the delay.
+  </description>
+</property>
+
+<property>
   <name>fs.s3a.committer.name</name>
   <value>file</value>
   <description>
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 64f9cb8..789fb0a 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -558,7 +558,8 @@ public class ContractTestUtils extends Assert {
    */
   public static void assertIsDirectory(FileSystem fs,
                                        Path path) throws IOException {
-    FileStatus fileStatus = fs.getFileStatus(path);
+    FileStatus fileStatus = verifyPathExists(fs,
+        "Expected to find a directory", path);
     assertIsDirectory(fileStatus);
   }
 
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
index 74fb34d..f73e83d 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.shell;
 
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -77,10 +78,19 @@ public class TestCopy {
     when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
     
     tryCopyStream(in, true);
+    verify(in).close();
+    verify(out, times(2)).close();
+    // no data was written.
+    verify(out, never()).write(any(byte[].class), anyInt(), anyInt());
     verify(mockFs, never()).delete(eq(path), anyBoolean());
     verify(mockFs).rename(eq(tmpPath), eq(path));
     verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
     verify(mockFs, never()).close();
+    // temp path never had its existence checked. This is critical for S3 as it
+    // avoids the successful path accidentally getting a 404 into the S3 load
+    // balancer cache
+    verify(mockFs, never()).exists(eq(tmpPath));
+    verify(mockFs, never()).exists(eq(path));
   }
 
   @Test
@@ -110,6 +120,31 @@ public class TestCopy {
     FSDataInputStream in = mock(FSDataInputStream.class);
 
     tryCopyStream(in, false);
+    verify(mockFs, never()).rename(any(Path.class), any(Path.class));
+    verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs, never()).delete(eq(path), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  /**
+   * Create a file but fail in the write.
+   * The copy operation should attempt to clean up by
+   * closing the output stream then deleting it.
+   */
+  @Test
+  public void testFailedWrite() throws Exception {
+    FSDataOutputStream out = mock(FSDataOutputStream.class);
+    doThrow(new IOException("mocked"))
+        .when(out).write(any(byte[].class), anyInt(), anyInt());
+    whenFsCreate().thenReturn(out);
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    FSInputStream in = mock(FSInputStream.class);
+    doReturn(0)
+        .when(in).read(any(byte[].class), anyInt(), anyInt());
+    Throwable thrown = tryCopyStream(in, false);
+    assertExceptionContains("mocked", thrown);
+    verify(in).close();
+    verify(out, times(2)).close();
     verify(mockFs).delete(eq(tmpPath), anyBoolean());
     verify(mockFs, never()).rename(any(Path.class), any(Path.class));
     verify(mockFs, never()).delete(eq(path), anyBoolean());
@@ -155,14 +190,21 @@ public class TestCopy {
         anyBoolean(), anyInt(), anyShort(), anyLong(), any()));
   }
   
-  private void tryCopyStream(InputStream in, boolean shouldPass) {
+  private Throwable tryCopyStream(InputStream in, boolean shouldPass) {
     try {
       cmd.copyStreamToTarget(new FSDataInputStream(in), target);
+      return null;
     } catch (InterruptedIOException e) {
-      assertFalse("copy failed", shouldPass);
+      if (shouldPass) {
+        throw new AssertionError("copy failed", e);
+      }
+      return e;
     } catch (Throwable e) {
-      assertFalse(e.getMessage(), shouldPass);
-    }    
+      if (shouldPass) {
+        throw new AssertionError(e.getMessage(), e);
+      }
+      return e;
+    }
   }
   
   static class MockFileSystem extends FilterFileSystem {
@@ -183,4 +225,4 @@ public class TestCopy {
       return conf;
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index eead489..5c45ce6 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -761,4 +761,32 @@ public final class Constants {
    * Default change detection require version: true.
    */
   public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;
+
+  /**
+   * Number of times to retry any repeatable S3 client request on failure,
+   * excluding throttling requests: {@value}.
+   */
+  public static final String S3GUARD_CONSISTENCY_RETRY_LIMIT =
+      "fs.s3a.s3guard.consistency.retry.limit";
+
+  /**
+   * Default retry limit: {@value}.
+   */
+  public static final int S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT = 7;
+
+  /**
+   * Initial retry interval: {@value}.
+   */
+  public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL =
+      "fs.s3a.s3guard.consistency.retry.interval";
+
+  /**
+   * Default initial retry interval: {@value}.
+   * The consistency retry probe uses exponential backoff, because
+   * each probe can cause the S3 load balancers to retain any 404 in
+   * its cache for longer. See HADOOP-16490.
+   */
+  public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT =
+      "2s";
+
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
index 1df2d7e..ce0b9a8 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.PathIOException;
 /**
  * Indicates the S3 object is out of sync with the expected version.  Thrown in
  * cases such as when the object is updated while an {@link S3AInputStream} is
- * open.
+ * open, or when an expected file was never found.
  */
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
@@ -36,6 +36,20 @@ public class RemoteFileChangedException extends 
PathIOException {
       "Constraints of request were unsatisfiable";
 
   /**
+   * While trying to get information on a file known to S3Guard, the
+   * file never became visible in S3.
+   */
+  public static final String FILE_NEVER_FOUND =
+      "File to rename not found on guarded S3 store after repeated attempts";
+
+  /**
+   * The file wasn't found in rename after a single attempt: the unguarded
+   * codepath.
+   */
+  public static final String FILE_NOT_FOUND_SINGLE_ATTEMPT =
+      "File to rename not found on unguarded S3 store";
+
+  /**
    * Constructs a RemoteFileChangedException.
    *
    * @param path the path accessed when the change was detected
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6edbed7..0ce9823 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -102,6 +102,7 @@ import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
@@ -160,6 +161,7 @@ import static 
org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS
 import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
 import static 
org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
 import static 
org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -1059,8 +1061,14 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
     String key = pathToKey(path);
     FileStatus status = null;
     try {
-      // get the status or throw an FNFE
-      status = getFileStatus(path);
+      // get the status or throw an FNFE.
+      // when overwriting, there is no need to look for any existing file,
+      // and attempting to do so can poison the load balancers with 404
+      // entries.
+      status = innerGetFileStatus(path, false,
+          overwrite
+              ? StatusProbeEnum.DIRECTORIES
+              : StatusProbeEnum.ALL);
 
       // if the thread reaches here, there is something at the path
       if (status.isDirectory()) {
@@ -1216,7 +1224,8 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
 
     // get the source file status; this raises a FNFE if there is no source
     // file.
-    S3AFileStatus srcStatus = innerGetFileStatus(src, true);
+    S3AFileStatus srcStatus = innerGetFileStatus(src, true,
+        StatusProbeEnum.ALL);
 
     if (srcKey.equals(dstKey)) {
       LOG.debug("rename: src and dest refer to the same file or directory: {}",
@@ -1228,7 +1237,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
 
     S3AFileStatus dstStatus = null;
     try {
-      dstStatus = innerGetFileStatus(dst, true);
+      dstStatus = innerGetFileStatus(dst, true, StatusProbeEnum.ALL);
       // if there is no destination entry, an exception is raised.
       // hence this code sequence can assume that there is something
       // at the end of the path; the only detail being what it is and
@@ -1261,7 +1270,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
       if (!pathToKey(parent).isEmpty()) {
         try {
           S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
-              false);
+              false, StatusProbeEnum.ALL);
           if (!dstParentStatus.isDirectory()) {
             throw new RenameFailedException(src, dst,
                 "destination parent is not a directory");
@@ -1660,6 +1669,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
     ObjectMetadata meta = changeInvoker.retryUntranslated("GET " + key, true,
         () -> {
           incrementStatistic(OBJECT_METADATA_REQUESTS);
+          LOG.debug("HEAD {} with change tracker {}", key, changeTracker);
           if (changeTracker != null) {
             changeTracker.maybeApplyConstraint(request);
           }
@@ -2267,7 +2277,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
       entryPoint(INVOCATION_DELETE);
       DeleteOperation deleteOperation = new DeleteOperation(
           createStoreContext(),
-          innerGetFileStatus(f, true),
+          innerGetFileStatus(f, true, StatusProbeEnum.ALL),
           recursive,
           operationCallbacks,
           InternalConstants.MAX_ENTRIES_TO_DELETE);
@@ -2297,13 +2307,15 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
    * Retry policy: retrying; untranslated.
    * @param f path to create
    * @throws IOException IO problem
-   * @throws AmazonClientException untranslated AWS client problem
    */
   @Retries.RetryTranslated
   private void createFakeDirectoryIfNecessary(Path f)
       throws IOException, AmazonClientException {
     String key = pathToKey(f);
-    if (!key.isEmpty() && !s3Exists(f)) {
+    // we only make the LIST call; the codepaths to get here should not
+    // be reached if there is an empty dir marker - and if they are, it
+    // is mostly harmless to create a new one.
+    if (!key.isEmpty() && !s3Exists(f, EnumSet.of(StatusProbeEnum.List))) {
       LOG.debug("Creating new fake directory at {}", f);
       createFakeDirectory(key);
     }
@@ -2314,7 +2326,6 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
    * That is: it parent is not the root path and does not yet exist.
    * @param path whose parent is created if needed.
    * @throws IOException IO problem
-   * @throws AmazonClientException untranslated AWS client problem
    */
   @Retries.RetryTranslated
   void maybeCreateFakeParentDirectory(Path path)
@@ -2568,14 +2579,23 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
    */
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
-    return innerGetFileStatus(f, false);
+    entryPoint(INVOCATION_GET_FILE_STATUS);
+    return innerGetFileStatus(f, false, StatusProbeEnum.ALL);
   }
 
   /**
+   * Get the status of a file or directory, first through S3Guard and then
+   * through S3.
+   * The S3 probes can leave 404 responses in the S3 load balancers; if
+   * a check is only needed for a directory, declaring this saves time and
+   * avoids creating one for the object.
+   * When only probing for directories, if an entry for a file is found in
+   * S3Guard it is returned, but checks for updated values are skipped.
    * Internal version of {@link #getFileStatus(Path)}.
    * @param f The path we want information from
    * @param needEmptyDirectoryFlag if true, implementation will calculate
    *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
+   * @param probes probes to make
    * @return a S3AFileStatus object
    * @throws FileNotFoundException when the path does not exist
    * @throws IOException on other problems.
@@ -2583,9 +2603,8 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
   @VisibleForTesting
   @Retries.RetryTranslated
   S3AFileStatus innerGetFileStatus(final Path f,
-      boolean needEmptyDirectoryFlag) throws IOException {
-    entryPoint(INVOCATION_GET_FILE_STATUS);
-    checkNotClosed();
+      final boolean needEmptyDirectoryFlag,
+      final Set<StatusProbeEnum> probes) throws IOException {
     final Path path = qualify(f);
     String key = pathToKey(path);
     LOG.debug("Getting path status for {}  ({})", path, key);
@@ -2602,7 +2621,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
         OffsetDateTime deletedAt = OffsetDateTime.ofInstant(
             Instant.ofEpochMilli(pm.getFileStatus().getModificationTime()),
             ZoneOffset.UTC);
-        throw new FileNotFoundException("Path " + f + " is recorded as " +
+        throw new FileNotFoundException("Path " + path + " is recorded as " +
             "deleted by S3Guard at " + deletedAt);
       }
 
@@ -2612,15 +2631,18 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
       // dest is also a directory, there's no difference.
       // TODO After HADOOP-16085 the modification detection can be done with
       //  etags or object version instead of modTime
-      boolean allowAuthoritative = allowAuthoritative(f);
+      boolean allowAuthoritative = allowAuthoritative(path);
       if (!pm.getFileStatus().isDirectory() &&
-          !allowAuthoritative) {
+          !allowAuthoritative &&
+          probes.contains(StatusProbeEnum.Head)) {
+        // a file has been found in a non-auth path and the caller has not said
+        // they only care about directories
         LOG.debug("Metadata for {} found in the non-auth metastore.", path);
         final long msModTime = pm.getFileStatus().getModificationTime();
 
         S3AFileStatus s3AFileStatus;
         try {
-          s3AFileStatus = s3GetFileStatus(path, key, tombstones);
+          s3AFileStatus = s3GetFileStatus(path, key, probes, tombstones);
         } catch (FileNotFoundException fne) {
           s3AFileStatus = null;
         }
@@ -2662,7 +2684,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
       // S3 yet, we'll assume the empty directory is true;
       S3AFileStatus s3FileStatus;
       try {
-        s3FileStatus = s3GetFileStatus(path, key, tombstones);
+        s3FileStatus = s3GetFileStatus(path, key, probes, tombstones);
       } catch (FileNotFoundException e) {
         return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE,
             null, null);
@@ -2674,7 +2696,8 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
       // there was no entry in S3Guard
       // retrieve the data and update the metadata store in the process.
       return S3Guard.putAndReturn(metadataStore,
-          s3GetFileStatus(path, key, tombstones), instrumentation,
+          s3GetFileStatus(path, key, StatusProbeEnum.ALL, tombstones),
+          instrumentation,
           ttlTimeProvider);
     }
   }
@@ -2686,14 +2709,18 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
    * Retry policy: retry translated.
    * @param path Qualified path
    * @param key  Key string for the path
+   * @param probes probes to make
+   * @param tombstones tombstones to filter
    * @return Status
    * @throws FileNotFoundException when the path does not exist
    * @throws IOException on other problems.
    */
   @Retries.RetryTranslated
-  private S3AFileStatus s3GetFileStatus(final Path path, String key,
-      Set<Path> tombstones) throws IOException {
-    if (!key.isEmpty()) {
+  private S3AFileStatus s3GetFileStatus(final Path path,
+      String key,
+      final Set<StatusProbeEnum> probes,
+      final Set<Path> tombstones) throws IOException {
+    if (!key.isEmpty() && probes.contains(StatusProbeEnum.Head)) {
       try {
         ObjectMetadata meta = getObjectMetadata(key);
 
@@ -2711,15 +2738,15 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
               meta.getVersionId());
         }
       } catch (AmazonServiceException e) {
-        if (e.getStatusCode() != 404) {
+        if (e.getStatusCode() != SC_404) {
           throw translateException("getFileStatus", path, e);
         }
       } catch (AmazonClientException e) {
         throw translateException("getFileStatus", path, e);
       }
 
-      // Necessary?
-      if (!key.endsWith("/")) {
+      // Look for the dir marker
+      if (!key.endsWith("/") && probes.contains(StatusProbeEnum.DirMarker)) {
         String newKey = key + "/";
         try {
           ObjectMetadata meta = getObjectMetadata(newKey);
@@ -2740,7 +2767,7 @@ public class S3AFileSystem extends FileSystem implements 
StreamCapabilities,
                     meta.getVersionId());
           }
         } catch (AmazonServiceException e) {
-          if (e.getStatusCode() != 404) {
+          if (e.getStatusCode() != SC_404) {
             throw translateException("getFileStatus", newKey, e);
           }
         } catch (AmazonClientException e) {
@@ -2749,39 +2776,42 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
       }
     }
 
-    try {
-      key = maybeAddTrailingSlash(key);
-      S3ListRequest request = createListObjectsRequest(key, "/", 1);
+    // execute the list
+    if (probes.contains(StatusProbeEnum.List)) {
+      try {
+        key = maybeAddTrailingSlash(key);
+        S3ListRequest request = createListObjectsRequest(key, "/", 1);
 
-      S3ListResult objects = listObjects(request);
+        S3ListResult objects = listObjects(request);
 
-      Collection<String> prefixes = objects.getCommonPrefixes();
-      Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
-      if (!isEmptyOfKeys(prefixes, tombstones) ||
-          !isEmptyOfObjects(summaries, tombstones)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Found path as directory (with /): {}/{}",
-              prefixes.size(), summaries.size());
+        Collection<String> prefixes = objects.getCommonPrefixes();
+        Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
+        if (!isEmptyOfKeys(prefixes, tombstones) ||
+            !isEmptyOfObjects(summaries, tombstones)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found path as directory (with /): {}/{}",
+                prefixes.size(), summaries.size());
 
-          for (S3ObjectSummary summary : summaries) {
-            LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
-          }
-          for (String prefix : prefixes) {
-            LOG.debug("Prefix: {}", prefix);
+            for (S3ObjectSummary summary : summaries) {
+              LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
+            }
+            for (String prefix : prefixes) {
+              LOG.debug("Prefix: {}", prefix);
+            }
           }
-        }
 
-        return new S3AFileStatus(Tristate.FALSE, path, username);
-      } else if (key.isEmpty()) {
-        LOG.debug("Found root directory");
-        return new S3AFileStatus(Tristate.TRUE, path, username);
-      }
-    } catch (AmazonServiceException e) {
-      if (e.getStatusCode() != 404) {
+          return new S3AFileStatus(Tristate.FALSE, path, username);
+        } else if (key.isEmpty()) {
+          LOG.debug("Found root directory");
+          return new S3AFileStatus(Tristate.TRUE, path, username);
+        }
+      } catch (AmazonServiceException e) {
+        if (e.getStatusCode() != SC_404) {
+          throw translateException("getFileStatus", path, e);
+        }
+      } catch (AmazonClientException e) {
         throw translateException("getFileStatus", path, e);
       }
-    } catch (AmazonClientException e) {
-      throw translateException("getFileStatus", path, e);
     }
 
     LOG.debug("Not Found: {}", path);
@@ -2834,15 +2864,17 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
    * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
    * S3Guard MetadataStore, if any, will be skipped.
    * Retry policy: retrying; translated.
+   * @param path qualified path to look for
+   * @param probes probes to make
    * @return true if path exists in S3
    * @throws IOException IO failure
    */
   @Retries.RetryTranslated
-  private boolean s3Exists(final Path f) throws IOException {
-    Path path = qualify(f);
+  private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
+      throws IOException {
     String key = pathToKey(path);
     try {
-      s3GetFileStatus(path, key, null);
+      s3GetFileStatus(path, key, probes, null);
       return true;
     } catch (FileNotFoundException e) {
       return false;
@@ -3160,10 +3192,31 @@ public class S3AFileSystem extends FileSystem 
implements StreamCapabilities,
     String action = "copyFile(" + srcKey + ", " + dstKey + ")";
     Invoker readInvoker = readContext.getReadInvoker();
 
-    ObjectMetadata srcom =
-        once(action, srcKey,
-            () ->
-                getObjectMetadata(srcKey, changeTracker, readInvoker, "copy"));
+    ObjectMetadata srcom;
+    try {
+      srcom = once(action, srcKey,
+          () ->
+              getObjectMetadata(srcKey, changeTracker, readInvoker, "copy"));
+    } catch (FileNotFoundException e) {
+      // if rename fails at this point it means that the expected file was not
+      // found.
+      // The cause is believed to always be one of
+      //  - File was deleted since LIST/S3Guard metastore.list.() knew of it.
+      //  - S3Guard is asking for a specific version and it's been removed by
+      //    lifecycle rules.
+      //  - there's a 404 cached in the S3 load balancers.
+      LOG.debug("getObjectMetadata({}) failed to find an expected file",
+          srcKey, e);
+      // We create an exception, but the text depends on the S3Guard state
+      String message = hasMetadataStore()
+          ? RemoteFileChangedException.FILE_NEVER_FOUND
+          : RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT;
+      throw new RemoteFileChangedException(
+          keyToQualifiedPath(srcKey).toString(),
+          action,
+          message,
+          e);
+    }
     ObjectMetadata dstom = cloneObjectMetadata(srcom);
     setOptionalObjectMetadata(dstom);
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index c4b7880..09e9c99 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
 import com.amazonaws.AmazonClientException;
 import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidRequestException;
@@ -80,15 +82,21 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 @SuppressWarnings("visibilitymodifier")  // I want a struct of finals, for 
real.
 public class S3ARetryPolicy implements RetryPolicy {
 
+  private static final Logger LOG = LoggerFactory.getLogger(
+      S3ARetryPolicy.class);
+
+  private final Configuration configuration;
+
   /** Final retry policy we end up with. */
   private final RetryPolicy retryPolicy;
 
   // Retry policies for mapping exceptions to
 
-  /** Base policy from configuration. */
-  protected final RetryPolicy fixedRetries;
+  /** Exponential policy for the base of normal failures. */
+  protected final RetryPolicy baseExponentialRetry;
 
-  /** Rejection of all non-idempotent calls except specific failures. */
+  /** Idempotent calls which raise IOEs are retried.
+   *  */
   protected final RetryPolicy retryIdempotentCalls;
 
   /** Policy for throttle requests, which are considered repeatable, even for
@@ -98,7 +106,10 @@ public class S3ARetryPolicy implements RetryPolicy {
   /** No retry on network and tangible API issues. */
   protected final RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
 
-  /** Client connectivity: fixed retries without care for idempotency. */
+  /**
+   * Client connectivity: baseExponentialRetry without worrying about whether
+   * or not the command is idempotent.
+   */
   protected final RetryPolicy connectivityFailure;
 
   /**
@@ -107,19 +118,26 @@ public class S3ARetryPolicy implements RetryPolicy {
    */
   public S3ARetryPolicy(Configuration conf) {
     Preconditions.checkArgument(conf != null, "Null configuration");
+    this.configuration = conf;
 
     // base policy from configuration
-    fixedRetries = exponentialBackoffRetry(
-        conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
-        conf.getTimeDuration(RETRY_INTERVAL,
-            RETRY_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS),
+    int limit = conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT);
+    long interval = conf.getTimeDuration(RETRY_INTERVAL,
+        RETRY_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    baseExponentialRetry = exponentialBackoffRetry(
+        limit,
+        interval,
         TimeUnit.MILLISECONDS);
 
-    // which is wrapped by a rejection of all non-idempotent calls except
-    // for specific failures.
+    LOG.debug("Retrying on recoverable AWS failures {} times with an"
+        + " initial interval of {}ms", limit, interval);
+
+    // which is wrapped by a rejection of failures of non-idempotent calls
+    // except for specific exceptions considered recoverable.
+    // idempotent calls are retried on IOEs but not other exceptions
     retryIdempotentCalls = new FailNonIOEs(
-        new IdempotencyRetryFilter(fixedRetries));
+        new IdempotencyRetryFilter(baseExponentialRetry));
 
     // and a separate policy for throttle requests, which are considered
     // repeatable, even for non-idempotent calls, as the service
@@ -127,7 +145,7 @@ public class S3ARetryPolicy implements RetryPolicy {
     throttlePolicy = createThrottleRetryPolicy(conf);
 
     // client connectivity: fixed retries without care for idempotency
-    connectivityFailure = fixedRetries;
+    connectivityFailure = baseExponentialRetry;
 
     Map<Class<? extends Exception>, RetryPolicy> policyMap =
         createExceptionMap();
@@ -240,6 +258,14 @@ public class S3ARetryPolicy implements RetryPolicy {
   }
 
   /**
+   * Get the configuration this policy was created from.
+   * @return the configuration.
+   */
+  protected Configuration getConfiguration() {
+    return configuration;
+  }
+
+  /**
    * Policy which fails fast any non-idempotent call; hands off
    * all idempotent calls to the next retry policy.
    */
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
index 1a0135b..079c94e 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java
@@ -20,16 +20,29 @@ package org.apache.hadoop.fs.s3a;
 
 import java.io.FileNotFoundException;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicy;
 
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT;
+import static 
org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
 
 /**
  * Slightly-modified retry policy for cases when the file is present in the
  * MetadataStore, but may be still throwing FileNotFoundException from S3.
  */
 public class S3GuardExistsRetryPolicy extends S3ARetryPolicy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      S3GuardExistsRetryPolicy.class);
+
   /**
    * Instantiate.
    * @param conf configuration to read.
@@ -41,8 +54,23 @@ public class S3GuardExistsRetryPolicy extends S3ARetryPolicy 
{
   @Override
   protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     Map<Class<? extends Exception>, RetryPolicy> b = 
super.createExceptionMap();
-    b.put(FileNotFoundException.class, retryIdempotentCalls);
-    b.put(RemoteFileChangedException.class, retryIdempotentCalls);
+    Configuration conf = getConfiguration();
+
+    // base policy from configuration
+    int limit = conf.getInt(S3GUARD_CONSISTENCY_RETRY_LIMIT,
+        S3GUARD_CONSISTENCY_RETRY_LIMIT_DEFAULT);
+    long interval = conf.getTimeDuration(S3GUARD_CONSISTENCY_RETRY_INTERVAL,
+        S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+        limit,
+        interval,
+        TimeUnit.MILLISECONDS);
+    LOG.debug("Retrying on recoverable S3Guard table/S3 inconsistencies {}"
+        + " times with an initial interval of {}ms", limit, interval);
+
+    b.put(FileNotFoundException.class, retryPolicy);
+    b.put(RemoteFileChangedException.class, retryPolicy);
     return b;
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
index b0e9d6f..fc6fcd3 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
@@ -51,7 +51,7 @@ public abstract class ChangeDetectionPolicy {
       LoggerFactory.getLogger(ChangeDetectionPolicy.class);
 
   @VisibleForTesting
-  public static final String CHANGE_DETECTED = "change detected  on client";
+  public static final String CHANGE_DETECTED = "change detected on client";
 
   private final Mode mode;
   private final boolean requireVersion;
@@ -191,6 +191,15 @@ public abstract class ChangeDetectionPolicy {
   }
 
   /**
+   * String value for logging.
+   * @return source and mode.
+   */
+  @Override
+  public String toString() {
+    return "Policy " + getSource() + "/" + getMode();
+  }
+
+  /**
    * Pulls the attribute this policy uses to detect change out of the S3 object
    * metadata.  The policy generically refers to this attribute as
    * {@code revisionId}.
@@ -342,6 +351,8 @@ public abstract class ChangeDetectionPolicy {
       if (revisionId != null) {
         LOG.debug("Restricting get request to etag {}", revisionId);
         request.withMatchingETagConstraint(revisionId);
+      } else {
+        LOG.debug("No etag revision ID to use as a constraint");
       }
     }
 
@@ -351,13 +362,15 @@ public abstract class ChangeDetectionPolicy {
       if (revisionId != null) {
         LOG.debug("Restricting copy request to etag {}", revisionId);
         request.withMatchingETagConstraint(revisionId);
+      } else {
+        LOG.debug("No etag revision ID to use as a constraint");
       }
     }
 
     @Override
     public void applyRevisionConstraint(GetObjectMetadataRequest request,
         String revisionId) {
-      // GetObjectMetadataRequest doesn't support eTag qualification
+      LOG.debug("Unable to restrict HEAD request to etag; will check later");
     }
 
     @Override
@@ -415,6 +428,8 @@ public abstract class ChangeDetectionPolicy {
       if (revisionId != null) {
         LOG.debug("Restricting get request to version {}", revisionId);
         request.withVersionId(revisionId);
+      } else {
+        LOG.debug("No version ID to use as a constraint");
       }
     }
 
@@ -424,6 +439,8 @@ public abstract class ChangeDetectionPolicy {
       if (revisionId != null) {
         LOG.debug("Restricting copy request to version {}", revisionId);
         request.withSourceVersionId(revisionId);
+      } else {
+        LOG.debug("No version ID to use as a constraint");
       }
     }
 
@@ -433,6 +450,8 @@ public abstract class ChangeDetectionPolicy {
       if (revisionId != null) {
         LOG.debug("Restricting metadata request to version {}", revisionId);
         request.withVersionId(revisionId);
+      } else {
+        LOG.debug("No version ID to use as a constraint");
       }
     }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
index a95282c..d34328c 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -97,7 +97,8 @@ public class ChangeTracker {
     this.versionMismatches = versionMismatches;
     this.revisionId = policy.getRevisionId(s3ObjectAttributes);
     if (revisionId != null) {
-      LOG.debug("Revision ID for object at {}: {}", uri, revisionId);
+      LOG.debug("Tracker {} has revision ID for object at {}: {}",
+          policy, uri, revisionId);
     }
   }
 
@@ -307,7 +308,7 @@ public class ChangeTracker {
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "ChangeTracker{");
-    sb.append("changeDetectionPolicy=").append(policy);
+    sb.append(policy);
     sb.append(", revisionId='").append(revisionId).append('\'');
     sb.append('}');
     return sb.toString();
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index efef5cf..82250af 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -83,4 +83,6 @@ public final class InternalConstants {
               Arrays.asList(Constants.INPUT_FADVISE,
                   Constants.READAHEAD_RANGE)));
 
+  /** 404 error code. */
+  public static final int SC_404 = 404;
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
new file mode 100644
index 0000000..ca2875c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * Enum of probes which can be made of S3.
+ */
+public enum StatusProbeEnum {
+
+  /** The actual path. */
+  Head,
+  /** HEAD of the path + /. */
+  DirMarker,
+  /** LIST under the path. */
+  List;
+
+  /** All probes. */
+  public static final Set<StatusProbeEnum> ALL = EnumSet.allOf(
+      StatusProbeEnum.class);
+
+  /** Skip the HEAD and only look for directories. */
+  public static final Set<StatusProbeEnum> DIRECTORIES =
+      EnumSet.of(DirMarker, List);
+
+}
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 7b6eb83..ea55f90 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1202,9 +1202,9 @@ The configurations items controlling this behavior are:
 In the default configuration, S3 object eTags are used to detect changes.  When
 the filesystem retrieves a file from S3 using
 [Get 
Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html),
-it captures the eTag and uses that eTag in an 'If-Match' condition on each
+it captures the eTag and uses that eTag in an `If-Match` condition on each
 subsequent request.  If a concurrent writer has overwritten the file, the
-'If-Match' condition will fail and a RemoteFileChangedException will be thrown.
+'If-Match' condition will fail and a `RemoteFileChangedException` will be thrown.
 
 Even in this default configuration, a new write may not trigger this exception
 on an open reader.  For example, if the reader only reads forward in the file
@@ -1229,7 +1229,7 @@ It is possible to switch to using the
 instead of eTag as the change detection mechanism.  Use of this option requires
 object versioning to be enabled on any S3 buckets used by the filesystem.  The
 benefit of using version id instead of eTag is potentially reduced frequency
-of RemoteFileChangedException. With object versioning enabled, old versions
+of `RemoteFileChangedException`. With object versioning enabled, old versions
 of objects remain available after they have been overwritten.
 This means an open input stream will still be able to seek backwards after a
 concurrent writer has overwritten the file.
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index d1f05fe..a6eb349 100644
--- 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1029,6 +1029,56 @@ before versioning was enabled.
 See [Handling 
Read-During-Overwrite](./index.html#handling_read-during-overwrite)
 for more information.
 
+### `RemoteFileChangedException`: "File to rename not found on guarded S3 store after repeated attempts"
+
+A file being renamed and listed in the S3Guard table could not be found
+in the S3 bucket even after multiple attempts.
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: copyFile(/sourcedir/missing, /destdir/)
+ `s3a://example/sourcedir/missing': File not found on S3 after repeated attempts: `s3a://example/sourcedir/missing'
+at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:3231)
+at org.apache.hadoop.fs.s3a.S3AFileSystem.access$700(S3AFileSystem.java:177)
+at org.apache.hadoop.fs.s3a.S3AFileSystem$RenameOperationCallbacksImpl.copyFile(S3AFileSystem.java:1368)
+at org.apache.hadoop.fs.s3a.impl.RenameOperation.copySourceAndUpdateTracker(RenameOperation.java:448)
+at org.apache.hadoop.fs.s3a.impl.RenameOperation.lambda$initiateCopy$0(RenameOperation.java:412)
+```
+
+Either the file has been deleted, or an attempt was made to read a file before it
+was created and the S3 load balancer has briefly cached the 404 returned by that
+operation. This is something which AWS S3 can do for short periods.
+
+If this error occurs and the file is known to be on S3, consider increasing the
+value of `fs.s3a.s3guard.consistency.retry.limit`.
+
+We also recommend using applications or application options which do not rename
+files when committing work or when copying data to S3, but instead write
+directly to the final destination.
+
+### `RemoteFileChangedException`: "File to rename not found on unguarded S3 store"
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: copyFile(/sourcedir/missing, /destdir/)
+ `s3a://example/sourcedir/missing': File to rename not found on unguarded S3 store: `s3a://example/sourcedir/missing'
+at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:3231)
+at org.apache.hadoop.fs.s3a.S3AFileSystem.access$700(S3AFileSystem.java:177)
+at org.apache.hadoop.fs.s3a.S3AFileSystem$RenameOperationCallbacksImpl.copyFile(S3AFileSystem.java:1368)
+at org.apache.hadoop.fs.s3a.impl.RenameOperation.copySourceAndUpdateTracker(RenameOperation.java:448)
+at org.apache.hadoop.fs.s3a.impl.RenameOperation.lambda$initiateCopy$0(RenameOperation.java:412)
+```
+
+An attempt was made to rename a file in an S3 store not protected by S3Guard;
+the directory listing included the filename in its results, but the
+actual operation to rename the file failed.
+
+This can happen because S3 directory listings and the store itself are not
+consistent: the list operation tends to lag changes in the store.
+It is possible that the file has been deleted.
+
+The fix here is to use S3Guard. We also recommend using applications or
+application options which do not rename files when committing work or when
+copying data to S3, but instead write directly to the final destination.
+
 ## <a name="encryption"></a> S3 Server Side Encryption
 
 ### `AWSS3IOException` `KMS.NotFoundException` "Invalid arn" when using SSE-KMS
@@ -1275,3 +1325,40 @@ Please don't do that. Given that the emulated directory 
rename and delete operat
 are not atomic, even without retries, multiple S3 clients working with the same
 paths can interfere with each other
 
+### <a name="retries"></a> Tuning S3Guard open/rename Retry Policies
+
+When the S3A connector attempts to open a file for which it has an entry in
+its database, it will retry if the desired file is not found. This is
+done if:
+
+* No file is found in S3.
+* There is a file but its version or etag is not consistent with S3Guard table.
+
+These can be symptoms of S3's eventual consistency, hence the retries.
+They can also be caused by changes having been made to the S3 store without
+S3Guard being kept up to date.
+
+For this reason, the number of retry events is limited.
+
+```xml
+<property>
+  <name>fs.s3a.s3guard.consistency.retry.limit</name>
+  <value>7</value>
+  <description>
+    Number of times to retry attempts to read/open/copy files when
+    S3Guard believes a specific version of the file to be available,
+    but the S3 request does not find any version of a file, or a different
+    version.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.s3guard.consistency.retry.interval</name>
+  <value>2s</value>
+  <description>
+    Initial interval between attempts to retry operations while waiting for S3
+    to become consistent with the S3Guard data.
+    An exponential back-off is used here: every failure doubles the delay.
+  </description>
+</property>
+```
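+
+As with other S3A options, these settings can also be applied to a single
+bucket through the per-bucket configuration mechanism. A sketch for a bucket
+named `example` (the bucket name and values are illustrative):
+
+```xml
+<property>
+  <name>fs.s3a.bucket.example.s3guard.consistency.retry.limit</name>
+  <value>11</value>
+</property>
+
+<property>
+  <name>fs.s3a.bucket.example.s3guard.consistency.retry.interval</name>
+  <value>5s</value>
+</property>
+```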
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
index c55be5b..f736be0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -77,7 +79,8 @@ public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
 
   private S3AFileStatus getS3AFileStatus(S3AFileSystem fs, Path p) throws
       IOException {
-    return fs.innerGetFileStatus(p, true /* want isEmptyDirectory value */);
+    return fs.innerGetFileStatus(p, true,
+        StatusProbeEnum.ALL);
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 279ec9c..c62176b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,14 +87,23 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path dir = path("empty");
     fs.mkdirs(dir);
     resetMetricDiffs();
-    S3AFileStatus status = fs.innerGetFileStatus(dir, true);
-    assertSame("not empty: " + status, status.isEmptyDirectory(),
-        Tristate.TRUE);
+    S3AFileStatus status = fs.innerGetFileStatus(dir, true,
+        StatusProbeEnum.ALL);
+    assertSame("not empty: " + status, Tristate.TRUE,
+        status.isEmptyDirectory());
 
     if (!fs.hasMetadataStore()) {
       metadataRequests.assertDiffEquals(2);
     }
     listRequests.assertDiffEquals(0);
+
+    // but now only ask for the directories and the file check is skipped.
+    resetMetricDiffs();
+    fs.innerGetFileStatus(dir, false,
+        StatusProbeEnum.DIRECTORIES);
+    if (!fs.hasMetadataStore()) {
+      metadataRequests.assertDiffEquals(1);
+    }
   }
 
   @Test
@@ -128,7 +139,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path simpleFile = new Path(dir, "simple.txt");
     touch(fs, simpleFile);
     resetMetricDiffs();
-    S3AFileStatus status = fs.innerGetFileStatus(dir, true);
+    S3AFileStatus status = fs.innerGetFileStatus(dir, true,
+        StatusProbeEnum.ALL);
     if (status.isEmptyDirectory() == Tristate.TRUE) {
       // erroneous state
       String fsState = fs.toString();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
index 8e0fb30..20f25f2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -129,6 +129,10 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
 
   private Optional<AmazonS3> originalS3Client = Optional.empty();
 
+  private static final String INCONSISTENT = "inconsistent";
+
+  private static final String CONSISTENT = "consistent";
+
   private enum InteractionType {
     READ,
     READ_AFTER_DELETE,
@@ -280,15 +284,21 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
         CHANGE_DETECT_MODE,
         RETRY_LIMIT,
         RETRY_INTERVAL,
-        METADATASTORE_AUTHORITATIVE);
+        S3GUARD_CONSISTENCY_RETRY_LIMIT,
+        S3GUARD_CONSISTENCY_RETRY_INTERVAL,
+        METADATASTORE_AUTHORITATIVE,
+        AUTHORITATIVE_PATH);
     conf.set(CHANGE_DETECT_SOURCE, changeDetectionSource);
     conf.set(CHANGE_DETECT_MODE, changeDetectionMode);
     conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode);
+    conf.set(AUTHORITATIVE_PATH, "");
 
     // reduce retry limit so FileNotFoundException cases timeout faster,
     // speeding up the tests
     conf.setInt(RETRY_LIMIT, TEST_MAX_RETRIES);
     conf.set(RETRY_INTERVAL, TEST_RETRY_INTERVAL);
+    conf.setInt(S3GUARD_CONSISTENCY_RETRY_LIMIT, TEST_MAX_RETRIES);
+    conf.set(S3GUARD_CONSISTENCY_RETRY_INTERVAL, TEST_RETRY_INTERVAL);
 
     if (conf.getClass(S3_METADATA_STORE_IMPL, MetadataStore.class) ==
         NullMetadataStore.class) {
@@ -697,10 +707,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
     Path sourcedir = new Path(basedir, "sourcedir");
     fs.mkdirs(sourcedir);
     Path destdir = new Path(basedir, "destdir");
-    String inconsistent = "inconsistent";
-    String consistent = "consistent";
-    Path inconsistentFile = new Path(sourcedir, inconsistent);
-    Path consistentFile = new Path(sourcedir, consistent);
+    Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
+    Path consistentFile = new Path(sourcedir, CONSISTENT);
 
     // write the consistent data
     writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
@@ -724,6 +732,82 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   }
 
   /**
+   * Tests doing a rename() on a file which is eventually visible.
+   */
+  @Test
+  public void testRenameEventuallyVisibleFile() throws Throwable {
+    requireS3Guard();
+    AmazonS3 s3ClientSpy = spyOnFilesystem();
+    Path basedir = path();
+    Path sourcedir = new Path(basedir, "sourcedir");
+    fs.mkdirs(sourcedir);
+    Path destdir = new Path(basedir, "destdir");
+    Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
+    Path consistentFile = new Path(sourcedir, CONSISTENT);
+
+    // write the consistent data
+    writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
+        1024, true, true);
+
+    Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
+    int metadataInconsistencyCount = counts.getLeft();
+
+    writeDataset(fs, inconsistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
+        1024, true, true);
+
+    stubTemporaryNotFound(s3ClientSpy, metadataInconsistencyCount,
+        inconsistentFile);
+
+    // must not fail since the inconsistency doesn't last through the
+    // configured retry limit
+    fs.rename(sourcedir, destdir);
+  }
+
+  /**
+   * Tests that doing a rename() on a file which never quite appears will
+   * fail with a RemoteFileChangedException rather than have the exception
+   * downgraded to a "false" return value.
+   */
+  @Test
+  public void testRenameMissingFile()
+      throws Throwable {
+    requireS3Guard();
+    AmazonS3 s3ClientSpy = spyOnFilesystem();
+    Path basedir = path();
+    Path sourcedir = new Path(basedir, "sourcedir");
+    fs.mkdirs(sourcedir);
+    Path destdir = new Path(basedir, "destdir");
+    Path inconsistentFile = new Path(sourcedir, INCONSISTENT);
+    Path consistentFile = new Path(sourcedir, CONSISTENT);
+
+    // write the consistent data
+    writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
+        1024, true, true);
+
+    Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
+    int metadataInconsistencyCount = counts.getLeft();
+
+    writeDataset(fs, inconsistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
+        1024, true, true);
+
+    stubTemporaryNotFound(s3ClientSpy, metadataInconsistencyCount + 1,
+        inconsistentFile);
+
+    String expected = fs.hasMetadataStore()
+        ? RemoteFileChangedException.FILE_NEVER_FOUND
+        : RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT;
+    RemoteFileChangedException ex = intercept(
+        RemoteFileChangedException.class,
+        expected,
+        () -> fs.rename(sourcedir, destdir));
+    assertEquals("Path in " + ex,
+        inconsistentFile, ex.getPath());
+    if (!(ex.getCause() instanceof FileNotFoundException)) {
+      throw ex;
+    }
+  }
+
+  /**
    * Ensures a file can be renamed when there is no version metadata
    * (ETag, versionId).
    */
@@ -910,6 +994,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
     LOG.debug("Updated file info: {}: version={}, etag={}", testpath,
         newStatus.getVersionId(), newStatus.getETag());
 
+    LOG.debug("File {} will be inconsistent for {} HEAD and {} GET requests",
+        testpath, getMetadataInconsistencyCount, getObjectInconsistencyCount);
+
     stubTemporaryUnavailable(s3ClientSpy, getObjectInconsistencyCount,
         testpath, newStatus);
 
@@ -919,6 +1006,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
     if (versionCheckingIsOnServer()) {
       // only stub inconsistency when mode is server since no constraints that
       // should trigger inconsistency are passed in any other mode
+      LOG.debug("File {} will be inconsistent for {} COPY operations",
+          testpath, copyInconsistencyCount);
       stubTemporaryCopyInconsistency(s3ClientSpy, testpath, newStatus,
           copyInconsistencyCount);
     }
@@ -1231,6 +1320,18 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   }
 
   /**
+   * Match any getObjectMetadata request against a given path.
+   * @param path path to match.
+   * @return the matching request.
+   */
+  private GetObjectMetadataRequest matchingMetadataRequest(Path path) {
+    return ArgumentMatchers.argThat(request -> {
+      return request.getBucketName().equals(fs.getBucket())
+          && request.getKey().equals(fs.pathToKey(path));
+    });
+  }
+
+  /**
    * Skip a test case if it needs S3Guard and the filesystem does
    * not have it.
    */
@@ -1290,4 +1391,42 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   private boolean versionCheckingIsOnServer() {
     return fs.getChangeDetectionPolicy().getMode() == Mode.Server;
   }
+
+  /**
+   * Stubs {@link AmazonS3#getObjectMetadata(GetObjectMetadataRequest)}
+   * within s3ClientSpy to throw a FileNotFoundException
+   * until inconsistentCallCount calls have been made.
+   * This simulates the condition where the S3 endpoint is caching
+   * a 404 response, or there is a tombstone in the way which has yet
+   * to clear.
+   * @param s3ClientSpy the spy to stub
+   * @param inconsistentCallCount the number of calls that should throw the
+   * exception
+   * @param testpath the path of the object the stub should apply to
+   */
+  private void stubTemporaryNotFound(AmazonS3 s3ClientSpy,
+      int inconsistentCallCount, Path testpath) {
+    Answer<ObjectMetadata> notFound = new Answer<ObjectMetadata>() {
+      private int callCount = 0;
+
+      @Override
+      public ObjectMetadata answer(InvocationOnMock invocation
+      ) throws Throwable {
+        // simulates delayed visibility.
+        callCount++;
+        if (callCount <= inconsistentCallCount) {
+          LOG.info("Temporarily unavailable {} count {} of {}",
+              testpath, callCount, inconsistentCallCount);
+          logLocationAtDebug();
+          throw new FileNotFoundException(testpath.toString());
+        }
+        return (ObjectMetadata) invocation.callRealMethod();
+      }
+    };
+
+    // HEAD requests will fail
+    doAnswer(notFound).when(s3ClientSpy).getObjectMetadata(
+        matchingMetadataRequest(testpath));
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
index c85af29..ab81491 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
@@ -32,6 +32,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.s3guard.DDBPathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore;
@@ -92,7 +93,7 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
   }
 
   private S3AFileStatus getEmptyDirStatus(Path dir) throws IOException {
-    return getFileSystem().innerGetFileStatus(dir, true);
+    return getFileSystem().innerGetFileStatus(dir, true, StatusProbeEnum.ALL);
   }
 
   @Test
@@ -118,21 +119,22 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
       Path newFile = path("existing-dir/new-file");
       touch(fs, newFile);
 
-      S3AFileStatus status = fs.innerGetFileStatus(existingDir, true);
+      S3AFileStatus status = fs.innerGetFileStatus(existingDir, true,
+          StatusProbeEnum.ALL);
       assertEquals("Should not be empty dir", Tristate.FALSE,
           status.isEmptyDirectory());
 
       // 3. Assert that removing the only file the MetadataStore witnessed
       // being created doesn't cause it to think the directory is now empty.
       fs.delete(newFile, false);
-      status = fs.innerGetFileStatus(existingDir, true);
+      status = fs.innerGetFileStatus(existingDir, true, StatusProbeEnum.ALL);
       assertEquals("Should not be empty dir", Tristate.FALSE,
           status.isEmptyDirectory());
 
       // 4. Assert that removing the final file, that existed "before"
       // MetadataStore started, *does* cause the directory to be marked empty.
       fs.delete(existingFile, false);
-      status = fs.innerGetFileStatus(existingDir, true);
+      status = fs.innerGetFileStatus(existingDir, true, StatusProbeEnum.ALL);
       assertEquals("Should be empty dir now", Tristate.TRUE,
           status.isEmptyDirectory());
     } finally {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index f9e84e2..c9d083e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Constants.AUTHORITATIVE_PATH;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
 import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
@@ -224,7 +225,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
 
     removeBaseAndBucketOverrides(uri.getHost(), config,
         METADATASTORE_AUTHORITATIVE,
-        METADATASTORE_METADATA_TTL);
+        METADATASTORE_METADATA_TTL,
+        AUTHORITATIVE_PATH);
     config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode);
     config.setLong(METADATASTORE_METADATA_TTL,
         DEFAULT_METADATASTORE_METADATA_TTL);
@@ -247,7 +249,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
     removeBaseAndBucketOverrides(uri.getHost(), config,
         S3_METADATA_STORE_IMPL);
     removeBaseAndBucketOverrides(uri.getHost(), config,
-        METADATASTORE_AUTHORITATIVE);
+        METADATASTORE_AUTHORITATIVE,
+        AUTHORITATIVE_PATH);
     return createFS(uri, config);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
index 06032d1..fb539a8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -331,7 +332,7 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
 
       // listing will contain the tombstone with oldtime
       when(mockTimeProvider.getNow()).thenReturn(oldTime);
-      final DirListingMetadata fullDLM = ms.listChildren(baseDirPath);
+      final DirListingMetadata fullDLM = getDirListingMetadata(ms, baseDirPath);
       List<Path> containedPaths = fullDLM.getListing().stream()
           .map(pm -> pm.getFileStatus().getPath())
           .collect(Collectors.toList());
@@ -342,7 +343,8 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
 
       // listing will be filtered, and won't contain the tombstone with oldtime
       when(mockTimeProvider.getNow()).thenReturn(newTime);
-      final DirListingMetadata filteredDLM = ms.listChildren(baseDirPath);
+      final DirListingMetadata filteredDLM = getDirListingMetadata(ms,
+          baseDirPath);
       containedPaths = filteredDLM.getListing().stream()
           .map(pm -> pm.getFileStatus().getPath())
           .collect(Collectors.toList());
@@ -356,4 +358,14 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
     }
   }
 
+  protected DirListingMetadata getDirListingMetadata(final MetadataStore ms,
+      final Path baseDirPath) throws IOException {
+    final DirListingMetadata fullDLM = ms.listChildren(baseDirPath);
+    Assertions.assertThat(fullDLM)
+        .describedAs("Metastore directory listing of %s",
+            baseDirPath)
+        .isNotNull();
+    return fullDLM;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
index d5cd4d4..4ec7f46 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
@@ -132,6 +132,7 @@ public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
 
     conf.set(Constants.S3_METADATA_STORE_IMPL, metastore);
     conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta);
+    conf.unset(AUTHORITATIVE_PATH);
     S3AUtils.setBucketOption(conf, host,
         METADATASTORE_AUTHORITATIVE,
         Boolean.toString(authoritativeMeta));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index e7f7f39..1a6de9e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
 import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
+import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -483,6 +484,7 @@ public final class S3ATestUtils {
       LOG.debug("Enabling S3Guard, authoritative={}, implementation={}",
           authoritative, implClass);
       conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+      conf.set(AUTHORITATIVE_PATH, "");
       conf.set(S3_METADATA_STORE_IMPL, implClass);
       conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
     }
@@ -855,7 +857,7 @@ public final class S3ATestUtils {
   public static S3AFileStatus getStatusWithEmptyDirFlag(
       final S3AFileSystem fs,
       final Path dir) throws IOException {
-    return fs.innerGetFileStatus(dir, true);
+    return fs.innerGetFileStatus(dir, true, StatusProbeEnum.ALL);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
index ea3fd84..1a51847 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -52,8 +53,10 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.DurationInfo;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
 
 /**
  * Test for an MR Job with all the different committers.
@@ -112,6 +115,7 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
     String committerPath = "file:" + mockResultsFile;
     jobConf.set("mock-results-file", committerPath);
     jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
+    jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging");
 
     mrJob.setInputFormatClass(TextInputFormat.class);
     FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI()));
@@ -143,6 +147,10 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
     }
 
     waitForConsistency();
+    verifyPathExists(fs,
+        "MR job Output directory not found,"
+            + " even though the job did not report a failure",
+        outputPath);
     assertIsDirectory(outputPath);
     FileStatus[] results = fs.listStatus(outputPath,
         S3AUtils.HIDDEN_FILE_FILTER);
@@ -160,16 +168,17 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
         fs, "MR job");
     List<String> successFiles = successData.getFilenames();
     String commitData = successData.toString();
-    assertTrue("No filenames in " + commitData,
-        !successFiles.isEmpty());
+    assertFalse("No filenames in " + commitData,
+        successFiles.isEmpty());
 
-    assertEquals("Should commit the expected files",
-        expectedFiles, actualFiles);
+    Assertions.assertThat(actualFiles)
+        .describedAs("Committed files in the job output directory")
+        .containsExactlyInAnyOrderElementsOf(expectedFiles);
+
+    Assertions.assertThat(successFiles)
+        .describedAs("List of committed files in %s", commitData)
+        .containsExactlyInAnyOrderElementsOf(expectedKeys);
 
-    Set<String> summaryKeys = Sets.newHashSet();
-    summaryKeys.addAll(successFiles);
-    assertEquals("Summary keyset doesn't list the the expected paths "
-        + commitData, expectedKeys, summaryKeys);
     assertPathDoesNotExist("temporary dir",
         new Path(outputPath, CommitConstants.TEMPORARY));
     customPostExecutionValidation(outputPath, successData);

