HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API. Contributed by Dushyanth.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ce0a650 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ce0a650 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ce0a650 Branch: refs/heads/HDFS-7240 Commit: 3ce0a6502e78240f551c29bb27a2324ce359cd70 Parents: 259bea3 Author: cnauroth <cnaur...@apache.org> Authored: Mon Nov 2 09:38:37 2015 -0800 Committer: cnauroth <cnaur...@apache.org> Committed: Mon Nov 2 10:17:41 2015 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../hadoop/fs/azure/NativeAzureFileSystem.java | 125 ++++++++++--- ...estFileSystemOperationExceptionHandling.java | 131 +++++++++++++ ...perationsExceptionHandlingMultiThreaded.java | 185 +++++++++++++++++++ 4 files changed, 422 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5c8daad..c8d60b0 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -1304,6 +1304,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12519. hadoop-azure tests should avoid creating a metrics configuration file in the module root directory. (cnauroth) + HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API. + (Dushyanth via cnauroth) + OPTIMIZATIONS HADOOP-12051. 
ProtobufRpcEngine.invoke() should use Exception.toString() http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 7c5a504..73bc6b3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azure; import java.io.DataInputStream; +import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -62,7 +64,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.codehaus.jackson.JsonNode; @@ -74,9 +75,11 @@ import org.codehaus.jackson.map.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.StorageErrorCode; import com.microsoft.azure.storage.StorageException; import 
com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.core.*; +import com.microsoft.azure.storage.StorageErrorCodeStrings; +import org.apache.hadoop.io.IOUtils; /** * A {@link FileSystem} for reading and writing files stored on <a @@ -88,7 +91,6 @@ import com.microsoft.azure.storage.core.*; @InterfaceStability.Stable public class NativeAzureFileSystem extends FileSystem { private static final int USER_WX_PERMISION = 0300; - /** * A description of a folder rename operation, including the source and * destination keys, and descriptions of the files in the source folder. @@ -712,7 +714,7 @@ public class NativeAzureFileSystem extends FileSystem { * @returns int An integer corresponding to the byte read. */ @Override - public synchronized int read() throws IOException { + public synchronized int read() throws FileNotFoundException, IOException { try { int result = 0; result = in.read(); @@ -726,13 +728,21 @@ public class NativeAzureFileSystem extends FileSystem { // return result; } catch(IOException e) { - if (e.getCause() instanceof StorageException) { - StorageException storageExcp = (StorageException) e.getCause(); + + Throwable innerException = checkForAzureStorageException(e); + + if (innerException instanceof StorageException) { + LOG.error("Encountered Storage Exception for read on Blob : {}" + " Exception details: {} Error Code : {}", - key, e.getMessage(), storageExcp.getErrorCode()); + key, e, ((StorageException) innerException).getErrorCode()); + + if (isFileNotFoundException((StorageException) innerException)) { + throw new FileNotFoundException(String.format("%s is not found", key)); + } } - throw e; + + throw e; } } @@ -757,7 +767,7 @@ public class NativeAzureFileSystem extends FileSystem { * there is no more data because the end of stream is reached. 
*/ @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { + public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException { try { int result = 0; result = in.read(b, off, len); @@ -772,29 +782,56 @@ public class NativeAzureFileSystem extends FileSystem { // Return to the caller with the result. return result; } catch(IOException e) { - if (e.getCause() instanceof StorageException) { - StorageException storageExcp = (StorageException) e.getCause(); + + Throwable innerException = checkForAzureStorageException(e); + + if (innerException instanceof StorageException) { + LOG.error("Encountered Storage Exception for read on Blob : {}" + " Exception details: {} Error Code : {}", - key, e.getMessage(), storageExcp.getErrorCode()); + key, e, ((StorageException) innerException).getErrorCode()); + + if (isFileNotFoundException((StorageException) innerException)) { + throw new FileNotFoundException(String.format("%s is not found", key)); + } } - throw e; + + throw e; } } @Override - public void close() throws IOException { - in.close(); - closed = true; + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + IOUtils.closeStream(in); + in = null; + } } @Override - public synchronized void seek(long pos) throws IOException { - in.close(); - in = store.retrieve(key); - this.pos = in.skip(pos); - LOG.debug("Seek to position {}. Bytes skipped {}", pos, - this.pos); + public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException { + try { + checkNotClosed(); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + IOUtils.closeStream(in); + in = store.retrieve(key); + this.pos = in.skip(pos); + LOG.debug("Seek to position {}. 
Bytes skipped {}", pos, + this.pos); + } catch(IOException e) { + + Throwable innerException = checkForAzureStorageException(e); + + if (innerException instanceof StorageException + && isFileNotFoundException((StorageException) innerException)) { + throw new FileNotFoundException(String.format("%s is not found", key)); + } + + throw e; + } } @Override @@ -806,6 +843,50 @@ public class NativeAzureFileSystem extends FileSystem { public boolean seekToNewSource(long targetPos) throws IOException { return false; } + + /* + * Helper method to recursively check if the cause of the exception is + * an Azure storage exception. + */ + private Throwable checkForAzureStorageException(IOException e) { + + Throwable innerException = e.getCause(); + + while (innerException != null + && !(innerException instanceof StorageException)) { + innerException = innerException.getCause(); + } + + return innerException; + } + + /* + * Helper method to check if the AzureStorageException is + * because backing blob was not found. + */ + private boolean isFileNotFoundException(StorageException e) { + + String errorCode = ((StorageException) e).getErrorCode(); + if (errorCode != null + && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND) + || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND) + || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString()) + || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) { + + return true; + } + + return false; + } + + /* + * Helper method to check if a stream is closed. 
+ */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } } private class NativeAzureFsOutputStream extends OutputStream { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java new file mode 100644 index 0000000..35a1f50 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azure; + +import java.io.FileNotFoundException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Test; + + +public class TestFileSystemOperationExceptionHandling extends + NativeAzureFileSystemBaseTest { + + FSDataInputStream inputStream = null; + /* + * Helper method to create a PageBlob test storage account. + */ + private AzureBlobStorageTestAccount getPageBlobTestStorageAccount() + throws Exception { + + Configuration conf = new Configuration(); + + // Configure the page blob directories key so every file created is a page blob. + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/"); + + // Configure the atomic rename directories key so every folder will have + // atomic rename applied. + conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/"); + return AzureBlobStorageTestAccount.create(conf); + } + + + /* + * Helper method that creates an InputStream to validate exceptions + * for various scenarios + */ + private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount) + throws Exception { + + fs = testAccount.getFileSystem(); + + // Step 1: Create a file and write dummy data. + Path testFilePath1 = new Path("test1.dat"); + Path testFilePath2 = new Path("test2.dat"); + FSDataOutputStream outputStream = fs.create(testFilePath1); + String testString = "This is a test string"; + outputStream.write(testString.getBytes()); + outputStream.close(); + + // Step 2: Open a read stream on the file. + inputStream = fs.open(testFilePath1); + + // Step 3: Rename the file + fs.rename(testFilePath1, testFilePath2); + } + + /* + * Tests a basic single threaded read scenario for Page blobs. 
+ */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadedPageBlobReadScenario() throws Throwable { + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + setupInputStreamToTest(testAccount); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /* + * Tests a basic single threaded seek scenario for Page blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadedPageBlobSeekScenario() throws Throwable { + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + setupInputStreamToTest(testAccount); + inputStream.seek(5); + } + + /* + * Test a basic single thread seek scenario for Block blobs. + */ + @Test(expected=FileNotFoundException.class) + public void testSingleThreadBlockBlobSeekScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = createTestAccount(); + setupInputStreamToTest(testAccount); + inputStream.seek(5); + } + + /* + * Tests a basic single threaded read scenario for Block blobs. 
+ */ + @Test(expected=FileNotFoundException.class) + public void testSingledThreadBlockBlobReadScenario() throws Throwable{ + AzureBlobStorageTestAccount testAccount = createTestAccount(); + setupInputStreamToTest(testAccount); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + @After + public void tearDown() throws Exception { + if (inputStream != null) { + inputStream.close(); + } + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ce0a650/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java new file mode 100644 index 0000000..0f91500 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.FileNotFoundException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Test; + +public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends + NativeAzureFileSystemBaseTest { + + FSDataInputStream inputStream = null; + /* + * Helper method to creates an input stream to test various scenarios. 
+ */ + private void getInputStreamToTest(FileSystem fs, Path testPath) throws Throwable { + + FSDataOutputStream outputStream = fs.create(testPath); + String testString = "This is a test string"; + outputStream.write(testString.getBytes()); + outputStream.close(); + + inputStream = fs.open(testPath); + } + + /* + * Test to validate correct exception is thrown for Multithreaded read + * scenario for block blobs + */ + @Test(expected=FileNotFoundException.class) + public void testMultiThreadedBlockBlobReadScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = createTestAccount(); + fs = testAccount.getFileSystem(); + Path testFilePath1 = new Path("test1.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread(new RenameThread(fs, testFilePath1)); + renameThread.start(); + + renameThread.join(); + + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /* + * Test to validate correct exception is thrown for Multithreaded seek + * scenario for block blobs + */ + + @Test(expected=FileNotFoundException.class) + public void testMultiThreadBlockBlobSeekScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = createTestAccount(); + fs = testAccount.getFileSystem(); + Path testFilePath1 = new Path("test1.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread(new RenameThread(fs, testFilePath1)); + renameThread.start(); + + renameThread.join(); + + inputStream.seek(5); + } + + /* + * Test to validate correct exception is thrown for Multithreaded read + * scenario for page blobs + */ + + @Test(expected=FileNotFoundException.class) + public void testMultiThreadedPageBlobReadScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + fs = testAccount.getFileSystem(); + Path testFilePath1 = new Path("test1.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread(new 
RenameThread(fs, testFilePath1)); + renameThread.start(); + + renameThread.join(); + byte[] readBuffer = new byte[512]; + inputStream.read(readBuffer); + } + + /* + * Test to validate correct exception is thrown for Multithreaded seek + * scenario for page blobs + */ + + @Test(expected=FileNotFoundException.class) + public void testMultiThreadedPageBlobSeekScenario() throws Throwable { + + AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); + fs = testAccount.getFileSystem(); + Path testFilePath1 = new Path("test1.dat"); + + getInputStreamToTest(fs, testFilePath1); + Thread renameThread = new Thread(new RenameThread(fs, testFilePath1)); + renameThread.start(); + + renameThread.join(); + inputStream.seek(5); + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + /* + * Helper method to create a PageBlob test storage account. + */ + private AzureBlobStorageTestAccount getPageBlobTestStorageAccount() + throws Exception { + + Configuration conf = new Configuration(); + + // Configure the page blob directories key so every file created is a page blob. + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/"); + + // Configure the atomic rename directories key so every folder will have + // atomic rename applied. + conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/"); + return AzureBlobStorageTestAccount.create(conf); + } + + @After + public void tearDown() throws Exception { + if (inputStream != null) { + inputStream.close(); + } + } +} + +/* + * Helper thread that just renames the test file. 
+ */ +class RenameThread implements Runnable { + + private FileSystem fs; + private Path testPath; + private Path renamePath = new Path("test2.dat"); + + public RenameThread(FileSystem fs, Path testPath) { + this.fs = fs; + this.testPath = testPath; + } + + @Override + public void run(){ + try { + fs.rename(testPath, renamePath); + }catch (Exception e) { + // Swallowing the exception as the + // correctness of the test is controlled + // by the other thread + } + } +}