HDFS-11170. Add builder-based create API to FileSystem. Contributed by SammiChen and Wei Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/332a997e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/332a997e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/332a997e Branch: refs/heads/HDFS-9806 Commit: 332a997e10cca88d9ab3aa8252102366b628eaec Parents: 52b0060 Author: Andrew Wang <w...@apache.org> Authored: Fri Mar 24 12:56:46 2017 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Fri Mar 24 12:56:46 2017 -0700 ---------------------------------------------------------------------- .../hadoop/fs/FSDataOutputStreamBuilder.java | 142 +++++++++++++++++++ .../java/org/apache/hadoop/fs/FileSystem.java | 9 ++ .../org/apache/hadoop/fs/FilterFileSystem.java | 5 + .../org/apache/hadoop/fs/HarFileSystem.java | 5 + .../apache/hadoop/fs/TestLocalFileSystem.java | 54 +++++++ .../hadoop/hdfs/DistributedFileSystem.java | 81 +++++++++++ .../hadoop/hdfs/TestDistributedFileSystem.java | 35 ++++- .../namenode/TestFavoredNodesEndToEnd.java | 23 +++ 8 files changed, 353 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java new file mode 100644 index 0000000..2e885f3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.io.IOException; +import java.util.EnumSet; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; + +/** Base of specific file system FSDataOutputStreamBuilder. */ +public class FSDataOutputStreamBuilder{ + private Path path = null; + private FsPermission permission = null; + private Integer bufferSize; + private Short replication; + private Long blockSize; + private Progressable progress = null; + private EnumSet<CreateFlag> flags = null; + private ChecksumOpt checksumOpt = null; + + private final FileSystem fs; + + public FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) { + fs = fileSystem; + path = p; + } + + protected Path getPath() { + return path; + } + + protected FsPermission getPermission() { + if (permission == null) { + return FsPermission.getFileDefault(); + } + return permission; + } + + public FSDataOutputStreamBuilder setPermission(final FsPermission perm) { + Preconditions.checkNotNull(perm); + permission = perm; + return this; + } + + protected int getBufferSize() { + if (bufferSize == null) { + return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, + IO_FILE_BUFFER_SIZE_DEFAULT); + } + return bufferSize; + } + + public FSDataOutputStreamBuilder setBufferSize(int bufSize) { + bufferSize = bufSize; + return this; + } + + protected short getReplication() { + if (replication == null) { + return fs.getDefaultReplication(getPath()); + } + return replication; + } + + public FSDataOutputStreamBuilder setReplication(short replica) { + replication = replica; + return this; + } + + protected long getBlockSize() { + if (blockSize == null) { + return fs.getDefaultBlockSize(getPath()); + } + return blockSize; + } + + public FSDataOutputStreamBuilder setBlockSize(long blkSize) { + blockSize = blkSize; + return this; + } + + protected Progressable getProgress() { + return progress; + } + + public FSDataOutputStreamBuilder setProgress(final Progressable prog) { + Preconditions.checkNotNull(prog); + progress = prog; + return this; + } + + protected EnumSet<CreateFlag> getFlags() { + if (flags == null) { + return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); + } + return flags; + } + + public FSDataOutputStreamBuilder setFlags( + final EnumSet<CreateFlag> enumFlags) { + Preconditions.checkNotNull(enumFlags); + flags = enumFlags; + return this; + } + + protected ChecksumOpt getChecksumOpt() { + return checksumOpt; + } + + public FSDataOutputStreamBuilder setChecksumOpt( + final ChecksumOpt chksumOpt) { + Preconditions.checkNotNull(chksumOpt); + checksumOpt = chksumOpt; + return this; + } + + public FSDataOutputStream build() throws IOException { + return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(), + getReplication(), getBlockSize(), getProgress(), getChecksumOpt()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index ededfa9..c3282e5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4138,4 +4138,13 @@ public abstract class FileSystem extends Configured implements Closeable { public static GlobalStorageStatistics getGlobalStorageStatistics() { return GlobalStorageStatistics.INSTANCE; } + + /** + * Create a new FSDataOutputStreamBuilder for the file with path. + * @param path file path + * @return a FSDataOutputStreamBuilder object to build the file + */ + public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { + return new FSDataOutputStreamBuilder(this, path); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 41429ac..ef09458 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -665,4 +665,9 @@ public class FilterFileSystem extends FileSystem { public Collection<FileStatus> getTrashRoots(boolean allUsers) { return fs.getTrashRoots(allUsers); } + + @Override + public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { + return fs.newFSDataOutputStreamBuilder(path); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index ce1cf45..7e50ab1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -1268,4 +1268,9 @@ public class HarFileSystem extends FileSystem { public short getDefaultReplication(Path f) { return fs.getDefaultReplication(f); } + + @Override + public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { + return fs.newFSDataOutputStreamBuilder(path); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java index 2311337..5da5a4a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java @@ -19,10 +19,13 @@ package org.apache.hadoop.fs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.FileSystemTestHelper.*; import java.io.*; @@ -636,4 +639,55 @@ public class TestLocalFileSystem { FileStatus[] stats = fs.listStatus(path); assertTrue(stats != null && stats.length == 1 && stats[0] == stat); } + + @Test + public void testFSOutputStreamBuilder() throws Exception { + Path path = new Path(TEST_ROOT_DIR, "testBuilder"); + + try { + FSDataOutputStreamBuilder builder = + fileSys.newFSDataOutputStreamBuilder(path); + FSDataOutputStream out = builder.build(); + String content = "Create with a generic type of createBuilder!"; + byte[] contentOrigin = content.getBytes("UTF8"); + out.write(contentOrigin); + out.close(); + + FSDataInputStream input = fileSys.open(path); + byte[] buffer = + new byte[(int) (fileSys.getFileStatus(path).getLen())]; + input.readFully(0, buffer); + input.close(); + Assert.assertArrayEquals("The data be read should equals with the " + + "data written.", contentOrigin, buffer); + } catch (IOException e) { + throw e; + } + + // Test value not being set for replication, block size, buffer size + // and permission + FSDataOutputStreamBuilder builder = + fileSys.newFSDataOutputStreamBuilder(path); + builder.build(); + Assert.assertEquals("Should be default block size", + builder.getBlockSize(), fileSys.getDefaultBlockSize()); + Assert.assertEquals("Should be default replication factor", + builder.getReplication(), fileSys.getDefaultReplication()); + Assert.assertEquals("Should be default buffer size", + builder.getBufferSize(), + fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, + IO_FILE_BUFFER_SIZE_DEFAULT)); + Assert.assertEquals("Should be default permission", + builder.getPermission(), FsPermission.getFileDefault()); + + // Test set 0 to replication, block size and buffer size + builder = fileSys.newFSDataOutputStreamBuilder(path); + builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0); + Assert.assertEquals("Block size should be 0", + builder.getBlockSize(), 0); + Assert.assertEquals("Replication factor should be 0", + builder.getReplication(), 0); + Assert.assertEquals("Buffer size should be 0", + builder.getBufferSize(), 0); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 1eef560..5cd956c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FSLinkResolver; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; @@ -446,6 +447,48 @@ public class DistributedFileSystem extends FileSystem { }.resolve(this, absF); } + /** + * Same as + * {@link #create(Path, FsPermission, EnumSet<CreateFlag>, int, short, long, + * Progressable, ChecksumOpt)} with the addition of favoredNodes that is a + * hint to where the namenode should place the file blocks. + * The favored nodes hint is not persisted in HDFS. Hence it may be honored + * at the creation time only. And with favored nodes, blocks will be pinned + * on the datanodes to prevent balancing move the block. HDFS could move the + * blocks during replication, to move the blocks from favored nodes. A value + * of null means no favored nodes for this create + */ + private HdfsDataOutputStream create(final Path f, + final FsPermission permission, EnumSet<CreateFlag> flag, + final int bufferSize, final short replication, final long blockSize, + final Progressable progress, final ChecksumOpt checksumOpt, + final InetSocketAddress[] favoredNodes) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver<HdfsDataOutputStream>() { + @Override + public HdfsDataOutputStream doCall(final Path p) throws IOException { + final DFSOutputStream out = dfs.create(getPathName(f), permission, + flag, true, replication, blockSize, progress, bufferSize, + checksumOpt, favoredNodes); + return dfs.createWrappedOutputStream(out, statistics); + } + @Override + public HdfsDataOutputStream next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem myDfs = (DistributedFileSystem)fs; + return myDfs.create(p, permission, flag, bufferSize, replication, + blockSize, progress, checksumOpt, favoredNodes); + } + throw new UnsupportedOperationException("Cannot create with" + + " favoredNodes through a symlink to a non-DistributedFileSystem: " + + f + " -> " + p); + } + }.resolve(this, absF); + } + @Override protected HdfsDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize, @@ -2584,4 +2627,42 @@ public class DistributedFileSystem extends FileSystem { DFSOpsCountStatistics getDFSOpsCountStatistics() { return storageStatistics; } + + /** + * Extends FSDataOutputStreamBuilder to support special requirements + * of DistributedFileSystem. + */ + public static class HdfsDataOutputStreamBuilder + extends FSDataOutputStreamBuilder { + private final DistributedFileSystem dfs; + private InetSocketAddress[] favoredNodes = null; + + public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { + super(dfs, path); + this.dfs = dfs; + } + + protected InetSocketAddress[] getFavoredNodes() { + return favoredNodes; + } + + public HdfsDataOutputStreamBuilder setFavoredNodes( + final InetSocketAddress[] nodes) { + Preconditions.checkNotNull(nodes); + favoredNodes = nodes.clone(); + return this; + } + + @Override + public HdfsDataOutputStream build() throws IOException { + return dfs.create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), + getProgress(), getChecksumOpt(), getFavoredNodes()); + } + } + + @Override + public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { + return new HdfsDataOutputStreamBuilder(this, path); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 7654531..e9af594 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -69,6 +69,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.StorageStatistics.LongStatistic; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; @@ -81,7 +82,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetUtils; @@ -1410,4 +1410,37 @@ public class TestDistributedFileSystem { } } } + + @Test + public void testDFSDataOutputStreamBuilder() throws Exception { + Configuration conf = getTestConfiguration(); + MiniDFSCluster cluster = null; + String testFile = "/testDFSDataOutputStreamBuilder"; + Path testFilePath = new Path(testFile); + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + + // Test create an empty file + FSDataOutputStream out = + fs.newFSDataOutputStreamBuilder(testFilePath).build(); + out.close(); + + // Test create a file with content, and verify the content + String content = "This is a test!"; + out = fs.newFSDataOutputStreamBuilder(testFilePath) + .setBufferSize(4096).setReplication((short) 1) + .setBlockSize(4096).build(); + byte[] contentOrigin = content.getBytes("UTF8"); + out.write(contentOrigin); + out.close(); + + ContractTestUtils.verifyFileContents(fs, testFilePath, + content.getBytes()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/332a997e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java index b78b6cc..50e56cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -189,6 +189,29 @@ public class TestFavoredNodesEndToEnd { } } + @Test(timeout = 180000) + public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception { + //create 10 files with random preferred nodes + for (int i = 0; i < NUM_FILES; i++) { + Random rand = new Random(System.currentTimeMillis() + i); + //pass a new created rand so as to get a uniform distribution each time + //without too much collisions (look at the do-while loop in getDatanodes) + InetSocketAddress[] dns = getDatanodes(rand); + Path p = new Path("/filename"+i); + FSDataOutputStream out = + dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build(); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = getBlockLocations(p); + //verify the files got created in the right nodes + for (BlockLocation loc : locations) { + String[] hosts = loc.getNames(); + String[] hosts1 = getStringForInetSocketAddrs(dns); + assertTrue(compareNodes(hosts, hosts1)); + } + } + } + private BlockLocation[] getBlockLocations(Path p) throws Exception { DFSTestUtil.waitReplication(dfs, p, (short)3); BlockLocation[] locations = dfs.getClient().getBlockLocations( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org