http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
deleted file mode 100644
index 34d78a5..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContext.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.fileContext;
-
-import org.apache.hadoop.fs.TestFileContext;
-
-/**
- * Implementation of TestFileContext for S3a
- */
-public class TestS3AFileContext extends TestFileContext{
-
-}
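
The removed fileContext test classes in this commit all share one setup pattern: each extends a generic FileContext base test from hadoop-common and binds the inherited fc field to an S3A-backed FileContext via S3ATestUtils.createTestFileContext() before delegating to the base class setUp(). A minimal sketch of that pattern follows; the class name ExampleS3AFileContextTest is illustrative only and is not part of the commit.

package org.apache.hadoop.fs.s3a.fileContext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextUtilBase;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.junit.Before;

/**
 * Illustrative sketch of the setup shared by the S3A FileContext tests
 * shown in the surrounding diffs; not a class from this commit.
 */
public class ExampleS3AFileContextTest extends FileContextUtilBase {

  @Before
  public void setUp() throws Exception {
    Configuration conf = new Configuration();
    // Bind the FileContext under test to the configured S3A test bucket;
    // the base class then runs its generic FileContext assertions against it.
    fc = S3ATestUtils.createTestFileContext(conf);
    super.setUp();
  }
}

Each subclass then only overrides what S3A cannot support, as seen below in TestS3AFileContextMainOperations, where the append and checksum test cases are disabled with @Ignore.
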
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java deleted file mode 100644 index b0c4d84..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextCreateMkdir.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.s3a.fileContext; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.junit.Before; - -/** - * Extends FileContextCreateMkdirBaseTest for a S3a FileContext - */ -public class TestS3AFileContextCreateMkdir - extends FileContextCreateMkdirBaseTest { - - @Before - public void setUp() throws IOException, Exception { - Configuration conf = new Configuration(); - fc = S3ATestUtils.createTestFileContext(conf); - super.setUp(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java deleted file mode 100644 index 4d200d1..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextMainOperations.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs.s3a.fileContext; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContextMainOperationsBaseTest; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * S3A implementation of FileContextMainOperationsBaseTest - */ -public class TestS3AFileContextMainOperations - extends FileContextMainOperationsBaseTest { - - @Before - public void setUp() throws IOException, Exception { - Configuration conf = new Configuration(); - fc = S3ATestUtils.createTestFileContext(conf); - super.setUp(); - } - - @Override - protected boolean listCorruptedBlocksSupported() { - return false; - } - - @Test - @Ignore - public void testCreateFlagAppendExistingFile() throws IOException { - //append not supported, so test removed - } - - @Test - @Ignore - public void testCreateFlagCreateAppendExistingFile() throws IOException { - //append not supported, so test removed - } - - @Test - @Ignore - public void testSetVerifyChecksum() throws IOException { - //checksums ignored, so test removed - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java deleted file mode 100644 index a9f4848..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextStatistics.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs.s3a.fileContext; - -import java.net.URI; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FCStatisticsBaseTest; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; - -/** - * S3a implementation of FCStatisticsBaseTest - */ -public class TestS3AFileContextStatistics extends FCStatisticsBaseTest { - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - fc = S3ATestUtils.createTestFileContext(conf); - fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true); - } - - @After - public void tearDown() throws Exception { - fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true); - } - - @Override - protected void verifyReadBytes(FileSystem.Statistics stats) { - // one blockSize for read, one for pread - Assert.assertEquals(2 * blockSize, stats.getBytesRead()); - } - - @Override - protected void verifyWrittenBytes(FileSystem.Statistics stats) { - //No extra bytes are written - Assert.assertEquals(blockSize, stats.getBytesWritten()); - } - - @Override - protected URI getFsUri() { - return fc.getHomeDirectory().toUri(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java deleted file mode 100644 index 3da7b19..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextURI.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.fs.s3a.fileContext; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContextURIBase; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * S3a implementation of FileContextURIBase - */ -public class TestS3AFileContextURI extends FileContextURIBase { - - @Before - public void setUp() throws IOException, Exception { - Configuration conf = new Configuration(); - fc1 = S3ATestUtils.createTestFileContext(conf); - fc2 = S3ATestUtils.createTestFileContext(conf); //different object, same FS - super.setUp(); - } - - @Test - @Ignore - public void testFileStatus() throws IOException { - //test disabled (the statistics tested with this method are not relevant for an S3FS) - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java deleted file mode 100644 index 666f4c2..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/TestS3AFileContextUtil.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.s3a.fileContext; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContextUtilBase; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.junit.Before; - -/** - * S3A implementation of FileContextUtilBase - */ -public class TestS3AFileContextUtil extends FileContextUtilBase { - - @Before - public void setUp() throws IOException, Exception { - Configuration conf = new Configuration(); - fc = S3ATestUtils.createTestFileContext(conf); - super.setUp(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java new file mode 100644 index 0000000..a375664 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteFilesOneByOne.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; +import org.junit.Test; + +import java.io.IOException; + +/** + * Tests file deletion with multi-delete disabled. + */ +public class ITestS3ADeleteFilesOneByOne extends ITestS3ADeleteManyFiles { + + @Override + protected Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + configuration.setBoolean(Constants.ENABLE_MULTI_DELETE, false); + return configuration; + } + + @Override + @Test + public void testOpenCreate() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java new file mode 100644 index 0000000..208c491 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Test some scalable operations related to file renaming and deletion. 
+ */ +public class ITestS3ADeleteManyFiles extends S3AScaleTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ADeleteManyFiles.class); + + /** + * CAUTION: If this test starts failing, please make sure that the + * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not + * set too low. Alternatively, consider reducing the + * <code>scale.test.operation.count</code> parameter in + * <code>getOperationCount()</code>. + * + * @see #getOperationCount() + */ + @Test + public void testBulkRenameAndDelete() throws Throwable { + final Path scaleTestDir = getTestPath(); + final Path srcDir = new Path(scaleTestDir, "src"); + final Path finalDir = new Path(scaleTestDir, "final"); + final long count = getOperationCount(); + ContractTestUtils.rm(fs, scaleTestDir, true, false); + + fs.mkdirs(srcDir); + fs.mkdirs(finalDir); + + int testBufferSize = fs.getConf() + .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE, + ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE); + // use Executor to speed up file creation + ExecutorService exec = Executors.newFixedThreadPool(16); + final ExecutorCompletionService<Boolean> completionService = + new ExecutorCompletionService<>(exec); + try { + final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z'); + + for (int i = 0; i < count; ++i) { + final String fileName = "foo-" + i; + completionService.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws IOException { + ContractTestUtils.createFile(fs, new Path(srcDir, fileName), + false, data); + return fs.exists(new Path(srcDir, fileName)); + } + }); + } + for (int i = 0; i < count; ++i) { + final Future<Boolean> future = completionService.take(); + try { + if (!future.get()) { + LOG.warn("cannot create file"); + } + } catch (ExecutionException e) { + LOG.warn("Error while uploading file", e.getCause()); + throw e; + } + } + } finally { + exec.shutdown(); + } + + int nSrcFiles = fs.listStatus(srcDir).length; + fs.rename(srcDir, finalDir); + assertEquals(nSrcFiles, fs.listStatus(finalDir).length); + ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", + new Path(srcDir, "foo-" + 0)); + ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", + new Path(srcDir, "foo-" + count / 2)); + ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", + new Path(srcDir, "foo-" + (count - 1))); + ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", + new Path(finalDir, "foo-" + 0)); + ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", + new Path(finalDir, "foo-" + count/2)); + ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", + new Path(finalDir, "foo-" + (count-1))); + + ContractTestUtils.assertDeleted(fs, finalDir, true, false); + } + + @Test + public void testOpenCreate() throws IOException { + Path dir = new Path("/tests3a"); + ContractTestUtils.createAndVerifyFile(fs, dir, 1024); + ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024); + ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024); + + + /* + Enable to test the multipart upload + try { + ContractTestUtils.createAndVerifyFile(fs, dir, + (long)6 * 1024 * 1024 * 1024); + } catch (IOException e) { + fail(e.getMessage()); + } + */ + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java new file mode 100644 index 0000000..b5f4eb3 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Statistic; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; + +/** + * Test the performance of listing files/directories. + */ +public class ITestS3ADirectoryPerformance extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + ITestS3ADirectoryPerformance.class); + + @Test + public void testListOperations() throws Throwable { + describe("Test recursive list operations"); + final Path scaleTestDir = getTestPath(); + final Path listDir = new Path(scaleTestDir, "lists"); + + // scale factor. + int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT); + int width = scale; + int depth = scale; + int files = scale; + MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); + MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); + MetricDiff listContinueRequests = + new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS); + MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES); + MetricDiff getFileStatusCalls = + new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); + NanoTimer createTimer = new NanoTimer(); + TreeScanResults created = + createSubdirs(fs, listDir, depth, width, files, 0); + // add some empty directories + int emptyDepth = 1 * scale; + int emptyWidth = 3 * scale; + + created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0, + 0, "empty", "f-", "")); + createTimer.end("Time to create %s", created); + LOG.info("Time per operation: {}", + toHuman(createTimer.nanosPerOperation(created.totalCount()))); + printThenReset(LOG, + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + + describe("Listing files via treewalk"); + try { + // Scan the directory via an explicit tree walk. + // This is the baseline for any listing speedups. 
+ NanoTimer treeWalkTimer = new NanoTimer(); + TreeScanResults treewalkResults = treeWalk(fs, listDir); + treeWalkTimer.end("List status via treewalk of %s", created); + + printThenReset(LOG, + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + assertEquals("Files found in listFiles(recursive=true) " + + " created=" + created + " listed=" + treewalkResults, + created.getFileCount(), treewalkResults.getFileCount()); + + describe("Listing files via listFiles(recursive=true)"); + // listFiles() does the recursion internally + NanoTimer listFilesRecursiveTimer = new NanoTimer(); + + TreeScanResults listFilesResults = new TreeScanResults( + fs.listFiles(listDir, true)); + + listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created); + assertEquals("Files found in listFiles(recursive=true) " + + " created=" + created + " listed=" + listFilesResults, + created.getFileCount(), listFilesResults.getFileCount()); + + // only two list operations should have taken place + print(LOG, + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + assertEquals(listRequests.toString(), 2, listRequests.diff()); + reset(metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + + + } finally { + describe("deletion"); + // deletion at the end of the run + NanoTimer deleteTimer = new NanoTimer(); + fs.delete(listDir, true); + deleteTimer.end("Deleting directory tree"); + printThenReset(LOG, + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + } + } + + @Test + public void testTimeToStatEmptyDirectory() throws Throwable { + describe("Time to stat an empty directory"); + Path path = new Path(getTestPath(), "empty"); + fs.mkdirs(path); + timeToStatPath(path); + } + + @Test + public void testTimeToStatNonEmptyDirectory() throws Throwable { + describe("Time to stat a non-empty directory"); + Path path = new Path(getTestPath(), "dir"); + fs.mkdirs(path); + touch(fs, new Path(path, "file")); + timeToStatPath(path); + } + + @Test + public void testTimeToStatFile() throws Throwable { + describe("Time to stat a simple file"); + Path path = new Path(getTestPath(), "file"); + touch(fs, path); + timeToStatPath(path); + } + + @Test + public void testTimeToStatRoot() throws Throwable { + describe("Time to stat the root path"); + timeToStatPath(new Path("/")); + } + + private void timeToStatPath(Path path) throws IOException { + describe("Timing getFileStatus(\"%s\")", path); + MetricDiff metadataRequests = + new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS); + MetricDiff listRequests = + new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS); + long attempts = getOperationCount(); + NanoTimer timer = new NanoTimer(); + for (long l = 0; l < attempts; l++) { + fs.getFileStatus(path); + } + timer.end("Time to execute %d getFileStatusCalls", attempts); + LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts))); + LOG.info("metadata: {}", metadataRequests); + LOG.info("metadata per operation {}", metadataRequests.diff() / attempts); + LOG.info("listObjects: {}", listRequests); + LOG.info("listObjects: per operation {}", listRequests.diff() / attempts); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java new file mode 100644 index 0000000..e2163c5 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3AInputStream; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.util.LineReader; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * Look at the performance of S3a operations. + */ +public class ITestS3AInputStreamPerformance extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( + ITestS3AInputStreamPerformance.class); + + private S3AFileSystem s3aFS; + private Path testData; + private S3AFileStatus testDataStatus; + private FSDataInputStream in; + private S3AInstrumentation.InputStreamStatistics streamStatistics; + public static final int BLOCK_SIZE = 32 * 1024; + public static final int BIG_BLOCK_SIZE = 256 * 1024; + + /** Tests only run if the there is a named test file that can be read. */ + private boolean testDataAvailable = true; + private String assumptionMessage = "test file"; + + /** + * Open the FS and the test data. The input stream is always set up here. + * @throws IOException IO Problems. 
+ */ + @Before + public void openFS() throws IOException { + Configuration conf = getConf(); + conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024); + conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024); + String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE); + if (testFile.isEmpty()) { + assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE; + testDataAvailable = false; + } else { + S3ATestUtils.useCSVDataEndpoint(conf); + testData = new Path(testFile); + Path path = this.testData; + bindS3aFS(path); + try { + testDataStatus = s3aFS.getFileStatus(this.testData); + } catch (IOException e) { + LOG.warn("Failed to read file {} specified in {}", + testFile, KEY_CSVTEST_FILE, e); + throw e; + } + } + } + + private void bindS3aFS(Path path) throws IOException { + s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf()); + } + + /** + * Cleanup: close the stream, close the FS. + */ + @After + public void cleanup() { + describe("cleanup"); + IOUtils.closeStream(in); + IOUtils.closeStream(s3aFS); + } + + /** + * Declare that the test requires the CSV test dataset. + */ + private void requireCSVTestData() { + Assume.assumeTrue(assumptionMessage, testDataAvailable); + } + + /** + * Open the test file with the read buffer specified in the setting. + * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy + * @return the stream, wrapping an S3a one + * @throws IOException IO problems + */ + FSDataInputStream openTestFile() throws IOException { + return openTestFile(S3AInputPolicy.Normal, 0); + } + + /** + * Open the test file with the read buffer specified in the setting + * {@link #KEY_READ_BUFFER_SIZE}. + * This includes the {@link #requireCSVTestData()} assumption; so + * if called before any FS op, will automatically skip the test + * if the CSV file is absent. + * + * @param inputPolicy input policy to use + * @param readahead readahead/buffer size + * @return the stream, wrapping an S3a one + * @throws IOException IO problems + */ + FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead) + throws IOException { + requireCSVTestData(); + return openDataFile(s3aFS, this.testData, inputPolicy, readahead); + } + + /** + * Open a test file with the read buffer specified in the setting + * {@link #KEY_READ_BUFFER_SIZE}. + * + * @param path path to open + * @param inputPolicy input policy to use + * @param readahead readahead/buffer size + * @return the stream, wrapping an S3a one + * @throws IOException IO problems + */ + private FSDataInputStream openDataFile(S3AFileSystem fs, + Path path, + S3AInputPolicy inputPolicy, + long readahead) throws IOException { + int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, + DEFAULT_READ_BUFFER_SIZE); + S3AInputPolicy policy = fs.getInputPolicy(); + fs.setInputPolicy(inputPolicy); + try { + FSDataInputStream stream = fs.open(path, bufferSize); + if (readahead >= 0) { + stream.setReadahead(readahead); + } + streamStatistics = getInputStreamStatistics(stream); + return stream; + } finally { + fs.setInputPolicy(policy); + } + } + + /** + * Assert that the stream was only ever opened once. + */ + protected void assertStreamOpenedExactlyOnce() { + assertOpenOperationCount(1); + } + + /** + * Make an assertion count about the number of open operations. 
+ * @param expected the expected number + */ + private void assertOpenOperationCount(long expected) { + assertEquals("open operations in\n" + in, + expected, streamStatistics.openOperations); + } + + /** + * Log how long an IOP took, by dividing the total time by the + * count of operations, printing in a human-readable form. + * @param operation operation being measured + * @param timer timing data + * @param count IOP count. + */ + protected void logTimePerIOP(String operation, + NanoTimer timer, + long count) { + LOG.info("Time per {}: {} nS", + operation, toHuman(timer.duration() / count)); + } + + @Test + public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { + requireCSVTestData(); + int blockSize = _1MB; + describe("Open the test file %s and read it in blocks of size %d", + testData, blockSize); + long len = testDataStatus.getLen(); + in = openTestFile(); + byte[] block = new byte[blockSize]; + NanoTimer timer2 = new NanoTimer(); + long count = 0; + // implicitly rounding down here + long blockCount = len / blockSize; + for (long i = 0; i < blockCount; i++) { + int offset = 0; + int remaining = blockSize; + NanoTimer blockTimer = new NanoTimer(); + int reads = 0; + while (remaining > 0) { + int bytesRead = in.read(block, offset, remaining); + reads++; + if (bytesRead == 1) { + break; + } + remaining -= bytesRead; + offset += bytesRead; + count += bytesRead; + } + blockTimer.end("Reading block %d in %d reads", i, reads); + } + timer2.end("Time to read %d bytes in %d blocks", len, blockCount); + bandwidth(timer2, count); + logStreamStatistics(); + } + + @Test + public void testLazySeekEnabled() throws Throwable { + describe("Verify that seeks do not trigger any IO"); + in = openTestFile(); + long len = testDataStatus.getLen(); + NanoTimer timer = new NanoTimer(); + long blockCount = len / BLOCK_SIZE; + for (long i = 0; i < blockCount; i++) { + in.seek(in.getPos() + BLOCK_SIZE - 1); + } + in.seek(0); + blockCount++; + timer.end("Time to execute %d seeks", blockCount); + logTimePerIOP("seek()", timer, blockCount); + logStreamStatistics(); + assertOpenOperationCount(0); + assertEquals("bytes read", 0, streamStatistics.bytesRead); + } + + @Test + public void testReadaheadOutOfRange() throws Throwable { + try { + in = openTestFile(); + in.setReadahead(-1L); + fail("Stream should have rejected the request "+ in); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testReadWithNormalPolicy() throws Throwable { + describe("Read big blocks with a big readahead"); + executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2, + S3AInputPolicy.Normal); + assertStreamOpenedExactlyOnce(); + } + + @Test + public void testDecompressionSequential128K() throws Throwable { + describe("Decompress with a 128K readahead"); + executeDecompression(128 * 1024, S3AInputPolicy.Sequential); + assertStreamOpenedExactlyOnce(); + } + + /** + * Execute a decompression + line read with the given input policy. 
+ * @param readahead byte readahead + * @param inputPolicy read policy + * @throws IOException IO Problems + */ + private void executeDecompression(long readahead, + S3AInputPolicy inputPolicy) throws IOException { + CompressionCodecFactory factory + = new CompressionCodecFactory(getConf()); + CompressionCodec codec = factory.getCodec(testData); + long bytesRead = 0; + int lines = 0; + + FSDataInputStream objectIn = openTestFile(inputPolicy, readahead); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + try (LineReader lineReader = new LineReader( + codec.createInputStream(objectIn), getConf())) { + Text line = new Text(); + int read; + while ((read = lineReader.readLine(line)) > 0) { + bytesRead += read; + lines++; + } + } catch (EOFException eof) { + // done + } + timer.end("Time to read %d lines [%d bytes expanded, %d raw]" + + " with readahead = %d", + lines, + bytesRead, + testDataStatus.getLen(), + readahead); + logTimePerIOP("line read", timer, lines); + logStreamStatistics(); + } + + private void logStreamStatistics() { + LOG.info(String.format("Stream Statistics%n{}"), streamStatistics); + } + + /** + * Execute a seek+read sequence. + * @param blockSize block size for seeks + * @param readahead what the readahead value of the stream should be + * @throws IOException IO problems + */ + protected void executeSeekReadSequence(long blockSize, + long readahead, + S3AInputPolicy policy) throws IOException { + in = openTestFile(policy, readahead); + long len = testDataStatus.getLen(); + NanoTimer timer = new NanoTimer(); + long blockCount = len / blockSize; + LOG.info("Reading {} blocks, readahead = {}", + blockCount, readahead); + for (long i = 0; i < blockCount; i++) { + in.seek(in.getPos() + blockSize - 1); + // this is the read + assertTrue(in.read() >= 0); + } + timer.end("Time to execute %d seeks of distance %d with readahead = %d", + blockCount, + blockSize, + readahead); + logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount); + LOG.info("Effective bandwidth {} MB/S", + timer.bandwidthDescription(streamStatistics.bytesRead - + streamStatistics.bytesSkippedOnSeek)); + logStreamStatistics(); + } + + public static final int _4K = 4 * 1024; + public static final int _8K = 8 * 1024; + public static final int _16K = 16 * 1024; + public static final int _32K = 32 * 1024; + public static final int _64K = 64 * 1024; + public static final int _128K = 128 * 1024; + public static final int _256K = 256 * 1024; + public static final int _1MB = 1024 * 1024; + public static final int _2MB = 2 * _1MB; + public static final int _10MB = _1MB * 10; + public static final int _5MB = _1MB * 5; + + private static final int[][] RANDOM_IO_SEQUENCE = { + {_2MB, _128K}, + {_128K, _128K}, + {_5MB, _64K}, + {_1MB, _1MB}, + }; + + @Test + public void testRandomIORandomPolicy() throws Throwable { + executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length); + assertEquals("streams aborted in " + streamStatistics, + 0, streamStatistics.aborted); + } + + @Test + public void testRandomIONormalPolicy() throws Throwable { + long expectedOpenCount = RANDOM_IO_SEQUENCE.length; + executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount); + assertEquals("streams aborted in " + streamStatistics, + 4, streamStatistics.aborted); + } + + /** + * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by + * {@link #RANDOM_IO_SEQUENCE}. 
The stream is closed afterwards; that's used + * in the timing too + * @param policy read policy + * @param expectedOpenCount expected number of stream openings + * @throws IOException IO problems + * @return the timer + */ + private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy, + long expectedOpenCount) + throws IOException { + describe("Random IO with policy \"%s\"", policy); + byte[] buffer = new byte[_1MB]; + long totalBytesRead = 0; + + in = openTestFile(policy, 0); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + for (int[] action : RANDOM_IO_SEQUENCE) { + int position = action[0]; + int range = action[1]; + in.readFully(position, buffer, 0, range); + totalBytesRead += range; + } + int reads = RANDOM_IO_SEQUENCE.length; + timer.end("Time to execute %d reads of total size %d bytes", + reads, + totalBytesRead); + in.close(); + assertOpenOperationCount(expectedOpenCount); + logTimePerIOP("byte read", timer, totalBytesRead); + LOG.info("Effective bandwidth {} MB/S", + timer.bandwidthDescription(streamStatistics.bytesRead - + streamStatistics.bytesSkippedOnSeek)); + logStreamStatistics(); + return timer; + } + + S3AInputStream getS3aStream() { + return (S3AInputStream) in.getWrappedStream(); + } + + @Test + public void testRandomReadOverBuffer() throws Throwable { + describe("read over a buffer, making sure that the requests" + + " spans readahead ranges"); + int datasetLen = _32K; + Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin"); + byte[] sourceData = dataset(datasetLen, 0, 64); + // relies on the field 'fs' referring to the R/W FS + writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true); + byte[] buffer = new byte[datasetLen]; + int readahead = _8K; + int halfReadahead = _4K; + in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead); + + LOG.info("Starting initial reads"); + S3AInputStream s3aStream = getS3aStream(); + assertEquals(readahead, s3aStream.getReadahead()); + byte[] oneByte = new byte[1]; + assertEquals(1, in.read(0, oneByte, 0, 1)); + // make some assertions about the current state + assertEquals("remaining in\n" + in, + readahead - 1, s3aStream.remainingInCurrentRequest()); + assertEquals("range start in\n" + in, + 0, s3aStream.getContentRangeStart()); + assertEquals("range finish in\n" + in, + readahead, s3aStream.getContentRangeFinish()); + + assertStreamOpenedExactlyOnce(); + + describe("Starting sequence of positioned read calls over\n%s", in); + NanoTimer readTimer = new NanoTimer(); + int currentPos = halfReadahead; + int offset = currentPos; + int bytesRead = 0; + int readOps = 0; + + // make multiple read() calls + while (bytesRead < halfReadahead) { + int length = buffer.length - offset; + int read = in.read(currentPos, buffer, offset, length); + bytesRead += read; + offset += read; + readOps++; + assertEquals("open operations on request #" + readOps + + " after reading " + bytesRead + + " current position in stream " + currentPos + + " in\n" + fs + + "\n " + in, + 1, streamStatistics.openOperations); + for (int i = currentPos; i < currentPos + read; i++) { + assertEquals("Wrong value from byte " + i, + sourceData[i], buffer[i]); + } + currentPos += read; + } + assertStreamOpenedExactlyOnce(); + // assert at the end of the original block + assertEquals(readahead, currentPos); + readTimer.end("read %d in %d operations", bytesRead, readOps); + bandwidth(readTimer, bytesRead); + LOG.info("Time per byte(): {} nS", + toHuman(readTimer.nanosPerOperation(bytesRead))); + LOG.info("Time per 
read(): {} nS", + toHuman(readTimer.nanosPerOperation(readOps))); + + describe("read last byte"); + // read one more + int read = in.read(currentPos, buffer, bytesRead, 1); + assertTrue("-1 from last read", read >= 0); + assertOpenOperationCount(2); + assertEquals("Wrong value from read ", sourceData[currentPos], + (int) buffer[currentPos]); + currentPos++; + + + // now scan all the way to the end of the file, using single byte read() + // calls + describe("read() to EOF over \n%s", in); + long readCount = 0; + NanoTimer timer = new NanoTimer(); + LOG.info("seeking"); + in.seek(currentPos); + LOG.info("reading"); + while(currentPos < datasetLen) { + int r = in.read(); + assertTrue("Negative read() at position " + currentPos + " in\n" + in, + r >= 0); + buffer[currentPos] = (byte)r; + assertEquals("Wrong value from read from\n" + in, + sourceData[currentPos], r); + currentPos++; + readCount++; + } + timer.end("read %d bytes", readCount); + bandwidth(timer, readCount); + LOG.info("Time per read(): {} nS", + toHuman(timer.nanosPerOperation(readCount))); + + assertEquals("last read in " + in, -1, in.read()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java deleted file mode 100644 index 5e07dcb..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * Test some scalable operations related to file renaming and deletion. - */ -public class TestS3ADeleteManyFiles extends S3AScaleTestBase { - private static final Logger LOG = - LoggerFactory.getLogger(TestS3ADeleteManyFiles.class); - - /** - * CAUTION: If this test starts failing, please make sure that the - * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not - * set too low. 
Alternatively, consider reducing the - * <code>scale.test.operation.count</code> parameter in - * <code>getOperationCount()</code>. - * - * @see #getOperationCount() - */ - @Test - public void testBulkRenameAndDelete() throws Throwable { - final Path scaleTestDir = getTestPath(); - final Path srcDir = new Path(scaleTestDir, "src"); - final Path finalDir = new Path(scaleTestDir, "final"); - final long count = getOperationCount(); - ContractTestUtils.rm(fs, scaleTestDir, true, false); - - fs.mkdirs(srcDir); - fs.mkdirs(finalDir); - - int testBufferSize = fs.getConf() - .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE, - ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE); - // use Executor to speed up file creation - ExecutorService exec = Executors.newFixedThreadPool(16); - final ExecutorCompletionService<Boolean> completionService = - new ExecutorCompletionService<>(exec); - try { - final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z'); - - for (int i = 0; i < count; ++i) { - final String fileName = "foo-" + i; - completionService.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws IOException { - ContractTestUtils.createFile(fs, new Path(srcDir, fileName), - false, data); - return fs.exists(new Path(srcDir, fileName)); - } - }); - } - for (int i = 0; i < count; ++i) { - final Future<Boolean> future = completionService.take(); - try { - if (!future.get()) { - LOG.warn("cannot create file"); - } - } catch (ExecutionException e) { - LOG.warn("Error while uploading file", e.getCause()); - throw e; - } - } - } finally { - exec.shutdown(); - } - - int nSrcFiles = fs.listStatus(srcDir).length; - fs.rename(srcDir, finalDir); - assertEquals(nSrcFiles, fs.listStatus(finalDir).length); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + 0)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + count / 2)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + (count - 1))); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + 0)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + count/2)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + (count-1))); - - ContractTestUtils.assertDeleted(fs, finalDir, true, false); - } - - @Test - public void testOpenCreate() throws IOException { - Path dir = new Path("/tests3a"); - ContractTestUtils.createAndVerifyFile(fs, dir, 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024); - - - /* - Enable to test the multipart upload - try { - ContractTestUtils.createAndVerifyFile(fs, dir, - (long)6 * 1024 * 1024 * 1024); - } catch (IOException e) { - fail(e.getMessage()); - } - */ - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java deleted file mode 100644 index 35ea3ad..0000000 --- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.Statistic; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static org.apache.hadoop.fs.s3a.Statistic.*; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; -import static org.apache.hadoop.fs.contract.ContractTestUtils.*; - -/** - * Test the performance of listing files/directories. - */ -public class TestS3ADirectoryPerformance extends S3AScaleTestBase { - private static final Logger LOG = LoggerFactory.getLogger( - TestS3ADirectoryPerformance.class); - - @Test - public void testListOperations() throws Throwable { - describe("Test recursive list operations"); - final Path scaleTestDir = getTestPath(); - final Path listDir = new Path(scaleTestDir, "lists"); - - // scale factor. - int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT); - int width = scale; - int depth = scale; - int files = scale; - MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); - MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); - MetricDiff listContinueRequests = - new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS); - MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES); - MetricDiff getFileStatusCalls = - new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); - NanoTimer createTimer = new NanoTimer(); - TreeScanResults created = - createSubdirs(fs, listDir, depth, width, files, 0); - // add some empty directories - int emptyDepth = 1 * scale; - int emptyWidth = 3 * scale; - - created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0, - 0, "empty", "f-", "")); - createTimer.end("Time to create %s", created); - LOG.info("Time per operation: {}", - toHuman(createTimer.nanosPerOperation(created.totalCount()))); - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - - describe("Listing files via treewalk"); - try { - // Scan the directory via an explicit tree walk. - // This is the baseline for any listing speedups. 
- NanoTimer treeWalkTimer = new NanoTimer(); - TreeScanResults treewalkResults = treeWalk(fs, listDir); - treeWalkTimer.end("List status via treewalk of %s", created); - - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - assertEquals("Files found in listFiles(recursive=true) " + - " created=" + created + " listed=" + treewalkResults, - created.getFileCount(), treewalkResults.getFileCount()); - - describe("Listing files via listFiles(recursive=true)"); - // listFiles() does the recursion internally - NanoTimer listFilesRecursiveTimer = new NanoTimer(); - - TreeScanResults listFilesResults = new TreeScanResults( - fs.listFiles(listDir, true)); - - listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created); - assertEquals("Files found in listFiles(recursive=true) " + - " created=" + created + " listed=" + listFilesResults, - created.getFileCount(), listFilesResults.getFileCount()); - - // only two list operations should have taken place - print(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - assertEquals(listRequests.toString(), 2, listRequests.diff()); - reset(metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - - - } finally { - describe("deletion"); - // deletion at the end of the run - NanoTimer deleteTimer = new NanoTimer(); - fs.delete(listDir, true); - deleteTimer.end("Deleting directory tree"); - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - } - } - - @Test - public void testTimeToStatEmptyDirectory() throws Throwable { - describe("Time to stat an empty directory"); - Path path = new Path(getTestPath(), "empty"); - fs.mkdirs(path); - timeToStatPath(path); - } - - @Test - public void testTimeToStatNonEmptyDirectory() throws Throwable { - describe("Time to stat a non-empty directory"); - Path path = new Path(getTestPath(), "dir"); - fs.mkdirs(path); - touch(fs, new Path(path, "file")); - timeToStatPath(path); - } - - @Test - public void testTimeToStatFile() throws Throwable { - describe("Time to stat a simple file"); - Path path = new Path(getTestPath(), "file"); - touch(fs, path); - timeToStatPath(path); - } - - @Test - public void testTimeToStatRoot() throws Throwable { - describe("Time to stat the root path"); - timeToStatPath(new Path("/")); - } - - private void timeToStatPath(Path path) throws IOException { - describe("Timing getFileStatus(\"%s\")", path); - MetricDiff metadataRequests = - new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS); - MetricDiff listRequests = - new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS); - long attempts = getOperationCount(); - NanoTimer timer = new NanoTimer(); - for (long l = 0; l < attempts; l++) { - fs.getFileStatus(path); - } - timer.end("Time to execute %d getFileStatusCalls", attempts); - LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts))); - LOG.info("metadata: {}", metadataRequests); - LOG.info("metadata per operation {}", metadataRequests.diff() / attempts); - LOG.info("listObjects: {}", listRequests); - LOG.info("listObjects: per operation {}", listRequests.diff() / attempts); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java deleted file mode 100644 index d6d9d66..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java +++ /dev/null @@ -1,534 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.s3a.S3AFileStatus; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInputStream; -import org.apache.hadoop.fs.s3a.S3AInstrumentation; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.util.LineReader; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.*; -import static org.apache.hadoop.fs.s3a.Constants.*; - -/** - * Look at the performance of S3a operations. - */ -public class TestS3AInputStreamPerformance extends S3AScaleTestBase { - private static final Logger LOG = LoggerFactory.getLogger( - TestS3AInputStreamPerformance.class); - - private S3AFileSystem s3aFS; - private Path testData; - private S3AFileStatus testDataStatus; - private FSDataInputStream in; - private S3AInstrumentation.InputStreamStatistics streamStatistics; - public static final int BLOCK_SIZE = 32 * 1024; - public static final int BIG_BLOCK_SIZE = 256 * 1024; - - /** Tests only run if the there is a named test file that can be read. */ - private boolean testDataAvailable = true; - private String assumptionMessage = "test file"; - - /** - * Open the FS and the test data. The input stream is always set up here. - * @throws IOException IO Problems. 
- */ - @Before - public void openFS() throws IOException { - Configuration conf = getConf(); - conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024); - conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024); - String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE); - if (testFile.isEmpty()) { - assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE; - testDataAvailable = false; - } else { - S3ATestUtils.useCSVDataEndpoint(conf); - testData = new Path(testFile); - Path path = this.testData; - bindS3aFS(path); - try { - testDataStatus = s3aFS.getFileStatus(this.testData); - } catch (IOException e) { - LOG.warn("Failed to read file {} specified in {}", - testFile, KEY_CSVTEST_FILE, e); - throw e; - } - } - } - - private void bindS3aFS(Path path) throws IOException { - s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf()); - } - - /** - * Cleanup: close the stream, close the FS. - */ - @After - public void cleanup() { - describe("cleanup"); - IOUtils.closeStream(in); - IOUtils.closeStream(s3aFS); - } - - /** - * Declare that the test requires the CSV test dataset. - */ - private void requireCSVTestData() { - Assume.assumeTrue(assumptionMessage, testDataAvailable); - } - - /** - * Open the test file with the read buffer specified in the setting. - * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - FSDataInputStream openTestFile() throws IOException { - return openTestFile(S3AInputPolicy.Normal, 0); - } - - /** - * Open the test file with the read buffer specified in the setting - * {@link #KEY_READ_BUFFER_SIZE}. - * This includes the {@link #requireCSVTestData()} assumption; so - * if called before any FS op, will automatically skip the test - * if the CSV file is absent. - * - * @param inputPolicy input policy to use - * @param readahead readahead/buffer size - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead) - throws IOException { - requireCSVTestData(); - return openDataFile(s3aFS, this.testData, inputPolicy, readahead); - } - - /** - * Open a test file with the read buffer specified in the setting - * {@link #KEY_READ_BUFFER_SIZE}. - * - * @param path path to open - * @param inputPolicy input policy to use - * @param readahead readahead/buffer size - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - private FSDataInputStream openDataFile(S3AFileSystem fs, - Path path, - S3AInputPolicy inputPolicy, - long readahead) throws IOException { - int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, - DEFAULT_READ_BUFFER_SIZE); - S3AInputPolicy policy = fs.getInputPolicy(); - fs.setInputPolicy(inputPolicy); - try { - FSDataInputStream stream = fs.open(path, bufferSize); - if (readahead >= 0) { - stream.setReadahead(readahead); - } - streamStatistics = getInputStreamStatistics(stream); - return stream; - } finally { - fs.setInputPolicy(policy); - } - } - - /** - * Assert that the stream was only ever opened once. - */ - protected void assertStreamOpenedExactlyOnce() { - assertOpenOperationCount(1); - } - - /** - * Make an assertion count about the number of open operations. 
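The openDataFile helper above combines an open with an explicit readahead. Stripped of the S3A-specific input-policy handling, the readahead part generalizes to any FileSystem; a rough sketch (the helper name is made up):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class OpenWithReadahead {
  private OpenWithReadahead() {
  }

  /**
   * Open a file with an explicit buffer size and readahead.
   * Negative readahead values are ignored rather than applied.
   */
  static FSDataInputStream open(FileSystem fs, Path path, int bufferSize,
      long readahead) throws IOException {
    FSDataInputStream stream = fs.open(path, bufferSize);
    if (readahead >= 0) {
      // FSDataInputStream exposes the CanSetReadahead interface
      stream.setReadahead(readahead);
    }
    return stream;
  }
}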
- * @param expected the expected number - */ - private void assertOpenOperationCount(long expected) { - assertEquals("open operations in\n" + in, - expected, streamStatistics.openOperations); - } - - /** - * Log how long an IOP took, by dividing the total time by the - * count of operations, printing in a human-readable form. - * @param operation operation being measured - * @param timer timing data - * @param count IOP count. - */ - protected void logTimePerIOP(String operation, - NanoTimer timer, - long count) { - LOG.info("Time per {}: {} nS", - operation, toHuman(timer.duration() / count)); - } - - @Test - public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { - requireCSVTestData(); - int blockSize = _1MB; - describe("Open the test file %s and read it in blocks of size %d", - testData, blockSize); - long len = testDataStatus.getLen(); - in = openTestFile(); - byte[] block = new byte[blockSize]; - NanoTimer timer2 = new NanoTimer(); - long count = 0; - // implicitly rounding down here - long blockCount = len / blockSize; - for (long i = 0; i < blockCount; i++) { - int offset = 0; - int remaining = blockSize; - NanoTimer blockTimer = new NanoTimer(); - int reads = 0; - while (remaining > 0) { - int bytesRead = in.read(block, offset, remaining); - reads ++; - if (bytesRead == 1) { - break; - } - remaining -= bytesRead; - offset += bytesRead; - count += bytesRead; - } - blockTimer.end("Reading block %d in %d reads", i, reads); - } - timer2.end("Time to read %d bytes in %d blocks", len, blockCount ); - bandwidth(timer2, count); - logStreamStatistics(); - } - - @Test - public void testLazySeekEnabled() throws Throwable { - describe("Verify that seeks do not trigger any IO"); - in = openTestFile(); - long len = testDataStatus.getLen(); - NanoTimer timer = new NanoTimer(); - long blockCount = len / BLOCK_SIZE; - for (long i = 0; i < blockCount; i++) { - in.seek(in.getPos() + BLOCK_SIZE - 1); - } - in.seek(0); - blockCount++; - timer.end("Time to execute %d seeks", blockCount); - logTimePerIOP("seek()", timer, blockCount); - logStreamStatistics(); - assertOpenOperationCount(0); - assertEquals("bytes read", 0, streamStatistics.bytesRead); - } - - @Test - public void testReadaheadOutOfRange() throws Throwable { - try { - in = openTestFile(); - in.setReadahead(-1L); - fail("Stream should have rejected the request "+ in); - } catch (IllegalArgumentException e) { - // expected - } - } - - @Test - public void testReadWithNormalPolicy() throws Throwable { - describe("Read big blocks with a big readahead"); - executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2, - S3AInputPolicy.Normal); - assertStreamOpenedExactlyOnce(); - } - - @Test - public void testDecompressionSequential128K() throws Throwable { - describe("Decompress with a 128K readahead"); - executeDecompression(128 * 1024, S3AInputPolicy.Sequential); - assertStreamOpenedExactlyOnce(); - } - - /** - * Execute a decompression + line read with the given input policy. 
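logTimePerIOP and the block-read loops above reduce to the same arithmetic: total elapsed nanoseconds divided by the operation or byte count. A self-contained stand-in for the timer used there (an approximation, not the ContractTestUtils.NanoTimer implementation):

/** Minimal wall-clock timer; a sketch approximating ContractTestUtils.NanoTimer. */
final class SimpleNanoTimer {
  private final long start = System.nanoTime();
  private long end;

  /** Mark the end of the timed section and return the duration in ns. */
  long end() {
    end = System.nanoTime();
    return duration();
  }

  /** Duration in nanoseconds; uses "now" if end() has not been called yet. */
  long duration() {
    return (end == 0 ? System.nanoTime() : end) - start;
  }

  /** Average nanoseconds per operation, the figure logged by logTimePerIOP(). */
  long nanosPerOperation(long count) {
    return duration() / count;
  }

  /** Effective bandwidth in MB/s for the given byte count. */
  double bandwidthMBps(long bytes) {
    return bytes / 1048576.0 / (duration() / 1.0e9);
  }
}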
- * @param readahead byte readahead - * @param inputPolicy read policy - * @throws IOException IO Problems - */ - private void executeDecompression(long readahead, - S3AInputPolicy inputPolicy) throws IOException { - CompressionCodecFactory factory - = new CompressionCodecFactory(getConf()); - CompressionCodec codec = factory.getCodec(testData); - long bytesRead = 0; - int lines = 0; - - FSDataInputStream objectIn = openTestFile(inputPolicy, readahead); - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - try (LineReader lineReader = new LineReader( - codec.createInputStream(objectIn), getConf())) { - Text line = new Text(); - int read; - while ((read = lineReader.readLine(line)) > 0) { - bytesRead += read; - lines++; - } - } catch (EOFException eof) { - // done - } - timer.end("Time to read %d lines [%d bytes expanded, %d raw]" + - " with readahead = %d", - lines, - bytesRead, - testDataStatus.getLen(), - readahead); - logTimePerIOP("line read", timer, lines); - logStreamStatistics(); - } - - private void logStreamStatistics() { - LOG.info(String.format("Stream Statistics%n{}"), streamStatistics); - } - - /** - * Execute a seek+read sequence. - * @param blockSize block size for seeks - * @param readahead what the readahead value of the stream should be - * @throws IOException IO problems - */ - protected void executeSeekReadSequence(long blockSize, - long readahead, - S3AInputPolicy policy) throws IOException { - in = openTestFile(policy, readahead); - long len = testDataStatus.getLen(); - NanoTimer timer = new NanoTimer(); - long blockCount = len / blockSize; - LOG.info("Reading {} blocks, readahead = {}", - blockCount, readahead); - for (long i = 0; i < blockCount; i++) { - in.seek(in.getPos() + blockSize - 1); - // this is the read - assertTrue(in.read() >= 0); - } - timer.end("Time to execute %d seeks of distance %d with readahead = %d", - blockCount, - blockSize, - readahead); - logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount); - LOG.info("Effective bandwidth {} MB/S", - timer.bandwidthDescription(streamStatistics.bytesRead - - streamStatistics.bytesSkippedOnSeek)); - logStreamStatistics(); - } - - public static final int _4K = 4 * 1024; - public static final int _8K = 8 * 1024; - public static final int _16K = 16 * 1024; - public static final int _32K = 32 * 1024; - public static final int _64K = 64 * 1024; - public static final int _128K = 128 * 1024; - public static final int _256K = 256 * 1024; - public static final int _1MB = 1024 * 1024; - public static final int _2MB = 2 * _1MB; - public static final int _10MB = _1MB * 10; - public static final int _5MB = _1MB * 5; - - private static final int[][] RANDOM_IO_SEQUENCE = { - {_2MB, _128K}, - {_128K, _128K}, - {_5MB, _64K}, - {_1MB, _1MB}, - }; - - @Test - public void testRandomIORandomPolicy() throws Throwable { - executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length); - assertEquals("streams aborted in " + streamStatistics, - 0, streamStatistics.aborted); - } - - @Test - public void testRandomIONormalPolicy() throws Throwable { - long expectedOpenCount = RANDOM_IO_SEQUENCE.length; - executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount); - assertEquals("streams aborted in " + streamStatistics, - 4, streamStatistics.aborted); - } - - /** - * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by - * {@link #RANDOM_IO_SEQUENCE}. 
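The codec-plus-LineReader combination in executeDecompression works for any compressed text source Hadoop has a codec for. A compact sketch under the same APIs (class and method names here are illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;

final class CountCompressedLines {
  /** Count the lines in a (possibly compressed) text file. */
  static long countLines(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    FSDataInputStream raw = fs.open(path);
    long lines = 0;
    // fall back to the raw stream when the file suffix maps to no codec
    try (LineReader reader = new LineReader(
        codec != null ? codec.createInputStream(raw) : raw, conf)) {
      Text line = new Text();
      while (reader.readLine(line) > 0) {
        lines++;
      }
    }
    return lines;
  }
}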
The stream is closed afterwards; that's used - * in the timing too - * @param policy read policy - * @param expectedOpenCount expected number of stream openings - * @throws IOException IO problems - * @return the timer - */ - private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy, - long expectedOpenCount) - throws IOException { - describe("Random IO with policy \"%s\"", policy); - byte[] buffer = new byte[_1MB]; - long totalBytesRead = 0; - - in = openTestFile(policy, 0); - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - for (int[] action : RANDOM_IO_SEQUENCE) { - int position = action[0]; - int range = action[1]; - in.readFully(position, buffer, 0, range); - totalBytesRead += range; - } - int reads = RANDOM_IO_SEQUENCE.length; - timer.end("Time to execute %d reads of total size %d bytes", - reads, - totalBytesRead); - in.close(); - assertOpenOperationCount(expectedOpenCount); - logTimePerIOP("byte read", timer, totalBytesRead); - LOG.info("Effective bandwidth {} MB/S", - timer.bandwidthDescription(streamStatistics.bytesRead - - streamStatistics.bytesSkippedOnSeek)); - logStreamStatistics(); - return timer; - } - - S3AInputStream getS3aStream() { - return (S3AInputStream) in.getWrappedStream(); - } - - @Test - public void testRandomReadOverBuffer() throws Throwable { - describe("read over a buffer, making sure that the requests" + - " spans readahead ranges"); - int datasetLen = _32K; - Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin"); - byte[] sourceData = dataset(datasetLen, 0, 64); - // relies on the field 'fs' referring to the R/W FS - writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true); - byte[] buffer = new byte[datasetLen]; - int readahead = _8K; - int halfReadahead = _4K; - in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead); - - LOG.info("Starting initial reads"); - S3AInputStream s3aStream = getS3aStream(); - assertEquals(readahead, s3aStream.getReadahead()); - byte[] oneByte = new byte[1]; - assertEquals(1, in.read(0, oneByte, 0, 1)); - // make some assertions about the current state - assertEquals("remaining in\n" + in, - readahead - 1, s3aStream.remainingInCurrentRequest()); - assertEquals("range start in\n" + in, - 0, s3aStream.getContentRangeStart()); - assertEquals("range finish in\n" + in, - readahead, s3aStream.getContentRangeFinish()); - - assertStreamOpenedExactlyOnce(); - - describe("Starting sequence of positioned read calls over\n%s", in); - NanoTimer readTimer = new NanoTimer(); - int currentPos = halfReadahead; - int offset = currentPos; - int bytesRead = 0; - int readOps = 0; - - // make multiple read() calls - while (bytesRead < halfReadahead) { - int length = buffer.length - offset; - int read = in.read(currentPos, buffer, offset, length); - bytesRead += read; - offset += read; - readOps++; - assertEquals("open operations on request #" + readOps - + " after reading " + bytesRead - + " current position in stream " + currentPos - + " in\n" + fs - + "\n " + in, - 1, streamStatistics.openOperations); - for (int i = currentPos; i < currentPos + read; i++) { - assertEquals("Wrong value from byte " + i, - sourceData[i], buffer[i]); - } - currentPos += read; - } - assertStreamOpenedExactlyOnce(); - // assert at the end of the original block - assertEquals(readahead, currentPos); - readTimer.end("read %d in %d operations", bytesRead, readOps); - bandwidth(readTimer, bytesRead); - LOG.info("Time per byte(): {} nS", - toHuman(readTimer.nanosPerOperation(bytesRead))); - LOG.info("Time per 
read(): {} nS", - toHuman(readTimer.nanosPerOperation(readOps))); - - describe("read last byte"); - // read one more - int read = in.read(currentPos, buffer, bytesRead, 1); - assertTrue("-1 from last read", read >= 0); - assertOpenOperationCount(2); - assertEquals("Wrong value from read ", sourceData[currentPos], - (int) buffer[currentPos]); - currentPos++; - - - // now scan all the way to the end of the file, using single byte read() - // calls - describe("read() to EOF over \n%s", in); - long readCount = 0; - NanoTimer timer = new NanoTimer(); - LOG.info("seeking"); - in.seek(currentPos); - LOG.info("reading"); - while(currentPos < datasetLen) { - int r = in.read(); - assertTrue("Negative read() at position " + currentPos + " in\n" + in, - r >= 0); - buffer[currentPos] = (byte)r; - assertEquals("Wrong value from read from\n" + in, - sourceData[currentPos], r); - currentPos++; - readCount++; - } - timer.end("read %d bytes", readCount); - bandwidth(timer, readCount); - LOG.info("Time per read(): {} nS", - toHuman(timer.nanosPerOperation(readCount))); - - assertEquals("last read in " + in, -1, in.read()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java new file mode 100644 index 0000000..ca57da6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3a.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.Path; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.EnumSet; +import org.apache.hadoop.fs.s3a.S3ATestUtils; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * S3A tests through the {@link FileContext} API. 
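The single-byte read() loop that closes testRandomReadOverBuffer is a generic way to verify stream contents against known data; roughly, for any InputStream (names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReadToEofCheck {
  /** Read to EOF one byte at a time, checking each byte against expected data. */
  static long verify(InputStream in, byte[] expected, int startPos)
      throws IOException {
    long reads = 0;
    for (int pos = startPos; pos < expected.length; pos++) {
      int r = in.read();
      if (r < 0) {
        throw new IOException("Premature EOF at position " + pos);
      }
      if ((byte) r != expected[pos]) {
        throw new IOException("Wrong value at position " + pos);
      }
      reads++;
    }
    if (in.read() != -1) {
      throw new IOException("Expected EOF after " + expected.length + " bytes");
    }
    return reads;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {1, 2, 3, 4};
    System.out.println(verify(new ByteArrayInputStream(data), data, 0));
  }
}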
+ */ +public class ITestS3A { + private FileContext fc; + + @Rule + public final Timeout testTimeout = new Timeout(90000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fc = S3ATestUtils.createTestFileContext(conf); + } + + @After + public void tearDown() throws Exception { + if (fc != null) { + fc.delete(getTestPath(), true); + } + } + + protected Path getTestPath() { + return new Path("/tests3afc"); + } + + @Test + public void testS3AStatus() throws Exception { + FsStatus fsStatus = fc.getFsStatus(null); + assertNotNull(fsStatus); + assertTrue("Used capacity should be positive: " + fsStatus.getUsed(), + fsStatus.getUsed() >= 0); + assertTrue("Remaining capacity should be positive: " + fsStatus + .getRemaining(), + fsStatus.getRemaining() >= 0); + assertTrue("Capacity should be positive: " + fsStatus.getCapacity(), + fsStatus.getCapacity() >= 0); + } + + @Test + public void testS3ACreateFileInSubDir() throws Exception { + Path dirPath = getTestPath(); + fc.mkdir(dirPath, FileContext.DIR_DEFAULT_PERM, true); + Path filePath = new Path(dirPath, "file"); + try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag + .CREATE))) { + file.write(666); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java new file mode 100644 index 0000000..772d8c7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
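The FileContext mkdir/create idiom in testS3ACreateFileInSubDir is not S3A-specific; the same calls work against any FileContext. A brief sketch using the local filesystem and an illustrative path:

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

final class FileContextCreateSketch {
  public static void main(String[] args) throws Exception {
    // local FileContext here; the S3A tests obtain theirs via S3ATestUtils
    FileContext fc = FileContext.getLocalFSFileContext(new Configuration());
    Path dir = new Path("/tmp/fc-sketch");   // illustrative path
    fc.mkdir(dir, FileContext.DIR_DEFAULT_PERM, true);
    try (FSDataOutputStream out = fc.create(new Path(dir, "file"),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
      out.write("hello".getBytes("UTF-8"));
    }
    fc.delete(dir, true);
  }
}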
+ */ +package org.apache.hadoop.fs.s3a.yarn; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.WordCount; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; + +import org.junit.After; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests that S3A is usable through a YARN application. + */ +public class ITestS3AMiniYarnCluster { + + private final Configuration conf = new YarnConfiguration(); + private S3AFileSystem fs; + private MiniYARNCluster yarnCluster; + private final String rootPath = "/tests/MiniClusterWordCount/"; + + @Before + public void beforeTest() throws IOException { + fs = S3ATestUtils.createTestFileSystem(conf); + fs.mkdirs(new Path(rootPath + "input/")); + + yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName + 1, // number of node managers + 1, // number of local log dirs per node manager + 1); // number of hdfs dirs per node manager + yarnCluster.init(conf); + yarnCluster.start(); + } + + @After + public void afterTest() throws IOException { + fs.delete(new Path(rootPath), true); + yarnCluster.stop(); + } + + @Test + public void testWithMiniCluster() throws Exception { + Path input = new Path(rootPath + "input/in.txt"); + input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path output = new Path(rootPath + "output/"); + output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + writeStringToFile(input, "first line\nsecond line\nthird line"); + + Job job = Job.getInstance(conf, "word count"); + job.setJarByClass(WordCount.class); + job.setMapperClass(WordCount.TokenizerMapper.class); + job.setCombinerClass(WordCount.IntSumReducer.class); + job.setReducerClass(WordCount.IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + + int exitCode = (job.waitForCompletion(true) ? 0 : 1); + assertEquals("Returned error code.", 0, exitCode); + + assertTrue(fs.exists(new Path(output, "_SUCCESS"))); + String outputAsStr = readStringFromFile(new Path(output, "part-r-00000")); + Map<String, Integer> resAsMap = getResultAsMap(outputAsStr); + + assertEquals(4, resAsMap.size()); + assertEquals(1, (int) resAsMap.get("first")); + assertEquals(1, (int) resAsMap.get("second")); + assertEquals(1, (int) resAsMap.get("third")); + assertEquals(3, (int) resAsMap.get("line")); + } + + /** + * helper method. 
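The MiniYARNCluster handling in beforeTest/afterTest follows the standard Hadoop service lifecycle: construct, init, start, stop. A minimal try/finally sketch of just that lifecycle, using the same four constructor arguments:

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;

final class MiniYarnLifecycleSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // testName, node managers, local dirs, log dirs
    MiniYARNCluster cluster = new MiniYARNCluster("sketch", 1, 1, 1);
    try {
      cluster.init(conf);
      cluster.start();
      // a real test would submit work against cluster.getConfig() here
    } finally {
      cluster.stop();
    }
  }
}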
+ */ +private Map<String, Integer> getResultAsMap(String outputAsStr) + throws IOException { + Map<String, Integer> result = new HashMap<>(); + for (String line : outputAsStr.split("\n")) { + String[] tokens = line.split("\t"); + result.put(tokens[0], Integer.parseInt(tokens[1])); + } + return result; + } + + /** + * helper method. + */ + private void writeStringToFile(Path path, String string) throws IOException { + FileContext fc = S3ATestUtils.createTestFileContext(conf); + try (FSDataOutputStream file = fc.create(path, + EnumSet.of(CreateFlag.CREATE))) { + file.write(string.getBytes()); + } + } + + /** + * helper method. + */ + private String readStringFromFile(Path path) { + try (FSDataInputStream in = fs.open(path)) { + long bytesLen = fs.getFileStatus(path).getLen(); + byte[] buffer = new byte[(int) bytesLen]; + IOUtils.readFully(in, buffer, 0, buffer.length); + return new String(buffer); + } catch (IOException e) { + throw new RuntimeException("Failed to read from [" + path + "]", e); + } + } + +}
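One caveat on the readStringFromFile/writeStringToFile helpers above: new String(byte[]) and String.getBytes() use the JVM's default charset. A charset-explicit read helper, sketched under the same Hadoop APIs:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

final class Utf8FileHelpers {
  /** Read an entire small file as UTF-8 text. */
  static String readUtf8(FileSystem fs, Path path) throws IOException {
    int len = (int) fs.getFileStatus(path).getLen();
    byte[] buffer = new byte[len];
    try (FSDataInputStream in = fs.open(path)) {
      IOUtils.readFully(in, buffer, 0, buffer.length);
    }
    return new String(buffer, StandardCharsets.UTF_8);
  }
}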