shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 4610e1d90273f46bc3b770de8c33b264660b17bc
Author: Xing Lin <xing...@linkedin.com>
AuthorDate: Sat Jul 31 12:56:05 2021 -0700

    HDFS-16128. [FGL] Added support for saving/loading an FS Image for
    PartitionedGSet. Contributed by Xing Lin. (#3201)
---
 .../org/apache/hadoop/util/PartitionedGSet.java    |  24 +++--
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |   4 +-
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |  70 ++++++++++++++
 .../hadoop/hdfs/server/namenode/FSImage.java       |  12 +++
 .../hdfs/server/namenode/FSImageFormatPBINode.java |  11 ++-
 .../hadoop/hdfs/server/namenode/INodeMap.java      | 105 ++++++++++++---------
 6 files changed, 168 insertions(+), 58 deletions(-)
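
For readers new to the FGL branch, the core problem this patch solves:
PartitionedGSet places an inode into a partition using a namespace key
derived from its parent, but while an FS image is being loaded the parent
pointers are not set until the INodeDirectorySection is processed. Inodes
are therefore buffered in a temporary HashMap keyed by inode id, parents
are linked, and only then is everything moved into the partitioned map.
A minimal sketch of the idea, with illustrative stand-in types (not the
Hadoop classes):

    // Stand-in for an inode: its partition key depends on the parent, so
    // the key is only meaningful after the directory section links parents.
    final class Node {
      final long id;
      Node parent;                       // set later by the directory section

      Node(long id) { this.id = id; }

      long[] namespaceKey() {            // analogous to getNamespaceKey(2)
        long parentId = (parent == null) ? 0L : parent.id;
        return new long[] { parentId, id };
      }
    }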

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index f3569cc..f493402 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -68,7 +68,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * Consists of a hash table {@link LightWeightGSet} and a lock, which
    * controls access to this partition independently on the other ones.
    */
-  private class PartitionEntry extends LightWeightGSet<K, E> {
+  public class PartitionEntry extends LightWeightGSet<K, E> {
     private final LatchLock<?> partLock;
 
     PartitionEntry(int defaultPartitionCapacity) {
@@ -121,7 +121,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     return size;
   }
 
-  protected PartitionEntry getPartition(final K key) {
+  public PartitionEntry getPartition(final K key) {
     Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
     if(partEntry == null) {
       return null;
@@ -174,6 +174,10 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     E result = part.put(element);
     if(result == null) {  // new element
       size++;
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, 
size);
+    } else {
+      LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
+          key, size);
     }
     return result;
   }
@@ -230,19 +234,25 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
       try {
         long[] key = (long[]) inodeClass.
             getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
-        long[] firstKey = new long[0];
+        long[] firstKey = new long[key.length];
         if(part.iterator().hasNext()) {
           Object first = part.iterator().next();
-          firstKey = (long[]) inodeClass.getMethod(
+          long[] firstKeyRef = (long[]) inodeClass.getMethod(
             "getNamespaceKey", int.class).invoke(first, 2);
           Object parent = inodeClass.
               getMethod("getParent").invoke(first);
           long parentId = (parent == null ? 0L :
             (long) inodeClass.getMethod("getId").invoke(parent));
+          for (int j = 0; j < key.length; j++) {
+            firstKey[j] = firstKeyRef[j];
+          }
           firstKey[0] = parentId;
         }
         LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
             i++, key, s, firstKey);  // SHV should be info
+      } catch (NoSuchElementException ex) {
+        LOG.error("iterator.next() threw NoSuchElementException.", ex);
+        throw ex;
       } catch (Exception ex) {
         LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
       }
@@ -250,8 +260,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     partSizeAvg = (int) (totalSize / parts.size());
     LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
         partSizeMin, partSizeAvg, partSizeMax, totalSize);
-    LOG.error("Number of partitions: empty = {}, full = {}",
-        numEmptyPartitions, numFullPartitions);
+    LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
+        numEmptyPartitions, parts.size() - numEmptyPartitions,
+        numFullPartitions);
   }
 
   @Override
@@ -277,6 +287,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     private Iterator<K> keyIterator;
     private Iterator<E> partitionIterator;
 
+    // Set partitionIterator to point to the first partition, or to null when
+    // no partitions have been created for this PartitionedGSet.
     public EntryIterator() {
       keyIterator = partitions.keySet().iterator();
  
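A note on the firstKey change in the hunk above: the old code allocated a
zero-length array and then executed firstKey[0] = parentId, which throws
ArrayIndexOutOfBoundsException for any non-empty partition. The new code
sizes the array to key.length and copies element by element; assuming the
two namespace keys always have equal length, the copy loop could
equivalently be written as (sketch only):

    long[] firstKey = java.util.Arrays.copyOf(firstKeyRef, key.length);
    firstKey[0] = parentId;
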
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 5a40906..ef08e9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -146,7 +147,8 @@ class FSDirMkdirOp {
         existing = createSingleDirectory(fsd, existing, component, perm);
         if(existing == null) {
           FSNamesystem.LOG.error("unprotectedMkdir returned null for "
-              + iip.getPath() + " for " + new String(component) + " i = " + i);
+              + iip.getPath() + " for "
+              + new String(component, StandardCharsets.US_ASCII) + " i = " + i);
           // Somebody already created the parent. Recalculate existing
          existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
           i = existing.length() - 1;
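
Context for the StandardCharsets change above: new String(byte[]) decodes
with the platform default charset, so the same on-disk path component could
render differently across JVMs; pinning US-ASCII makes the log line
deterministic. A tiny, self-contained illustration (hypothetical values):

    import java.nio.charset.StandardCharsets;

    class CharsetDemo {
      public static void main(String[] args) {
        byte[] component = "subdir".getBytes(StandardCharsets.US_ASCII);
        String platformDependent = new String(component);     // default charset
        String deterministic =
            new String(component, StandardCharsets.US_ASCII); // as in the patch
        System.out.println(platformDependent + " / " + deterministic);
      }
    }
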
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index f072d58..8ddf0ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -160,6 +165,8 @@ public class FSDirectory implements Closeable {
   private final int contentCountLimit; // max content summary counts per run
   private final long contentSleepMicroSec;
   private final INodeMap inodeMap; // Synchronized by dirLock
+  // Temporary inode map used while loading an FS image.
+  private HashMap<Long, INodeWithAdditionalFields> tempInodeMap;
   private long yieldCount = 0; // keep track of lock yield count.
   private int quotaInitThreads;
 
@@ -318,6 +325,11 @@ public class FSDirectory implements Closeable {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir, ns);
+    tempInodeMap = new HashMap<>(1000);
+
+    // add rootDir to tempInodeMap.
+    tempInodeMap.put(rootDir.getId(), rootDir);
+
     this.isPermissionEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@@ -1476,6 +1488,23 @@ public class FSDirectory implements Closeable {
     return inodeMap;
   }
 
+  final void addToTempInodeMap(INode inode) {
+    if (inode instanceof INodeWithAdditionalFields) {
+      LOG.debug("addToTempInodeMap: id={}, inodeMapTemp.size={}",
+          inode.getId(), tempInodeMap.size());
+      tempInodeMap.put(inode.getId(), (INodeWithAdditionalFields) inode);
+      if (!inode.isSymlink()) {
+        final XAttrFeature xaf = inode.getXAttrFeature();
+        addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
+          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        }
+      }
+    }
+  }
+
   /**
    * This method is always called with writeLock of FSDirectory held.
    */
@@ -1544,6 +1573,36 @@ public class FSDirectory implements Closeable {
   }
 
   /**
+   * After the parent of each inode has been set, move the inodes from
+   * tempInodeMap into inodeMap.
+   */
+  void moveInodes() throws IOException {
+    long count = 0, totalInodes = tempInodeMap.size();
+    LOG.debug("tempInodeMap={}", tempInodeMap);
+
+    for (Map.Entry<Long, INodeWithAdditionalFields> e
+        : tempInodeMap.entrySet()) {
+      INodeWithAdditionalFields n = e.getValue();
+
+      LOG.debug("populate {}-th inode: id={}, fullpath={}",
+          count, n.getId(), n.getFullPathName());
+
+      inodeMap.put(n);
+      count++;
+    }
+
+    if (count != totalInodes) {
+      String msg = String.format("moveInodes: expected to move %d inodes, " +
+          "but moved %d inodes", totalInodes, count);
+      throw new IOException(msg);
+    }
+
+    //inodeMap.show();
+    tempInodeMap.clear();
+    assert(tempInodeMap.isEmpty());
+    tempInodeMap = null;
+  }
+
+  /**
    * This method is always called with writeLock of FSDirectory held.
    */
   public final void removeFromInodeMap(List<? extends INode> inodes) {
@@ -1860,6 +1919,17 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  public INode getInode(INode inode) {
+    return inodeMap.get(inode);
+  }
+
+  public INode getInodeFromTempINodeMap(long id) {
+    LOG.debug("getInodeFromTempINodeMap: id={}, tempInodeMap.size={}",
+        id, tempInodeMap.size());
+    if (id < INodeId.ROOT_INODE_ID) {
+      return null;
+    }
+    return tempInodeMap.get(id);
+  }
+
   @VisibleForTesting
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
       UserGroupInformation ugi) throws AccessControlException {
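
The moveInodes() change in this file enforces a consistency contract: the
number of inodes moved must equal the number buffered, otherwise loading
fails with an IOException. A standalone model of that contract
(illustrative generic types, not the Hadoop classes):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    final class MoveCheckDemo {
      /** Move every buffered entry into dest, failing on any mismatch. */
      static <V> void moveAll(Map<Long, V> temp, Map<Long, V> dest)
          throws IOException {
        long count = 0, total = temp.size();
        for (Map.Entry<Long, V> e : temp.entrySet()) {
          dest.put(e.getKey(), e.getValue());
          count++;
        }
        if (count != total) {
          throw new IOException(String.format(
              "expected to move %d entries, but moved %d", total, count));
        }
        temp.clear();
      }

      public static void main(String[] args) throws IOException {
        Map<Long, String> temp = new HashMap<>();
        temp.put(16385L, "/");        // hypothetical inode ids
        temp.put(16386L, "/dir");
        Map<Long, String> inodeMap = new HashMap<>();
        moveAll(temp, inodeMap);
        System.out.println("moved " + inodeMap.size() + " entries");
      }
    }
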
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 1305438..0f0024e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -761,6 +761,16 @@ public class FSImage implements Closeable {
           "above for more info.");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
+
+    /*
+     * loadEdits always sets the parent of an inode before adding the inode to
+     * inodeMap. So it is safe to move inodes from tempInodeMap to inodeMap
+     * before loadEdits.
+     */
+    FSDirectory dir = target.getFSDirectory();
+    dir.moveInodes();
+    LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
     
     if (!rollingRollback) {
       prog.beginPhase(Phase.LOADING_EDITS);
@@ -776,6 +786,8 @@ public class FSImage implements Closeable {
       needToSave = false;
     }
     editLog.setNextTxId(lastAppliedTxId + 1);
+    LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
     return needToSave;
   }
 
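The comment in the first hunk states the key ordering invariant: moveInodes()
runs after the LOADING_FSIMAGE phase (the directory section has linked every
parent) and before LOADING_EDITS (edit replay inserts directly into
inodeMap). A hypothetical assertion one could place inside moveInodes() to
make that invariant explicit (not part of the patch):

    // Hypothetical guard: every buffered inode except the root must have a
    // parent by the time it is moved into the partitioned inodeMap.
    for (Map.Entry<Long, INodeWithAdditionalFields> e : tempInodeMap.entrySet()) {
      INodeWithAdditionalFields n = e.getValue();
      assert n.getId() == INodeId.ROOT_INODE_ID || n.getParent() != null
          : "inode " + n.getId() + " moved before its parent was set";
    }
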
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 0a69c99..fe37b82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
         if (e == null) {
           break;
         }
-        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        INodeDirectory p =
+            dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
-          INode child = dir.getInode(id);
+          INode child = dir.getInodeFromTempINodeMap(id);
           if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
@@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
         if (p == null) {
           break;
         }
+        LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
         if (p.getId() == INodeId.ROOT_INODE_ID) {
           synchronized(this) {
             loadRootINode(p);
@@ -389,7 +391,7 @@ public final class FSImageFormatPBINode {
         } else {
           INode n = loadINode(p);
           synchronized(this) {
-            dir.addToInodeMap(n);
+            dir.addToTempInodeMap(n);
           }
           fillUpInodeList(inodeList, n);
         }
@@ -761,7 +763,7 @@ public final class FSImageFormatPBINode {
               DirEntry.newBuilder().setParent(n.getId());
           for (INode inode : children) {
             // Error if the child inode doesn't exist in inodeMap
-            if (dir.getInode(inode.getId()) == null) {
+            if (dir.getInode(inode) == null) {
               FSImage.LOG.error(
                   "FSImageFormatPBINode#serializeINodeDirectorySection: " +
                       "Dangling child pointer found. Missing INode in " +
@@ -812,6 +814,7 @@ public final class FSImageFormatPBINode {
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
+        LOG.debug("i = {}, save inode: {}", i, n);
         save(out, n);
         ++i;
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index a0253b6..2814b9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,12 +36,12 @@ import org.apache.hadoop.util.PartitionedGSet;
  * and INode.  
  */
 public class INodeMap {
-  static final int NAMESPACE_KEY_DEBTH = 2;
+  static final int NAMESPACE_KEY_DEPTH = 2;
   static final int NUM_RANGES_STATIC = 256;  // power of 2
 
   public static class INodeKeyComparator implements Comparator<INode> {
     INodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -48,9 +49,9 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      for(int l = 0; l < NAMESPACE_KEY_DEBTH; l++) {
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
@@ -64,7 +65,7 @@ public class INodeMap {
    */
   public static class HPINodeKeyComparator implements Comparator<INode> {
     HPINodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -72,13 +73,13 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
       long key1_0 = INode.indexOf(key1);
       long key2_0 = INode.indexOf(key2);
       if(key1_0 != key2_0)
         return (key1_0 < key2_0 ? -1 : 1);
-      for(int l = 1; l < NAMESPACE_KEY_DEBTH; l++) {
+      for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
@@ -202,8 +203,8 @@ public class INodeMap {
     PermissionStatus perm = new PermissionStatus(
         "", "", new FsPermission((short) 0));
     for(int p = 0; p < NUM_RANGES_STATIC; p++) {
-      INodeDirectory key = new INodeDirectory(
-          INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0);
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
       key.setParent(new INodeDirectory((long)p, null, perm, 0));
       pgs.addNewPartition(key);
     }
@@ -244,48 +245,58 @@ public class INodeMap {
    *         such {@link INode} in the map.
    */
   public INode get(long id) {
-    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
-        "", "", new FsPermission((short) 0)), 0, 0) {
-      
-      @Override
-      void recordModification(int latestSnapshotId) {
-      }
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    /*
+     * Wrap the inode id in an INode object. Lookups compare inodes by id
+     * only, so any concrete INode type will do.
+     */
+    INode inode = new INodeDirectory(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0);
+
+    /*
+     * Fallback path: probe every static partition of the PartitionedGSet
+     * for the given inode id.
+     */
+    PermissionStatus perm =
+        new PermissionStatus("", "", new FsPermission((short) 0));
+    // TODO: cache the range keys in a static array instead of re-creating
+    // them on every call.
+    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      PartitionedGSet.PartitionEntry e = pgs.getPartition(key);
       
-      @Override
-      public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
-        // Nothing to do
+      if (e != null && e.contains(inode)) {
+        return (INode) e.get(inode);
       }
+    }
 
-      @Override
-      public QuotaCounts computeQuotaUsage(
-          BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
-          boolean useCache, int lastSnapshotId) {
-        return null;
-      }
+    return null;
+  }
 
-      @Override
-      public ContentSummaryComputationContext computeContentSummary(
-          int snapshotId, ContentSummaryComputationContext summary) {
-        return null;
-      }
-      
-      @Override
-      public void cleanSubtree(
-          ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
-      }
+  public INode get(INode inode) {
 
-      @Override
-      public byte getStoragePolicyID(){
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
+    /*
+     * Check whether the INode has (NAMESPACE_KEY_DEPTH - 1) levels of
+     * parent directories.
+     */
+    int i = NAMESPACE_KEY_DEPTH - 1;
+    INode tmpInode = inode;
+    while (i > 0 && tmpInode.getParent() != null) {
+      tmpInode = tmpInode.getParent();
+      i--;
+    }
 
-      @Override
-      public byte getLocalStoragePolicyID() {
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-    };
-      
-    return map.get(inode);
+    /*
+     * If the INode has (NAMESPACE_KEY_DEPTH - 1) levels of parents, use the
+     * keyed map.get(); otherwise fall back to the id-based lookup.
+     */
+    if (i == 0) {
+      return map.get(inode);
+    } else {
+      return get(inode.getId());
+    }
   }
   
   /**

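To summarize the new lookup strategy in INodeMap: map.get(inode) with the
full namespace key applies only when the inode already has
NAMESPACE_KEY_DEPTH - 1 linked ancestors; otherwise get(long) falls back to
probing all NUM_RANGES_STATIC partitions by id. A compact model of that
decision with stand-in types (not the Hadoop classes):

    final class LookupSketch {
      static final int NAMESPACE_KEY_DEPTH = 2;   // mirrors INodeMap

      static final class Node {
        Node parent;
        long id;
      }

      /** True when the node can compute its full namespace key. */
      static boolean hasFullKey(Node inode) {
        int i = NAMESPACE_KEY_DEPTH - 1;
        Node cur = inode;
        while (i > 0 && cur.parent != null) {
          cur = cur.parent;
          i--;
        }
        return i == 0;
      }
    }

When hasFullKey(...) holds, the keyed map.get(inode) is used; otherwise the
id-based partition scan is the only option, which is why the saver in
FSImageFormatPBINode now passes the whole child INode to dir.getInode(INode)
instead of just its id.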