HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
Contributed by Thomas Marquardt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52babbb4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52babbb4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52babbb4

Branch: refs/heads/YARN-6592
Commit: 52babbb4a0e3c89f2025bf6e9a1b51a96e8f8fb0
Parents: 76e664e
Author: Steve Loughran <ste...@apache.org>
Authored: Fri Dec 22 11:39:55 2017 +0000
Committer: Steve Loughran <ste...@apache.org>
Committed: Fri Dec 22 11:39:55 2017 +0000

----------------------------------------------------------------------
 .../fs/azure/AzureNativeFileSystemStore.java    | 16 +++--
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 25 +++++--
 .../fs/azure/NativeAzureFileSystemHelper.java   | 18 +++++
 .../hadoop/fs/azure/NativeFileSystemStore.java  |  4 ++
 .../fs/azure/SecureStorageInterfaceImpl.java    |  8 ++-
 .../hadoop/fs/azure/StorageInterface.java       |  2 +-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |  8 ++-
 .../azure/ITestNativeAzureFileSystemLive.java   | 72 ++++++++++++++++++++
 .../hadoop/fs/azure/MockStorageInterface.java   |  9 ++-
 9 files changed, 145 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index f1031b4..9396a51 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
 
   @Override
   public void rename(String srcKey, String dstKey) throws IOException {
-    rename(srcKey, dstKey, false, null);
+    rename(srcKey, dstKey, false, null, true);
   }
 
   @Override
   public void rename(String srcKey, String dstKey, boolean acquireLease,
-      SelfRenewingLease existingLease) throws IOException {
+                     SelfRenewingLease existingLease) throws IOException {
+    rename(srcKey, dstKey, acquireLease, existingLease, true);
+  }
+
+    @Override
+  public void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease, boolean overwriteDestination) throws 
IOException {
 
     LOG.debug("Moving {} to {}", srcKey, dstKey);
 
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       // a more intensive exponential retry policy when the cluster is getting
       // throttled.
       try {
-        dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+        dstBlob.startCopyFromBlob(srcBlob, null,
+            getInstrumentedContext(), overwriteDestination);
       } catch (StorageException se) {
         if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
           options.setRetryPolicyFactory(new RetryExponentialRetry(
             copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
             copyBlobMaxRetries));
-          dstBlob.startCopyFromBlob(srcBlob, options, 
getInstrumentedContext());
+          dstBlob.startCopyFromBlob(srcBlob, options,
+              getInstrumentedContext(), overwriteDestination);
         } else {
           throw se;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 85a46ea..3d44b20 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -3269,16 +3269,27 @@ public class NativeAzureFileSystem extends FileSystem {
     } else if (!srcMetadata.isDir()) {
       LOG.debug("Source {} found as a file, renaming.", src);
       try {
-        store.rename(srcKey, dstKey);
+        // HADOOP-15086 - file rename must ensure that the destination does
+        // not exist.  The fix is targeted to this call only to avoid
+        // regressions.  Other call sites are attempting to rename temporary
+        // files, redo a failed rename operation, or rename a directory
+        // recursively; for these cases the destination may exist.
+        store.rename(srcKey, dstKey, false, null,
+            false);
       } catch(IOException ex) {
-
         Throwable innerException = 
NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
-        if (innerException instanceof StorageException
-            && 
NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) 
innerException)) {
-
-          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
-          return false;
+        if (innerException instanceof StorageException) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException(
+              (StorageException) innerException)) {
+            LOG.debug("BlobNotFoundException encountered. Failing rename", 
src);
+            return false;
+          }
+          if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+              (StorageException) innerException)) {
+            LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+            return false;
+          }
         }
 
         throw ex;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
