HADOOP-15086. NativeAzureFileSystem file rename is not atomic. Contributed by Thomas Marquardt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52babbb4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52babbb4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52babbb4 Branch: refs/heads/YARN-6592 Commit: 52babbb4a0e3c89f2025bf6e9a1b51a96e8f8fb0 Parents: 76e664e Author: Steve Loughran <ste...@apache.org> Authored: Fri Dec 22 11:39:55 2017 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Fri Dec 22 11:39:55 2017 +0000 ---------------------------------------------------------------------- .../fs/azure/AzureNativeFileSystemStore.java | 16 +++-- .../hadoop/fs/azure/NativeAzureFileSystem.java | 25 +++++-- .../fs/azure/NativeAzureFileSystemHelper.java | 18 +++++ .../hadoop/fs/azure/NativeFileSystemStore.java | 4 ++ .../fs/azure/SecureStorageInterfaceImpl.java | 8 ++- .../hadoop/fs/azure/StorageInterface.java | 2 +- .../hadoop/fs/azure/StorageInterfaceImpl.java | 8 ++- .../azure/ITestNativeAzureFileSystemLive.java | 72 ++++++++++++++++++++ .../hadoop/fs/azure/MockStorageInterface.java | 9 ++- 9 files changed, 145 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index f1031b4..9396a51 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public void rename(String srcKey, String dstKey) throws IOException { - rename(srcKey, dstKey, false, null); + rename(srcKey, dstKey, false, null, true); } @Override public void rename(String srcKey, String dstKey, boolean acquireLease, - SelfRenewingLease existingLease) throws IOException { + SelfRenewingLease existingLease) throws IOException { + rename(srcKey, dstKey, acquireLease, existingLease, true); + } + + @Override + public void rename(String srcKey, String dstKey, boolean acquireLease, + SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException { LOG.debug("Moving {} to {}", srcKey, dstKey); @@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // a more intensive exponential retry policy when the cluster is getting // throttled. try { - dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext()); + dstBlob.startCopyFromBlob(srcBlob, null, + getInstrumentedContext(), overwriteDestination); } catch (StorageException se) { if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { int copyBlobMinBackoff = sessionConfiguration.getInt( @@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { options.setRetryPolicyFactory(new RetryExponentialRetry( copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff, copyBlobMaxRetries)); - dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext()); + dstBlob.startCopyFromBlob(srcBlob, options, + getInstrumentedContext(), overwriteDestination); } else { throw se; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 85a46ea..3d44b20 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -3269,16 +3269,27 @@ public class NativeAzureFileSystem extends FileSystem { } else if (!srcMetadata.isDir()) { LOG.debug("Source {} found as a file, renaming.", src); try { - store.rename(srcKey, dstKey); + // HADOOP-15086 - file rename must ensure that the destination does + // not exist. The fix is targeted to this call only to avoid + // regressions. Other call sites are attempting to rename temporary + // files, redo a failed rename operation, or rename a directory + // recursively; for these cases the destination may exist. + store.rename(srcKey, dstKey, false, null, + false); } catch(IOException ex) { - Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); - if (innerException instanceof StorageException - && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { - - LOG.debug("BlobNotFoundException encountered. Failing rename", src); - return false; + if (innerException instanceof StorageException) { + if (NativeAzureFileSystemHelper.isFileNotFoundException( + (StorageException) innerException)) { + LOG.debug("BlobNotFoundException encountered. Failing rename", src); + return false; + } + if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict( + (StorageException) innerException)) { + LOG.debug("Destination BlobAlreadyExists. Failing rename", src); + return false; + } } throw ex; http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java index 57af1f8..754f343 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure; import java.io.EOFException; import java.io.IOException; +import java.net.HttpURLConnection; import java.util.Map; import com.google.common.base.Preconditions; @@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper { } /* + * Determines if a conditional request failed because the blob already + * exists. + * + * @param e - the storage exception thrown by the failed operation. + * + * @return true if a conditional request failed because the blob already + * exists; otherwise, returns false. + */ + static boolean isBlobAlreadyExistsConflict(StorageException e) { + if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT + && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) { + return true; + } + return false; + } + + /* * Helper method that logs stack traces from all live threads. */ public static void logAllLiveStackTraces() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 57a729d..b67ab1b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -91,6 +91,10 @@ interface NativeFileSystemStore { void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease) throws IOException; + void rename(String srcKey, String dstKey, boolean acquireLease, + SelfRenewingLease existingLease, boolean overwriteDestination) + throws IOException; + /** * Delete all keys with the given prefix. Used for testing. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 7c2722e..0f54249 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + AccessCondition dstAccessCondition = + overwriteDestination + ? null + : AccessCondition.generateIfNotExistsCondition(); getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(), - null, null, options, opContext); + null, dstAccessCondition, options, opContext); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index e03d731..dbb3849 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -406,7 +406,7 @@ abstract class StorageInterface { * */ public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob, - BlobRequestOptions options, OperationContext opContext) + BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 41a4dbb..e600f9e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + AccessCondition dstAccessCondition = + overwriteDestination + ? null + : AccessCondition.generateIfNotExistsCondition(); getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(), - null, null, options, opContext); + null, dstAccessCondition, options, opContext); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java index f969968..9033674 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java @@ -18,8 +18,16 @@ package org.apache.hadoop.fs.azure; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -40,6 +48,70 @@ public class ITestNativeAzureFileSystemLive extends return AzureBlobStorageTestAccount.create(); } + /** + * Tests the rename file operation to ensure that when there are multiple + * attempts to rename a file to the same destination, only one rename + * operation is successful (HADOOP-15086). + */ + @Test + public void testMultipleRenameFileOperationsToSameDestination() + throws IOException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger successfulRenameCount = new AtomicInteger(0); + final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>(); + final Path dest = path("dest"); + + // Run 10 threads to rename multiple files to the same target path + List<Thread> threads = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + final int threadNumber = i; + Path src = path("test" + threadNumber); + threads.add(new Thread(() -> { + try { + latch.await(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + try { + try (OutputStream output = fs.create(src)) { + output.write(("Source file number " + threadNumber).getBytes()); + } + + if (fs.rename(src, dest)) { + LOG.info("rename succeeded for thread " + threadNumber); + successfulRenameCount.incrementAndGet(); + } + } catch (IOException e) { + unexpectedError.compareAndSet(null, e); + ContractTestUtils.fail("Exception unexpected", e); + } + })); + } + + // Start each thread + threads.forEach(t -> t.start()); + + // Wait for threads to start and wait on latch + Thread.sleep(2000); + + // Now start to rename + latch.countDown(); + + // Wait for all threads to complete + threads.forEach(t -> { + try { + t.join(); + } catch (InterruptedException e) { + } + }); + + if (unexpectedError.get() != null) { + throw unexpectedError.get(); + } + assertEquals(1, successfulRenameCount.get()); + LOG.info("Success, only one rename operation succeeded!"); + } + @Test public void testLazyRenamePendingCanOverwriteExistingFile() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/52babbb4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index e0ae7b4..d5f6437 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) throws StorageException, URISyntaxException { + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) { + throw new StorageException("BlobAlreadyExists", + "The blob already exists.", + HttpURLConnection.HTTP_CONFLICT, + null, + null); + } backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri)); //TODO: set the backingStore.properties.CopyState and // update azureNativeFileSystemStore.waitForCopyToComplete --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org