HDFS-7081. Add new DistributedFileSystem API for getting all the existing storage policies. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/073bbd80 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/073bbd80 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/073bbd80 Branch: refs/heads/YARN-1051 Commit: 073bbd805c6680f47bbfcc6e8efd708ad729bca4 Parents: 7af4c38 Author: Jing Zhao <j...@hortonworks.com> Authored: Wed Sep 24 10:05:40 2014 -0700 Committer: Jing Zhao <j...@hortonworks.com> Committed: Wed Sep 24 10:05:40 2014 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../src/main/conf/blockStoragePolicy-site.xml | 21 - .../hadoop-hdfs/src/main/conf/hdfs-site.xml | 3 +- .../apache/hadoop/hdfs/BlockStoragePolicy.java | 419 ------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 8 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 - .../hadoop/hdfs/DistributedFileSystem.java | 7 + .../apache/hadoop/hdfs/client/HdfsAdmin.java | 11 + .../hdfs/protocol/BlockStoragePolicy.java | 244 +++++++++++ .../hadoop/hdfs/protocol/ClientProtocol.java | 7 + .../protocol/SnapshottableDirectoryStatus.java | 4 +- ...tNamenodeProtocolServerSideTranslatorPB.java | 23 + .../ClientNamenodeProtocolTranslatorPB.java | 25 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 65 ++- .../server/blockmanagement/BlockManager.java | 12 +- .../blockmanagement/BlockPlacementPolicy.java | 2 +- .../BlockPlacementPolicyDefault.java | 2 +- .../BlockStoragePolicySuite.java | 119 ++++++ .../apache/hadoop/hdfs/server/mover/Mover.java | 31 +- .../hdfs/server/namenode/FSDirectory.java | 18 +- .../hdfs/server/namenode/FSEditLogLoader.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 20 +- .../hadoop/hdfs/server/namenode/INode.java | 4 +- .../hdfs/server/namenode/INodeDirectory.java | 11 +- .../namenode/INodeDirectoryAttributes.java | 3 - .../hadoop/hdfs/server/namenode/INodeFile.java | 7 +- .../hadoop/hdfs/server/namenode/INodeMap.java | 7 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 6 + .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 20 +- .../org/apache/hadoop/hdfs/web/JsonUtil.java | 5 +- .../src/main/proto/ClientNamenodeProtocol.proto | 9 + .../hadoop-hdfs/src/main/proto/hdfs.proto | 14 + .../src/main/resources/hdfs-default.xml | 3 +- .../hadoop/hdfs/TestBlockStoragePolicy.java | 77 +++- .../hadoop/hdfs/TestStoragePolicyCommands.java | 11 +- .../hdfs/server/mover/TestStorageMover.java | 19 +- .../hdfs/server/namenode/TestDeleteRace.java | 2 +- 37 files changed, 717 insertions(+), 537 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4a52d44..43cebc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -345,6 +345,9 @@ Trunk (Unreleased) HDFS-7095. TestStorageMover often fails in Jenkins. (jing9) + HDFS-7081. Add new DistributedFileSystem API for getting all the existing + storage policies. (jing9) + Release 2.6.0 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml deleted file mode 100644 index 04142ad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/blockStoragePolicy-site.xml +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. ---> - -<!-- Put site-specific property overrides in this file. --> - -<configuration> - -</configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml index 3a0b0ed..50ec146 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hdfs-site.xml @@ -16,7 +16,6 @@ <!-- Put site-specific property overrides in this file. --> -<configuration xmlns:xi="http://www.w3.org/2001/XInclude"> - <xi:include href="blockStoragePolicy-site.xml" /> +<configuration> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java deleted file mode 100644 index efbf8a0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java +++ /dev/null @@ -1,419 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.util.Arrays; -import java.util.EnumSet; -import java.util.LinkedList; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.XAttr; -import org.apache.hadoop.fs.XAttr.NameSpace; - -/** - * A block storage policy describes how to select the storage types - * for the replicas of a block. - */ -@InterfaceAudience.Private -public class BlockStoragePolicy { - public static final Log LOG = LogFactory.getLog(BlockStoragePolicy.class); - - public static final String DFS_BLOCK_STORAGE_POLICIES_KEY - = "dfs.block.storage.policies"; - public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX - = "dfs.block.storage.policy."; - public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX - = "dfs.block.storage.policy.creation-fallback."; - public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX - = "dfs.block.storage.policy.replication-fallback."; - public static final String STORAGE_POLICY_XATTR_NAME = "bsp"; - /** set the namespace to TRUSTED so that only privilege users can access */ - public static final NameSpace XAttrNS = NameSpace.TRUSTED; - - public static final int ID_BIT_LENGTH = 4; - public static final int ID_MAX = (1 << ID_BIT_LENGTH) - 1; - public static final byte ID_UNSPECIFIED = 0; - - private static final Suite DEFAULT_SUITE = createDefaultSuite(); - - private static Suite createDefaultSuite() { - final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH]; - final StorageType[] storageTypes = {StorageType.DISK}; - final byte defaultPolicyId = 12; - policies[defaultPolicyId] = new BlockStoragePolicy(defaultPolicyId, "HOT", - storageTypes, StorageType.EMPTY_ARRAY, StorageType.EMPTY_ARRAY); - return new Suite(defaultPolicyId, policies); - } - - /** A block storage policy suite. */ - public static class Suite { - private final byte defaultPolicyID; - private final BlockStoragePolicy[] policies; - - private Suite(byte defaultPolicyID, BlockStoragePolicy[] policies) { - this.defaultPolicyID = defaultPolicyID; - this.policies = policies; - } - - /** @return the corresponding policy. */ - public BlockStoragePolicy getPolicy(byte id) { - // id == 0 means policy not specified. - return id == 0? getDefaultPolicy(): policies[id]; - } - - /** @return the default policy. */ - public BlockStoragePolicy getDefaultPolicy() { - return getPolicy(defaultPolicyID); - } - - public BlockStoragePolicy getPolicy(String policyName) { - if (policies != null) { - for (BlockStoragePolicy policy : policies) { - if (policy != null && policy.name.equals(policyName)) { - return policy; - } - } - } - return null; - } - } - - /** A 4-bit policy ID */ - private final byte id; - /** Policy name */ - private final String name; - - /** The storage types to store the replicas of a new block. */ - private final StorageType[] storageTypes; - /** The fallback storage type for block creation. */ - private final StorageType[] creationFallbacks; - /** The fallback storage type for replication. */ - private final StorageType[] replicationFallbacks; - - @VisibleForTesting - public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes, - StorageType[] creationFallbacks, StorageType[] replicationFallbacks) { - this.id = id; - this.name = name; - this.storageTypes = storageTypes; - this.creationFallbacks = creationFallbacks; - this.replicationFallbacks = replicationFallbacks; - } - - /** - * @return a list of {@link StorageType}s for storing the replicas of a block. - */ - public List<StorageType> chooseStorageTypes(final short replication) { - final List<StorageType> types = new LinkedList<StorageType>(); - int i = 0; - for(; i < replication && i < storageTypes.length; i++) { - types.add(storageTypes[i]); - } - final StorageType last = storageTypes[storageTypes.length - 1]; - for(; i < replication; i++) { - types.add(last); - } - return types; - } - - /** - * Choose the storage types for storing the remaining replicas, given the - * replication number and the storage types of the chosen replicas. - * - * @param replication the replication number. - * @param chosen the storage types of the chosen replicas. - * @return a list of {@link StorageType}s for storing the replicas of a block. - */ - public List<StorageType> chooseStorageTypes(final short replication, - final Iterable<StorageType> chosen) { - return chooseStorageTypes(replication, chosen, null); - } - - private List<StorageType> chooseStorageTypes(final short replication, - final Iterable<StorageType> chosen, final List<StorageType> excess) { - final List<StorageType> types = chooseStorageTypes(replication); - diff(types, chosen, excess); - return types; - } - - /** - * Choose the storage types for storing the remaining replicas, given the - * replication number, the storage types of the chosen replicas and - * the unavailable storage types. It uses fallback storage in case that - * the desired storage type is unavailable. - * - * @param replication the replication number. - * @param chosen the storage types of the chosen replicas. - * @param unavailables the unavailable storage types. - * @param isNewBlock Is it for new block creation? - * @return a list of {@link StorageType}s for storing the replicas of a block. - */ - public List<StorageType> chooseStorageTypes(final short replication, - final Iterable<StorageType> chosen, - final EnumSet<StorageType> unavailables, - final boolean isNewBlock) { - final List<StorageType> excess = new LinkedList<StorageType>(); - final List<StorageType> storageTypes = chooseStorageTypes( - replication, chosen, excess); - final int expectedSize = storageTypes.size() - excess.size(); - final List<StorageType> removed = new LinkedList<StorageType>(); - for(int i = storageTypes.size() - 1; i >= 0; i--) { - // replace/remove unavailable storage types. - final StorageType t = storageTypes.get(i); - if (unavailables.contains(t)) { - final StorageType fallback = isNewBlock? - getCreationFallback(unavailables) - : getReplicationFallback(unavailables); - if (fallback == null) { - removed.add(storageTypes.remove(i)); - } else { - storageTypes.set(i, fallback); - } - } - } - // remove excess storage types after fallback replacement. - diff(storageTypes, excess, null); - if (storageTypes.size() < expectedSize) { - LOG.warn("Failed to place enough replicas: expected size is " + expectedSize - + " but only " + storageTypes.size() + " storage types can be selected " - + "(replication=" + replication - + ", selected=" + storageTypes - + ", unavailable=" + unavailables - + ", removed=" + removed - + ", policy=" + this + ")"); - } - return storageTypes; - } - - /** - * Compute the list difference t = t - c. - * Further, if e is not null, set e = e + c - t; - */ - private static void diff(List<StorageType> t, Iterable<StorageType> c, - List<StorageType> e) { - for(StorageType storagetype : c) { - final int i = t.indexOf(storagetype); - if (i >= 0) { - t.remove(i); - } else if (e != null) { - e.add(storagetype); - } - } - } - - /** - * Choose excess storage types for deletion, given the - * replication number and the storage types of the chosen replicas. - * - * @param replication the replication number. - * @param chosen the storage types of the chosen replicas. - * @return a list of {@link StorageType}s for deletion. - */ - public List<StorageType> chooseExcess(final short replication, - final Iterable<StorageType> chosen) { - final List<StorageType> types = chooseStorageTypes(replication); - final List<StorageType> excess = new LinkedList<StorageType>(); - diff(types, chosen, excess); - return excess; - } - - /** @return the fallback {@link StorageType} for creation. */ - public StorageType getCreationFallback(EnumSet<StorageType> unavailables) { - return getFallback(unavailables, creationFallbacks); - } - - /** @return the fallback {@link StorageType} for replication. */ - public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) { - return getFallback(unavailables, replicationFallbacks); - } - - @Override - public int hashCode() { - return Byte.valueOf(id).hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } else if (obj == null || !(obj instanceof BlockStoragePolicy)) { - return false; - } - final BlockStoragePolicy that = (BlockStoragePolicy)obj; - return this.id == that.id; - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{" + name + ":" + id - + ", storageTypes=" + Arrays.asList(storageTypes) - + ", creationFallbacks=" + Arrays.asList(creationFallbacks) - + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks); - } - - public byte getId() { - return id; - } - - public String getName() { - return name; - } - - private static StorageType getFallback(EnumSet<StorageType> unavailables, - StorageType[] fallbacks) { - for(StorageType fb : fallbacks) { - if (!unavailables.contains(fb)) { - return fb; - } - } - return null; - } - - private static byte parseID(String idString, String element, Configuration conf) { - byte id = 0; - try { - id = Byte.parseByte(idString); - } catch(NumberFormatException nfe) { - throwIllegalArgumentException("Failed to parse policy ID \"" + idString - + "\" to a " + ID_BIT_LENGTH + "-bit integer", conf); - } - if (id < 0) { - throwIllegalArgumentException("Invalid policy ID: id = " + id - + " < 1 in \"" + element + "\"", conf); - } else if (id == 0) { - throw new IllegalArgumentException("Policy ID 0 is reserved: " + element); - } else if (id > ID_MAX) { - throwIllegalArgumentException("Invalid policy ID: id = " + id - + " > MAX = " + ID_MAX + " in \"" + element + "\"", conf); - } - return id; - } - - private static StorageType[] parseStorageTypes(String[] strings) { - if (strings == null || strings.length == 0) { - return StorageType.EMPTY_ARRAY; - } - final StorageType[] types = new StorageType[strings.length]; - for(int i = 0; i < types.length; i++) { - types[i] = StorageType.valueOf(strings[i].trim().toUpperCase()); - } - return types; - } - - private static StorageType[] readStorageTypes(byte id, String keyPrefix, - Configuration conf) { - final String key = keyPrefix + id; - final String[] values = conf.getStrings(key); - try { - return parseStorageTypes(values); - } catch(Exception e) { - throw new IllegalArgumentException("Failed to parse " + key - + " \"" + conf.get(key), e); - } - } - - private static BlockStoragePolicy readBlockStoragePolicy(byte id, String name, - Configuration conf) { - final StorageType[] storageTypes = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX, conf); - if (storageTypes.length == 0) { - throw new IllegalArgumentException( - DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX + id + " is missing or is empty."); - } - final StorageType[] creationFallbacks = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX, conf); - final StorageType[] replicationFallbacks = readStorageTypes(id, - DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX, conf); - return new BlockStoragePolicy(id, name, storageTypes, creationFallbacks, - replicationFallbacks); - } - - /** Read {@link Suite} from conf. */ - public static Suite readBlockStorageSuite(Configuration conf) { - final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH]; - final String[] values = conf.getStrings(DFS_BLOCK_STORAGE_POLICIES_KEY); - if (values == null) { - // conf property is missing, use default suite. - return DEFAULT_SUITE; - } - byte firstID = -1; - for(String v : values) { - v = v.trim(); - final int i = v.indexOf(':'); - if (i < 0) { - throwIllegalArgumentException("Failed to parse element \"" + v - + "\" (expected format is NAME:ID)", conf); - } else if (i == 0) { - throwIllegalArgumentException("Policy name is missing in \"" + v + "\"", conf); - } else if (i == v.length() - 1) { - throwIllegalArgumentException("Policy ID is missing in \"" + v + "\"", conf); - } - final String name = v.substring(0, i).trim(); - for(int j = 1; j < policies.length; j++) { - if (policies[j] != null && policies[j].name.equals(name)) { - throwIllegalArgumentException("Policy name duplication: \"" - + name + "\" appears more than once", conf); - } - } - - final byte id = parseID(v.substring(i + 1).trim(), v, conf); - if (policies[id] != null) { - throwIllegalArgumentException("Policy duplication: ID " + id - + " appears more than once", conf); - } - policies[id] = readBlockStoragePolicy(id, name, conf); - String prefix = ""; - if (firstID == -1) { - firstID = id; - prefix = "(default) "; - } - LOG.info(prefix + policies[id]); - } - if (firstID == -1) { - throwIllegalArgumentException("Empty list is not allowed", conf); - } - return new Suite(firstID, policies); - } - - public static String buildXAttrName() { - return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME; - } - - public static XAttr buildXAttr(byte policyId) { - final String name = buildXAttrName(); - return XAttrHelper.buildXAttr(name, new byte[] { policyId }); - } - - public static boolean isStoragePolicyXAttr(XAttr xattr) { - return xattr != null && xattr.getNameSpace() == BlockStoragePolicy.XAttrNS - && xattr.getName().equals(BlockStoragePolicy.STORAGE_POLICY_XATTR_NAME); - } - - private static void throwIllegalArgumentException(String message, - Configuration conf) { - throw new IllegalArgumentException(message + " in " - + DFS_BLOCK_STORAGE_POLICIES_KEY + " \"" - + conf.get(DFS_BLOCK_STORAGE_POLICIES_KEY) + "\"."); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 6f94370..03f5670 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -139,6 +139,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.AclException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator; @@ -1780,6 +1781,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** + * @return All the existing storage policies + */ + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + return namenode.getStoragePolicySuite(); + } + + /** * Rename file or directory. * @see ClientProtocol#rename(String, String) * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead. http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 3c5358f..3aa9acf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -438,14 +438,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final int DFS_REPLICATION_MAX_DEFAULT = 512; - public static final String DFS_BLOCK_STORAGE_POLICIES_KEY - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICIES_KEY; - public static final String DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_KEY_PREFIX; - public static final String DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_CREATION_FALLBACK_KEY_PREFIX; - public static final String DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX - = BlockStoragePolicy.DFS_BLOCK_STORAGE_POLICY_REPLICATION_FALLBACK_KEY_PREFIX; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final int DFS_DF_INTERVAL_DEFAULT = 60000; http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 6bce8b9..a9507f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -504,6 +505,12 @@ public class DistributedFileSystem extends FileSystem { }.resolve(this, absF); } + /** Get all the existing storage policies */ + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + statistics.incrementReadOps(1); + return dfs.getStoragePolicySuite(); + } + /** * Move blocks from srcs to trg and delete srcs afterwards. * The file block sizes must be the same. http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index fdc466a..6280d67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -325,4 +325,15 @@ public class HdfsAdmin { throws IOException { return dfs.getInotifyEventStream(lastReadTxid); } + + /** + * Set the source path to the specified storage policy. + * + * @param src The source path referring to either a directory or a file. + * @param policyName The name of the storage policy. + */ + public void setStoragePolicy(final Path src, final String policyName) + throws IOException { + dfs.setStoragePolicy(src, policyName); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java new file mode 100644 index 0000000..35bef51 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.StorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A block storage policy describes how to select the storage types + * for the replicas of a block. + */ +@InterfaceAudience.Private +public class BlockStoragePolicy { + public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy + .class); + + /** A 4-bit policy ID */ + private final byte id; + /** Policy name */ + private final String name; + + /** The storage types to store the replicas of a new block. */ + private final StorageType[] storageTypes; + /** The fallback storage type for block creation. */ + private final StorageType[] creationFallbacks; + /** The fallback storage type for replication. */ + private final StorageType[] replicationFallbacks; + + @VisibleForTesting + public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes, + StorageType[] creationFallbacks, StorageType[] replicationFallbacks) { + this.id = id; + this.name = name; + this.storageTypes = storageTypes; + this.creationFallbacks = creationFallbacks; + this.replicationFallbacks = replicationFallbacks; + } + + /** + * @return a list of {@link StorageType}s for storing the replicas of a block. + */ + public List<StorageType> chooseStorageTypes(final short replication) { + final List<StorageType> types = new LinkedList<StorageType>(); + int i = 0; + for(; i < replication && i < storageTypes.length; i++) { + types.add(storageTypes[i]); + } + final StorageType last = storageTypes[storageTypes.length - 1]; + for(; i < replication; i++) { + types.add(last); + } + return types; + } + + /** + * Choose the storage types for storing the remaining replicas, given the + * replication number and the storage types of the chosen replicas. + * + * @param replication the replication number. + * @param chosen the storage types of the chosen replicas. + * @return a list of {@link StorageType}s for storing the replicas of a block. + */ + public List<StorageType> chooseStorageTypes(final short replication, + final Iterable<StorageType> chosen) { + return chooseStorageTypes(replication, chosen, null); + } + + private List<StorageType> chooseStorageTypes(final short replication, + final Iterable<StorageType> chosen, final List<StorageType> excess) { + final List<StorageType> types = chooseStorageTypes(replication); + diff(types, chosen, excess); + return types; + } + + /** + * Choose the storage types for storing the remaining replicas, given the + * replication number, the storage types of the chosen replicas and + * the unavailable storage types. It uses fallback storage in case that + * the desired storage type is unavailable. + * + * @param replication the replication number. + * @param chosen the storage types of the chosen replicas. + * @param unavailables the unavailable storage types. + * @param isNewBlock Is it for new block creation? + * @return a list of {@link StorageType}s for storing the replicas of a block. + */ + public List<StorageType> chooseStorageTypes(final short replication, + final Iterable<StorageType> chosen, + final EnumSet<StorageType> unavailables, + final boolean isNewBlock) { + final List<StorageType> excess = new LinkedList<StorageType>(); + final List<StorageType> storageTypes = chooseStorageTypes( + replication, chosen, excess); + final int expectedSize = storageTypes.size() - excess.size(); + final List<StorageType> removed = new LinkedList<StorageType>(); + for(int i = storageTypes.size() - 1; i >= 0; i--) { + // replace/remove unavailable storage types. + final StorageType t = storageTypes.get(i); + if (unavailables.contains(t)) { + final StorageType fallback = isNewBlock? + getCreationFallback(unavailables) + : getReplicationFallback(unavailables); + if (fallback == null) { + removed.add(storageTypes.remove(i)); + } else { + storageTypes.set(i, fallback); + } + } + } + // remove excess storage types after fallback replacement. + diff(storageTypes, excess, null); + if (storageTypes.size() < expectedSize) { + LOG.warn("Failed to place enough replicas: expected size is " + expectedSize + + " but only " + storageTypes.size() + " storage types can be selected " + + "(replication=" + replication + + ", selected=" + storageTypes + + ", unavailable=" + unavailables + + ", removed=" + removed + + ", policy=" + this + ")"); + } + return storageTypes; + } + + /** + * Compute the difference between two lists t and c so that after the diff + * computation we have: t = t - c; + * Further, if e is not null, set e = e + c - t; + */ + private static void diff(List<StorageType> t, Iterable<StorageType> c, + List<StorageType> e) { + for(StorageType storagetype : c) { + final int i = t.indexOf(storagetype); + if (i >= 0) { + t.remove(i); + } else if (e != null) { + e.add(storagetype); + } + } + } + + /** + * Choose excess storage types for deletion, given the + * replication number and the storage types of the chosen replicas. + * + * @param replication the replication number. + * @param chosen the storage types of the chosen replicas. + * @return a list of {@link StorageType}s for deletion. + */ + public List<StorageType> chooseExcess(final short replication, + final Iterable<StorageType> chosen) { + final List<StorageType> types = chooseStorageTypes(replication); + final List<StorageType> excess = new LinkedList<StorageType>(); + diff(types, chosen, excess); + return excess; + } + + /** @return the fallback {@link StorageType} for creation. */ + public StorageType getCreationFallback(EnumSet<StorageType> unavailables) { + return getFallback(unavailables, creationFallbacks); + } + + /** @return the fallback {@link StorageType} for replication. */ + public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) { + return getFallback(unavailables, replicationFallbacks); + } + + @Override + public int hashCode() { + return Byte.valueOf(id).hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof BlockStoragePolicy)) { + return false; + } + final BlockStoragePolicy that = (BlockStoragePolicy)obj; + return this.id == that.id; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + name + ":" + id + + ", storageTypes=" + Arrays.asList(storageTypes) + + ", creationFallbacks=" + Arrays.asList(creationFallbacks) + + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks); + } + + public byte getId() { + return id; + } + + public String getName() { + return name; + } + + public StorageType[] getStorageTypes() { + return this.storageTypes; + } + + public StorageType[] getCreationFallbacks() { + return this.creationFallbacks; + } + + public StorageType[] getReplicationFallbacks() { + return this.replicationFallbacks; + } + + private static StorageType getFallback(EnumSet<StorageType> unavailables, + StorageType[] fallbacks) { + for(StorageType fb : fallbacks) { + if (!unavailables.contains(fb)) { + return fb; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 4be83f2..7e16feb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -260,6 +260,13 @@ public interface ClientProtocol { SnapshotAccessControlException, IOException; /** + * Get all the available block storage policies. + * @return All the in-use block storage policies currently. + */ + @Idempotent + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException; + + /** * Set the storage policy for a file/directory * @param src Path of an existing file/directory. * @param policyName The name of the storage policy http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java index 13acc7a..31feb1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java @@ -24,8 +24,8 @@ import java.util.Date; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; /** * Metadata about a snapshottable directory @@ -62,7 +62,7 @@ public class SnapshottableDirectoryStatus { int snapshotNumber, int snapshotQuota, byte[] parentFullPath) { this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time, access_time, permission, owner, group, null, localName, inodeId, - childrenNum, null, BlockStoragePolicy.ID_UNSPECIFIED); + childrenNum, null, BlockStoragePolicySuite.ID_UNSPECIFIED); this.snapshotNumber = snapshotNumber; this.snapshotQuota = snapshotQuota; this.parentFullPath = parentFullPath; http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 9d0d13c..26a9762 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -119,6 +120,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; @@ -1429,6 +1432,26 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements return VOID_SET_STORAGE_POLICY_RESPONSE; } + @Override + public GetStoragePolicySuiteResponseProto getStoragePolicySuite( + RpcController controller, GetStoragePolicySuiteRequestProto request) + throws ServiceException { + try { + BlockStoragePolicy[] policies = server.getStoragePolicySuite(); + GetStoragePolicySuiteResponseProto.Builder builder = + GetStoragePolicySuiteResponseProto.newBuilder(); + if (policies == null) { + return builder.build(); + } + for (BlockStoragePolicy policy : policies) { + builder.addPolicies(PBHelper.convert(policy)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller, GetCurrentEditLogTxidRequestProto req) throws ServiceException { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 1279f7c..22238b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -64,9 +65,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto; @@ -119,6 +118,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicySuiteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto; @@ -159,13 +160,13 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -225,6 +226,10 @@ public class ClientNamenodeProtocolTranslatorPB implements VOID_GET_DATA_ENCRYPTIONKEY_REQUEST = GetDataEncryptionKeyRequestProto.newBuilder().build(); + private final static GetStoragePolicySuiteRequestProto + VOID_GET_STORAGE_POLICY_SUITE_REQUEST = + GetStoragePolicySuiteRequestProto.newBuilder().build(); + public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) { rpcProxy = proxy; } @@ -1440,8 +1445,7 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public void setStoragePolicy(String src, String policyName) - throws SnapshotAccessControlException, UnresolvedLinkException, - FileNotFoundException, QuotaExceededException, IOException { + throws IOException { SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto .newBuilder().setSrc(src).setPolicyName(policyName).build(); try { @@ -1451,6 +1455,17 @@ public class ClientNamenodeProtocolTranslatorPB implements } } + @Override + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + try { + GetStoragePolicySuiteResponseProto response = rpcProxy + .getStoragePolicySuite(null, VOID_GET_STORAGE_POLICY_SUITE_REQUEST); + return PBHelper.convertStoragePolicies(response.getPoliciesList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + public long getCurrentEditLogTxid() throws IOException { GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto .getDefaultInstance(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c0b71eb..0408214 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.inotify.Event; @@ -120,6 +120,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterComm import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto; @@ -174,6 +175,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -280,6 +282,65 @@ public class PBHelper { return null; } + public static BlockStoragePolicy[] convertStoragePolicies( + List<BlockStoragePolicyProto> policyProtos) { + if (policyProtos == null || policyProtos.size() == 0) { + return new BlockStoragePolicy[0]; + } + BlockStoragePolicy[] policies = new BlockStoragePolicy[policyProtos.size()]; + int i = 0; + for (BlockStoragePolicyProto proto : policyProtos) { + policies[i++] = convert(proto); + } + return policies; + } + + public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { + List<StorageTypeProto> cList = proto.getCreationPolicy() + .getStorageTypesList(); + StorageType[] creationTypes = convertStorageTypes(cList, cList.size()); + List<StorageTypeProto> cfList = proto.hasCreationFallbackPolicy() ? proto + .getCreationFallbackPolicy().getStorageTypesList() : null; + StorageType[] creationFallbackTypes = cfList == null ? StorageType + .EMPTY_ARRAY : convertStorageTypes(cfList, cfList.size()); + List<StorageTypeProto> rfList = proto.hasReplicationFallbackPolicy() ? + proto.getReplicationFallbackPolicy().getStorageTypesList() : null; + StorageType[] replicationFallbackTypes = rfList == null ? StorageType + .EMPTY_ARRAY : convertStorageTypes(rfList, rfList.size()); + return new BlockStoragePolicy((byte) proto.getPolicyId(), proto.getName(), + creationTypes, creationFallbackTypes, replicationFallbackTypes); + } + + public static BlockStoragePolicyProto convert(BlockStoragePolicy policy) { + BlockStoragePolicyProto.Builder builder = BlockStoragePolicyProto + .newBuilder().setPolicyId(policy.getId()).setName(policy.getName()); + // creation storage types + StorageTypesProto creationProto = convert(policy.getStorageTypes()); + Preconditions.checkArgument(creationProto != null); + builder.setCreationPolicy(creationProto); + // creation fallback + StorageTypesProto creationFallbackProto = convert( + policy.getCreationFallbacks()); + if (creationFallbackProto != null) { + builder.setCreationFallbackPolicy(creationFallbackProto); + } + // replication fallback + StorageTypesProto replicationFallbackProto = convert( + policy.getReplicationFallbacks()); + if (replicationFallbackProto != null) { + builder.setReplicationFallbackPolicy(replicationFallbackProto); + } + return builder.build(); + } + + public static StorageTypesProto convert(StorageType[] types) { + if (types == null || types.length == 0) { + return null; + } + List<StorageTypeProto> list = convertStorageTypes(types); + return StorageTypesProto.newBuilder().addAllStorageTypes(list).build(); + } + public static StorageInfoProto convert(StorageInfo info) { return StorageInfoProto.newBuilder().setClusterID(info.getClusterID()) .setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion()) @@ -1349,7 +1410,7 @@ public class PBHelper { fs.hasChildrenNum() ? fs.getChildrenNum() : -1, fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null, fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy() - : BlockStoragePolicy.ID_UNSPECIFIED); + : BlockStoragePolicySuite.ID_UNSPECIFIED); } public static SnapshottableDirectoryStatus convert( http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index cb303a7..4cdec30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -42,7 +42,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -255,7 +255,7 @@ public class BlockManager { /** for block replicas placement */ private BlockPlacementPolicy blockplacement; - private final BlockStoragePolicy.Suite storagePolicySuite; + private final BlockStoragePolicySuite storagePolicySuite; /** Check whether name system is running before terminating */ private boolean checkNSRunning = true; @@ -278,7 +278,7 @@ public class BlockManager { blockplacement = BlockPlacementPolicy.getInstance( conf, stats, datanodeManager.getNetworkTopology(), datanodeManager.getHost2DatanodeMap()); - storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf); + storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(); pendingReplications = new PendingReplicationBlocks(conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); @@ -402,6 +402,10 @@ public class BlockManager { return storagePolicySuite.getPolicy(policyName); } + public BlockStoragePolicy[] getStoragePolicySuite() { + return storagePolicySuite.getAllPolicies(); + } + public void setBlockPoolId(String blockPoolId) { if (isBlockTokenEnabled()) { blockTokenSecretManager.setBlockPoolId(blockPoolId); @@ -3599,7 +3603,7 @@ public class BlockManager { } private void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicy.Suite storagePolicySuite, + BlockStoragePolicySuite storagePolicySuite, Set<Node> excludedNodes) { targets = blockplacement.chooseTarget(bc.getName(), additionalReplRequired, srcNode, liveReplicaStorages, false, http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 00f7253..26a55a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -27,7 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index a0e6701..19c3075 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -23,7 +23,7 @@ import java.util.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java new file mode 100644 index 0000000..1d162a0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.XAttrHelper; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** A collection of block storage policies. */ +public class BlockStoragePolicySuite { + static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicySuite + .class); + + public static final String STORAGE_POLICY_XATTR_NAME + = "hsm.block.storage.policy.id"; + public static final XAttr.NameSpace XAttrNS = XAttr.NameSpace.SYSTEM; + + public static final int ID_BIT_LENGTH = 4; + public static final byte ID_UNSPECIFIED = 0; + + @VisibleForTesting + public static BlockStoragePolicySuite createDefaultSuite() { + final BlockStoragePolicy[] policies = + new BlockStoragePolicy[1 << ID_BIT_LENGTH]; + final byte hotId = 12; + policies[hotId] = new BlockStoragePolicy(hotId, "HOT", + new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY, + new StorageType[]{StorageType.ARCHIVE}); + final byte warmId = 8; + policies[warmId] = new BlockStoragePolicy(warmId, "WARM", + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}); + final byte coldId = 4; + policies[coldId] = new BlockStoragePolicy(coldId, "COLD", + new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY, + StorageType.EMPTY_ARRAY); + return new BlockStoragePolicySuite(hotId, policies); + } + + private final byte defaultPolicyID; + private final BlockStoragePolicy[] policies; + + public BlockStoragePolicySuite(byte defaultPolicyID, + BlockStoragePolicy[] policies) { + this.defaultPolicyID = defaultPolicyID; + this.policies = policies; + } + + /** @return the corresponding policy. */ + public BlockStoragePolicy getPolicy(byte id) { + // id == 0 means policy not specified. + return id == 0? getDefaultPolicy(): policies[id]; + } + + /** @return the default policy. */ + public BlockStoragePolicy getDefaultPolicy() { + return getPolicy(defaultPolicyID); + } + + public BlockStoragePolicy getPolicy(String policyName) { + if (policies != null) { + for (BlockStoragePolicy policy : policies) { + if (policy != null && policy.getName().equals(policyName)) { + return policy; + } + } + } + return null; + } + + public BlockStoragePolicy[] getAllPolicies() { + List<BlockStoragePolicy> list = Lists.newArrayList(); + if (policies != null) { + for (BlockStoragePolicy policy : policies) { + if (policy != null) { + list.add(policy); + } + } + } + return list.toArray(new BlockStoragePolicy[list.size()]); + } + + public static String buildXAttrName() { + return XAttrNS.toString().toLowerCase() + "." + STORAGE_POLICY_XATTR_NAME; + } + + public static XAttr buildXAttr(byte policyId) { + final String name = buildXAttrName(); + return XAttrHelper.buildXAttr(name, new byte[]{policyId}); + } + + public static boolean isStoragePolicyXAttr(XAttr xattr) { + return xattr != null && xattr.getNameSpace() == XAttrNS + && xattr.getName().equals(STORAGE_POLICY_XATTR_NAME); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 858db1d..c222181 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.Matcher; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -102,7 +103,7 @@ public class Mover { private final StorageMap storages; private final List<Path> targetPaths; - private final BlockStoragePolicy.Suite blockStoragePolicies; + private final BlockStoragePolicy[] blockStoragePolicies; Mover(NameNodeConnector nnc, Configuration conf) { final long movedWinWidth = conf.getLong( @@ -119,11 +120,13 @@ public class Mover { Collections.<String> emptySet(), movedWinWidth, moverThreads, 0, maxConcurrentMovesPerNode, conf); this.storages = new StorageMap(); - this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf); this.targetPaths = nnc.getTargetPaths(); + this.blockStoragePolicies = new BlockStoragePolicy[1 << + BlockStoragePolicySuite.ID_BIT_LENGTH]; } void init() throws IOException { + initStoragePolicies(); final List<DatanodeStorageReport> reports = dispatcher.init(); for(DatanodeStorageReport r : reports) { final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); @@ -137,6 +140,14 @@ public class Mover { } } + private void initStoragePolicies() throws IOException { + BlockStoragePolicy[] policies = dispatcher.getDistributedFileSystem() + .getStoragePolicySuite(); + for (BlockStoragePolicy policy : policies) { + this.blockStoragePolicies[policy.getId()] = policy; + } + } + private ExitStatus run() { try { init(); @@ -305,7 +316,7 @@ public class Mover { if (!isSnapshotPathInCurrent(fullPath)) { // the full path is a snapshot path but it is also included in the // current directory tree, thus ignore it. - hasRemaining = processFile((HdfsLocatedFileStatus)status); + hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status); } } catch (IOException e) { LOG.warn("Failed to check the status of " + parent @@ -317,9 +328,17 @@ public class Mover { } /** @return true if it is necessary to run another round of migration */ - private boolean processFile(HdfsLocatedFileStatus status) { - final BlockStoragePolicy policy = blockStoragePolicies.getPolicy( - status.getStoragePolicy()); + private boolean processFile(String fullPath, HdfsLocatedFileStatus status) { + final byte policyId = status.getStoragePolicy(); + // currently we ignore files with unspecified storage policy + if (policyId == BlockStoragePolicySuite.ID_UNSPECIFIED) { + return false; + } + final BlockStoragePolicy policy = blockStoragePolicies[policyId]; + if (policy == null) { + LOG.warn("Failed to get the storage policy of file " + fullPath); + return false; + } final List<StorageType> types = policy.chooseStorageTypes( status.getReplication()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 961808e..94d7fcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -53,7 +53,6 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -79,6 +78,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -1037,7 +1037,7 @@ public class FSDirectory implements Closeable { private void setDirStoragePolicy(INodeDirectory inode, byte policyId, int latestSnapshotId) throws IOException { List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode); - XAttr xAttr = BlockStoragePolicy.buildXAttr(policyId); + XAttr xAttr = BlockStoragePolicySuite.buildXAttr(policyId); List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, Arrays.asList(xAttr), EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, latestSnapshotId); @@ -1375,7 +1375,7 @@ public class FSDirectory implements Closeable { } private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { - return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy : + return inodePolicy != BlockStoragePolicySuite.ID_UNSPECIFIED ? inodePolicy : parentPolicy; } @@ -1410,7 +1410,7 @@ public class FSDirectory implements Closeable { if (targetNode == null) return null; byte parentStoragePolicy = isSuperUser ? - targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED; + targetNode.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; if (!targetNode.isDirectory()) { return new DirectoryListing( @@ -1430,7 +1430,8 @@ public class FSDirectory implements Closeable { for (int i=0; i<numOfListing && locationBudget>0; i++) { INode cur = contents.get(startChild+i); byte curPolicy = isSuperUser && !cur.isSymlink()? - cur.getLocalStoragePolicyID(): BlockStoragePolicy.ID_UNSPECIFIED; + cur.getLocalStoragePolicyID(): + BlockStoragePolicySuite.ID_UNSPECIFIED; listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation, getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, isRawPath, inodesInPath); @@ -1484,7 +1485,7 @@ public class FSDirectory implements Closeable { for (int i = 0; i < numOfListing; i++) { Root sRoot = snapshots.get(i + skipSize).getRoot(); listing[i] = createFileStatus(sRoot.getLocalNameBytes(), sRoot, - BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, + BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, null); } return new DirectoryListing( @@ -1512,7 +1513,7 @@ public class FSDirectory implements Closeable { final INode[] inodes = inodesInPath.getINodes(); final INode i = inodes[inodes.length - 1]; byte policyId = includeStoragePolicy && i != null && !i.isSymlink() ? - i.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED; + i.getStoragePolicyID() : BlockStoragePolicySuite.ID_UNSPECIFIED; return i == null ? null : createFileStatus(HdfsFileStatus.EMPTY_NAME, i, policyId, inodesInPath.getPathSnapshotId(), isRawPath, inodesInPath); @@ -1532,7 +1533,8 @@ public class FSDirectory implements Closeable { throws UnresolvedLinkException { if (getINode4DotSnapshot(src) != null) { return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null, - HdfsFileStatus.EMPTY_NAME, -1L, 0, null, BlockStoragePolicy.ID_UNSPECIFIED); + HdfsFileStatus.EMPTY_NAME, -1L, 0, null, + BlockStoragePolicySuite.ID_UNSPECIFIED); } return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index cc0572e..d13199e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -33,7 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.XAttrSetFlag; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -373,7 +373,7 @@ public class FSEditLogLoader { if (toAddRetryCache) { HdfsFileStatus stat = fsNamesys.dir.createFileStatus( HdfsFileStatus.EMPTY_NAME, newFile, - BlockStoragePolicy.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, + BlockStoragePolicySuite.ID_UNSPECIFIED, Snapshot.CURRENT_STATE_ID, false, iip); fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId, addCloseOp.rpcCallId, stat); http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 876cf49..9ee6448 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension - .EncryptedKeyVersion; +import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; @@ -160,7 +159,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.ServiceFailedException; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; @@ -2302,6 +2301,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats, logAuditEvent(true, "setStoragePolicy", src, null, fileStat); } + /** + * @return All the existing block storage policies + */ + BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + checkOperation(OperationCategory.READ); + waitForLoadingFSImage(); + readLock(); + try { + checkOperation(OperationCategory.READ); + return blockManager.getStoragePolicySuite(); + } finally { + readUnlock(); + } + } + long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { FSPermissionChecker pc = getPermissionChecker(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 307f507..4454930 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @@ -695,7 +695,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { /** * @return the storage policy directly specified on the INode. Return - * {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has + * {@link BlockStoragePolicySuite#ID_UNSPECIFIED} if no policy has * been specified. */ public abstract byte getLocalStoragePolicyID(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java index f5579ee..a753230 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java @@ -28,10 +28,11 @@ import java.util.Map; import org.apache.hadoop.fs.PathIsNotDirectoryException; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature; @@ -112,22 +113,22 @@ public class INodeDirectory extends INodeWithAdditionalFields ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f .getXAttrs(); for (XAttr xattr : xattrs) { - if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) { + if (BlockStoragePolicySuite.isStoragePolicyXAttr(xattr)) { return (xattr.getValue())[0]; } } - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } @Override public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); - if (id != BlockStoragePolicy.ID_UNSPECIFIED) { + if (id != BlockStoragePolicySuite.ID_UNSPECIFIED) { return id; } // if it is unspecified, check its parent return getParent() != null ? getParent().getStoragePolicyID() : - BlockStoragePolicy.ID_UNSPECIFIED; + BlockStoragePolicySuite.ID_UNSPECIFIED; } void setQuota(long nsQuota, long dsQuota) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java index f0f58a9..26a6678 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryAttributes.java @@ -18,12 +18,9 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; /** * The attributes of an inode. http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 7af2b71..583c193 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -28,12 +28,12 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff; @@ -79,7 +79,8 @@ public class INodeFile extends INodeWithAdditionalFields static enum HeaderFormat { PREFERRED_BLOCK_SIZE(null, 48, 1), REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1), - STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicy.ID_BIT_LENGTH, 0); + STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH, + 0); private final LongBitFormat BITS; @@ -374,7 +375,7 @@ public class INodeFile extends INodeWithAdditionalFields @Override public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); - if (id == BlockStoragePolicy.ID_UNSPECIFIED) { + if (id == BlockStoragePolicySuite.ID_UNSPECIFIED) { return this.getParent() != null ? this.getParent().getStoragePolicyID() : id; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java index 87e4715..5009c58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.namenode.Quota.Counts; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; @@ -125,12 +126,12 @@ public class INodeMap { @Override public byte getStoragePolicyID(){ - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } @Override public byte getLocalStoragePolicyID() { - return BlockStoragePolicy.ID_UNSPECIFIED; + return BlockStoragePolicySuite.ID_UNSPECIFIED; } }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/073bbd80/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 2a05fcb..b05550d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@ -592,6 +593,11 @@ class NameNodeRpcServer implements NamenodeProtocols { namesystem.setStoragePolicy(src, policyName); } + @Override + public BlockStoragePolicy[] getStoragePolicySuite() throws IOException { + return namesystem.getStoragePolicySuite(); + } + @Override // ClientProtocol public void setPermission(String src, FsPermission permissions) throws IOException {