index 57af1f8..754f343 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
@@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper {
   }
 
   /*
+   * Determines if a conditional request failed because the blob already
+   * exists.
+   *
+   * @param e - the storage exception thrown by the failed operation.
+   *
+   * @return true if a conditional request failed because the blob already
+   * exists; otherwise, returns false.
+   */
+  static boolean isBlobAlreadyExistsConflict(StorageException e) {
+    if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+        && 
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+      return true;
+    }
+    return false;
+  }
+
+  /*
    * Helper method that logs stack traces from all live threads.
    */
   public static void logAllLiveStackTraces() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 57a729d..b67ab1b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
   void rename(String srcKey, String dstKey, boolean acquireLease, 
SelfRenewingLease existingLease)
       throws IOException;
 
+  void rename(String srcKey, String dstKey, boolean acquireLease,
+              SelfRenewingLease existingLease, boolean overwriteDestination)
+      throws IOException;
+
   /**
    * Delete all keys with the given prefix. Used for testing.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 7c2722e..0f54249 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends 
StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, 
BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index e03d731..dbb3849 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -406,7 +406,7 @@ abstract class StorageInterface {
      *
      */
     public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
-        BlobRequestOptions options, OperationContext opContext)
+        BlobRequestOptions options, OperationContext opContext, boolean 
overwriteDestination)
         throws StorageException, URISyntaxException;
     
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 41a4dbb..e600f9e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, 
BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
index f969968..9033674 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -18,8 +18,16 @@
 
 package org.apache.hadoop.fs.azure;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +48,70 @@ public class ITestNativeAzureFileSystemLive extends
     return AzureBlobStorageTestAccount.create();
   }
 
+  /**
+   * Tests the rename file operation to ensure that when there are multiple
+   * attempts to rename a file to the same destination, only one rename
+   * operation is successful (HADOOP-15086).
+   */
+  @Test
+  public void testMultipleRenameFileOperationsToSameDestination()
+      throws IOException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+    final AtomicReference<IOException> unexpectedError = new 
AtomicReference<IOException>();
+    final Path dest = path("dest");
+
+    // Run 10 threads to rename multiple files to the same target path
+    List<Thread> threads = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      final int threadNumber = i;
+      Path src = path("test" + threadNumber);
+      threads.add(new Thread(() -> {
+        try {
+          latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
+        try {
+          try (OutputStream output = fs.create(src)) {
+            output.write(("Source file number " + threadNumber).getBytes());
+          }
+
+          if (fs.rename(src, dest)) {
+            LOG.info("rename succeeded for thread " + threadNumber);
+            successfulRenameCount.incrementAndGet();
+          }
+        } catch (IOException e) {
+          unexpectedError.compareAndSet(null, e);
+          ContractTestUtils.fail("Exception unexpected", e);
+        }
+      }));
+    }
+
+    // Start each thread
+    threads.forEach(t -> t.start());
+
+    // Wait for threads to start and wait on latch
+    Thread.sleep(2000);
+
+    // Now start to rename
+    latch.countDown();
+
+    // Wait for all threads to complete
+    threads.forEach(t -> {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+      }
+    });
+
+    if (unexpectedError.get() != null) {
+      throw unexpectedError.get();
+    }
+    assertEquals(1, successfulRenameCount.get());
+    LOG.info("Success, only one rename operation succeeded!");
+  }
+
   @Test
   public void testLazyRenamePendingCanOverwriteExistingFile()
     throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index e0ae7b4..d5f6437 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface 
{
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, 
BlobRequestOptions options,
-        OperationContext opContext) throws StorageException, 
URISyntaxException {
+        OperationContext opContext, boolean overwriteDestination) throws 
StorageException, URISyntaxException {
+      if (!overwriteDestination && 
backingStore.exists(convertUriToDecodedString(uri))) {
+        throw new StorageException("BlobAlreadyExists",
+            "The blob already exists.",
+            HttpURLConnection.HTTP_CONFLICT,
+            null,
+            null);
+      }
       backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), 
convertUriToDecodedString(uri));
       //TODO: set the backingStore.properties.CopyState and
       //      update azureNativeFileSystemStore.waitForCopyToComplete


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to