This is an automated email from the ASF dual-hosted git repository. shv pushed a commit to branch fgl in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 455e8c019184d5d3ae7bcff4d29d9baa7aff3663 Author: Konstantin V Shvachko <s...@apache.org> AuthorDate: Fri May 7 17:51:58 2021 -0700 Add namespace key for INode. (shv) --- .../org/apache/hadoop/util/PartitionedGSet.java | 80 ++++++++++++++++++---- .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java | 3 + .../apache/hadoop/hdfs/server/namenode/INode.java | 40 ++++++++++- .../hadoop/hdfs/server/namenode/INodeMap.java | 71 +++++++++++++++++-- 4 files changed, 176 insertions(+), 18 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java index 4b0cdc9..7ebb1b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -44,7 +45,8 @@ import org.apache.hadoop.util.LightWeightGSet.LinkedElement; @InterfaceAudience.Private public class PartitionedGSet<K, E extends K> implements GSet<K, E> { - private static final int DEFAULT_PARTITION_CAPACITY = 2027; + private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027; + private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f; /** * An ordered map of contiguous segments of elements. @@ -81,8 +83,11 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { final E rootKey) { this.partitions = new TreeMap<K, PartitionEntry>(comparator); this.latchLock = latchLock; - addNewPartition(rootKey).put(rootKey); - this.size = 1; + // addNewPartition(rootKey).put(rootKey); + // this.size = 1; + this.size = 0; + LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY); + LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW); } /** @@ -90,16 +95,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { * @param key * @return */ - private PartitionEntry addNewPartition(final K key) { + public PartitionEntry addNewPartition(final K key) { + Entry<K, PartitionEntry> lastEntry = partitions.lastEntry(); PartitionEntry lastPart = null; - if(size > 0) - lastPart = partitions.lastEntry().getValue(); + if(lastEntry != null) + lastPart = lastEntry.getValue(); PartitionEntry newPart = new PartitionEntry(DEFAULT_PARTITION_CAPACITY); // assert size == 0 || newPart.partLock.isWriteTopLocked() : // "Must hold write Lock: key = " + key; - partitions.put(key, newPart); + PartitionEntry oldPart = partitions.put(key, newPart); + assert oldPart == null : + "RangeMap already has a partition associated with " + key; LOG.debug("Total GSet size = {}", size); LOG.debug("Number of partitions = {}", partitions.size()); @@ -173,7 +181,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { private PartitionEntry addNewPartitionIfNeeded( PartitionEntry curPart, K key) { - if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1 + if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW || curPart.contains(key)) { return curPart; } @@ -197,12 +205,56 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { public void clear() { LOG.error("Total GSet size = {}", size); LOG.error("Number of partitions = {}", partitions.size()); + printStats(); // assert latchLock.hasWriteTopLock() : "Must hold write topLock"; // SHV May need to clear all partitions? partitions.clear(); size = 0; } + private void printStats() { + int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0; + long totalSize = 0; + int numEmptyPartitions = 0, numFullPartitions = 0; + Collection<PartitionEntry> parts = partitions.values(); + Set<Entry<K, PartitionEntry>> entries = partitions.entrySet(); + int i = 0; + for(Entry<K, PartitionEntry> e : entries) { + PartitionEntry part = e.getValue(); + int s = part.size; + if(s == 0) numEmptyPartitions++; + if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++; + totalSize += s; + partSizeMin = (s < partSizeMin ? s : partSizeMin); + partSizeMax = (partSizeMax < s ? s : partSizeMax); + Class<?> inodeClass = e.getKey().getClass(); + try { + long[] key = (long[]) inodeClass. + getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2); + long[] firstKey = new long[0]; + if(part.iterator().hasNext()) { + Object first = part.iterator().next(); + firstKey = (long[]) inodeClass.getMethod( + "getNamespaceKey", int.class).invoke(first, 2); + Object parent = inodeClass. + getMethod("getParent").invoke(first); + long parentId = (parent == null ? 0L : + (long) inodeClass.getMethod("getId").invoke(parent)); + firstKey[0] = parentId; + } + LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}", + i++, key, s, firstKey); // SHV should be info + } catch (Exception ex) { + LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass); + } + } + partSizeAvg = (int) (totalSize / parts.size()); + LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}", + partSizeMin, partSizeAvg, partSizeMax, totalSize); + LOG.error("Number of partitions: empty = {}, full = {}", + numEmptyPartitions, numFullPartitions); + } + @Override public Collection<E> values() { // TODO Auto-generated method stub @@ -234,15 +286,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> { @Override public boolean hasNext() { - if(partitionIterator.hasNext()) { - return true; + while(!partitionIterator.hasNext()) { + if(!keyIterator.hasNext()) { + return false; + } + K curKey = keyIterator.next(); + partitionIterator = getPartition(curKey).iterator(); } - return keyIterator.hasNext(); + return partitionIterator.hasNext(); } @Override public E next() { - if(!partitionIterator.hasNext()) { + while(!partitionIterator.hasNext()) { K curKey = keyIterator.next(); partitionIterator = getPartition(curKey).iterator(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java index c8c6277..5a40906 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java @@ -265,10 +265,13 @@ class FSDirMkdirOp { // create the missing directories along the path INode[] missing = new INode[numMissing]; final int last = iip.length(); + INode parent = existing.getLastINode(); for (int i = existing.length(); i < last; i++) { byte[] component = iip.getPathComponent(i); missing[i - existing.length()] = createDirectoryINode(fsd, existing, component, perm); + missing[i - existing.length()].setParent(parent.asDirectory()); + parent = missing[i - existing.length()]; } return missing; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index daff95c..42e462e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -577,6 +578,43 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { return name == null? null: DFSUtil.bytes2String(name); } + private long[] namespaceKey; + + /** + * Key of an INode. + * Defines partitioning of INodes in the INodeMap. + * + * @param level how many levels to be included in the key + * @return + */ + public long[] getNamespaceKey(int level) { + if(namespaceKey == null) { // generate the namespace key + long[] buf = new long[level]; + INode cur = this; + for(int l = 0; l < level; l++) { + long curId = (cur == null) ? 0L : cur.getId(); + buf[level - l - 1] = curId; + cur = (cur == null) ? null : cur.parent; + } + buf[0] = indexOf(buf); + namespaceKey = buf; + } + return namespaceKey; + } + + private final static long LARGE_PRIME = 512927357; + public static long indexOf(long[] key) { + if(key[key.length-1] == INodeId.ROOT_INODE_ID) { + return key[0]; + } + long idx = LARGE_PRIME * key[0]; + idx = (idx ^ (idx >> 32)) & (INodeMap.NUM_RANGES_STATIC -1); + return idx; + } + + /** + * Key of a snapshot Diff Element + */ @Override public final byte[] getKey() { return getLocalNameBytes(); @@ -636,7 +674,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { @Override public String toString() { - return getLocalName(); + return getLocalName() + ": " + Arrays.toString(namespaceKey); } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java index 88c3233..3b07dce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java @@ -29,14 +29,63 @@ import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LatchLock; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.PartitionedGSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Storing all the {@link INode}s and maintaining the mapping between INode ID * and INode. */ public class INodeMap { + static final int NAMESPACE_KEY_DEBTH = 2; + static final int NUM_RANGES_STATIC = 256; // power of 2 + + public static class INodeKeyComparator implements Comparator<INode> { + INodeKeyComparator() { + FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH); + } + + @Override + public int compare(INode i1, INode i2) { + if (i1 == null || i2 == null) { + throw new NullPointerException("Cannot compare null INodes"); + } + long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH); + long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH); + for(int l = 0; l < NAMESPACE_KEY_DEBTH; l++) { + if(key1[l] == key2[l]) continue; + return (key1[l] < key2[l] ? -1 : 1); + } + return 0; + } + } + + /** + * INodeKeyComparator with Hashed Parent + * + */ + public static class HPINodeKeyComparator implements Comparator<INode> { + HPINodeKeyComparator() { + FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH); + } + + @Override + public int compare(INode i1, INode i2) { + if (i1 == null || i2 == null) { + throw new NullPointerException("Cannot compare null INodes"); + } + long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH); + long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH); + long key1_0 = INode.indexOf(key1); + long key2_0 = INode.indexOf(key2); + if(key1_0 != key2_0) + return (key1_0 < key2_0 ? -1 : 1); + for(int l = 1; l < NAMESPACE_KEY_DEBTH; l++) { + if(key1[l] == key2[l]) continue; + return (key1[l] < key2[l] ? -1 : 1); + } + return 0; + } + } + public static class INodeIdComparator implements Comparator<INode> { @Override public int compare(INode i1, INode i2) { @@ -50,8 +99,6 @@ public class INodeMap { } public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> { - Logger LOG = LoggerFactory.getLogger(INodeMapLock.class); - private ReentrantReadWriteLock childLock; INodeMapLock() { @@ -146,8 +193,22 @@ public class INodeMap { this.namesystem = ns; // Compute the map capacity by allocating 1% of total memory int capacity = LightWeightGSet.computeCapacity(1, "INodeMap"); - this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(), + this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(), new INodeMapLock(), rootDir); + + // Pre-populate initial empty partitions + PartitionedGSet<INode, INodeWithAdditionalFields> pgs = + (PartitionedGSet<INode, INodeWithAdditionalFields>) map; + PermissionStatus perm = new PermissionStatus( + "", "", new FsPermission((short) 0)); + for(int p = 0; p < NUM_RANGES_STATIC; p++) { + INodeDirectory key = new INodeDirectory( + INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0); + key.setParent(new INodeDirectory((long)p, null, perm, 0)); + pgs.addNewPartition(key); + } + + map.put(rootDir); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org