[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-08-15 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r314210188
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -197,16 +203,66 @@ public static void updateBlocksMap(INodeFile file, 
BlockManager bm) {
 private final FSDirectory dir;
 private final FSNamesystem fsn;
 private final FSImageFormatProtobuf.Loader parent;
+private ReentrantLock cacheNameMapLock;
+private ReentrantLock blockMapLock;
 
 Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
   this.fsn = fsn;
   this.dir = fsn.dir;
   this.parent = parent;
+  cacheNameMapLock = new ReentrantLock(true);
+  blockMapLock = new ReentrantLock(true);
+}
+
+void loadINodeDirectorySectionInParallel(ExecutorService service,
+ArrayList<FileSummary.Section> sections, String compressionCodec)
+throws IOException {
+  LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
+  "sections", sections.size());
+  CountDownLatch latch = new CountDownLatch(sections.size());
+  final CopyOnWriteArrayList<IOException> exceptions =
+  new CopyOnWriteArrayList<>();
+  for (FileSummary.Section s : sections) {
+service.submit(new Runnable() {
+  public void run() {
+InputStream ins = null;
+try {
+  ins = parent.getInputStreamForSection(s,
+  compressionCodec);
+  loadINodeDirectorySection(ins);
+} catch (Exception e) {
+  LOG.error("An exception occurred loading INodeDirectories in " +
+  "parallel", e);
+  exceptions.add(new IOException(e));
+} finally {
+  latch.countDown();
+  try {
+ins.close();
+  } catch (IOException ioe) {
+LOG.warn("Failed to close the input stream, ignoring", ioe);
+  }
+}
+  }
+});
+  }
+  try {
+latch.await();
+  } catch (InterruptedException e) {
+LOG.error("Interrupted waiting for countdown latch", e);
+throw new IOException(e);
+  }
+  if (exceptions.size() != 0) {
+LOG.error("{} exceptions occurred loading INodeDirectories",
+exceptions.size());
+throw exceptions.get(0);
+  }
+  LOG.info("Completed loading all INodeDirectory sub-sections");
 }
 
 void loadINodeDirectorySection(InputStream in) throws IOException {
  final List<INodeReference> refList = parent.getLoaderContext()
  .getRefList();
+  ArrayList<INode> inodeList = new ArrayList<>();
 
 Review comment:
   @jojochuang Thanks for your feedback; it is true that performance comes first. I am just concerned about the memory overhead, since the cache hit ratio may decrease with the batched caching approach. Of course, it is not a serious issue.



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-08-15 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r314201460
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws 
IOException {
 INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
 for (long id : e.getChildrenList()) {
   INode child = dir.getInode(id);
-  addToParent(p, child);
+  if (addToParent(p, child)) {
+if (child.isFile()) {
+  inodeList.add(child);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
+
 }
+
 for (int refId : e.getRefChildrenList()) {
   INodeReference ref = refList.get(refId);
-  addToParent(p, ref);
+  if (addToParent(p, ref)) {
+if (ref.isFile()) {
+  inodeList.add(ref);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
 }
   }
+  addToCacheAndBlockMap(inodeList);
+}
+
+private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+  try {
+cacheNameMapLock.lock();
+for (INode i : inodeList) {
+  dir.cacheName(i);
+}
+  } finally {
+cacheNameMapLock.unlock();
+  }
+
+  try {
+blockMapLock.lock();
+for (INode i : inodeList) {
+  updateBlocksMap(i.asFile(), fsn.getBlockManager());
+}
+  } finally {
+blockMapLock.unlock();
+  }
 }
 
 void loadINodeSection(InputStream in, StartupProgress prog,
 Step currentStep) throws IOException {
-  INodeSection s = INodeSection.parseDelimitedFrom(in);
-  fsn.dir.resetLastInodeId(s.getLastInodeId());
-  long numInodes = s.getNumInodes();
-  LOG.info("Loading " + numInodes + " INodes.");
-  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+  loadINodeSectionHeader(in, prog, currentStep);
   Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-  for (int i = 0; i < numInodes; ++i) {
+  int totalLoaded = loadINodesInSection(in, counter);
+  LOG.info("Successfully loaded {} inodes", totalLoaded);
+}
+
+private int loadINodesInSection(InputStream in, Counter counter)
+throws IOException {
+  // As the input stream is a LimitInputStream, the reading will stop when
+  // EOF is encountered at the end of the stream.
+  int cntr = 0;
+  while (true) {
 INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+if (p == null) {
+  break;
+}
 if (p.getId() == INodeId.ROOT_INODE_ID) {
-  loadRootINode(p);
+  synchronized(this) {
 
 Review comment:
   Thanks for the ping; this is OK with me. I agree it will not bring any performance overhead.



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301356812
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 ##
 @@ -878,6 +878,22 @@
   public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = 
"dfs.image.transfer.chunksize";
   public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
 
+  public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
+  "dfs.image.parallel.load";
+  public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true;
+
+  public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
+  "dfs.image.parallel.target.sections";
+  public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;
+
+  public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
+  "dfs.image.parallel.inode.threshold";
+  public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 100;
+
+  public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
+  "dfs.image.parallel.threads";
+  public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;
+
 
 Review comment:
   IIUC, the number of threads should not be greater than the number of target sections; otherwise the remaining threads will sit idle (or cause other issues). Is it worth logging a warning when this configuration limit is violated?
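
   For illustration, the kind of guard I have in mind is sketched below; the exact wording, and whether to clamp or only warn, is of course up to you, and the local variable names are just placeholders:

```java
int threads = conf.getInt(
    DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
    DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
int targetSections = conf.getInt(
    DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
    DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
// Each worker thread handles one sub-section at a time, so any threads
// beyond the number of target sections can never be used.
if (threads > targetSections) {
  LOG.warn("{} ({}) is greater than {} ({}); the extra threads will be idle",
      DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, threads,
      DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, targetSections);
}
```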



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301366228
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
 ##
 @@ -187,6 +195,73 @@ void load(File file) throws IOException {
   }
 }
 
+/**
+ * Given a FSImage FileSummary.section, return a LimitInput stream set to
+ * the starting position of the section and limited to the section length
+ * @param section The FileSummary.Section containing the offset and length
+ * @param compressionCodec The compression codec in use, if any
 
 Review comment:
   Is this Javadoc missing an annotation (for example, `@return`)?
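
   For example, a completed Javadoc could look like the sketch below; the `@return` and `@throws` wording is only a suggestion:

```java
/**
 * Given a FSImage FileSummary.Section, return a LimitInputStream set to
 * the starting position of the section and limited to the section length.
 * @param section The FileSummary.Section containing the offset and length
 * @param compressionCodec The compression codec in use, if any
 * @return An InputStream limited to the bytes of the given section
 * @throws IOException If the image file cannot be opened or positioned
 */
```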



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301354771
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
 ##
 @@ -462,6 +619,60 @@ long save(File file, FSImageCompression compression) 
throws IOException {
   }
 }
 
+private void enableSubSectionsIfRequired() {
+  boolean parallelEnabled = conf.getBoolean(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+  int inodeThreshold = conf.getInt(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+  int targetSections = conf.getInt(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+  boolean compressionEnabled = conf.getBoolean(
+  DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+  DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+
+
+  if (parallelEnabled) {
+if (compressionEnabled) {
+  LOG.warn("Parallel Image loading is not supported when {} is set to" 
+
+  " true. Parallel loading will be disabled.",
+  DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY);
+  WRITE_SUB_SECTIONS = false;
+  return;
+}
+if (targetSections <= 0) {
+  LOG.warn("{} is set to {}. It must be greater than zero. Setting to" 
+
+  "default of {}",
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+  targetSections,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+  targetSections =
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT;
+}
+if (inodeThreshold <= 0) {
+  LOG.warn("{} is set to {}. It must be greater than zero. Setting to" 
+
+  "default of {}",
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+  targetSections,
 
 Review comment:
   `targetSections` should be `inodeThreshold` here.
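
   In other words, the corrected call would pass `inodeThreshold` as the value placeholder, roughly:

```java
LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
    " default of {}",
    DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
    inodeThreshold,
    DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
```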



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301362319
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -197,16 +203,66 @@ public static void updateBlocksMap(INodeFile file, 
BlockManager bm) {
 private final FSDirectory dir;
 private final FSNamesystem fsn;
 private final FSImageFormatProtobuf.Loader parent;
+private ReentrantLock cacheNameMapLock;
+private ReentrantLock blockMapLock;
 
 Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
   this.fsn = fsn;
   this.dir = fsn.dir;
   this.parent = parent;
+  cacheNameMapLock = new ReentrantLock(true);
+  blockMapLock = new ReentrantLock(true);
+}
+
+void loadINodeDirectorySectionInParallel(ExecutorService service,
+ArrayList<FileSummary.Section> sections, String compressionCodec)
+throws IOException {
+  LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
+  "sections", sections.size());
+  CountDownLatch latch = new CountDownLatch(sections.size());
+  final CopyOnWriteArrayList<IOException> exceptions =
+  new CopyOnWriteArrayList<>();
+  for (FileSummary.Section s : sections) {
+service.submit(new Runnable() {
+  public void run() {
+InputStream ins = null;
+try {
+  ins = parent.getInputStreamForSection(s,
+  compressionCodec);
+  loadINodeDirectorySection(ins);
+} catch (Exception e) {
+  LOG.error("An exception occurred loading INodeDirectories in " +
+  "parallel", e);
+  exceptions.add(new IOException(e));
+} finally {
+  latch.countDown();
+  try {
+ins.close();
+  } catch (IOException ioe) {
+LOG.warn("Failed to close the input stream, ignoring", ioe);
+  }
+}
+  }
+});
+  }
+  try {
+latch.await();
+  } catch (InterruptedException e) {
+LOG.error("Interrupted waiting for countdown latch", e);
+throw new IOException(e);
+  }
+  if (exceptions.size() != 0) {
+LOG.error("{} exceptions occurred loading INodeDirectories",
+exceptions.size());
+throw exceptions.get(0);
+  }
+  LOG.info("Completed loading all INodeDirectory sub-sections");
 }
 
 void loadINodeDirectorySection(InputStream in) throws IOException {
  final List<INodeReference> refList = parent.getLoaderContext()
  .getRefList();
+  ArrayList<INode> inodeList = new ArrayList<>();
 
 Review comment:
   `inodeList` is used here to batch the name caching and the `blocksMap` updates, right?
   1. The following code fragment limits the batch size to 1000; should this value be defined as a constant? (See the sketch after this comment.)
   2. Batching the `blocksMap` updates is a good idea, but could it hurt cache locality (anti-locality)? Please correct me if I am wrong.
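
   A sketch of point 1; the constant name below is just a placeholder, not a suggestion for the final name:

```java
// Placeholder constant replacing the repeated literal 1000.
private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;

// Usage inside loadINodeDirectorySection:
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
  addToCacheAndBlockMap(inodeList);
  inodeList.clear();
}
```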



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301357453
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
 ##
 @@ -389,21 +496,26 @@ private void loadErasureCodingSection(InputStream in)
 
   public static final class Saver {
 public static final int CHECK_CANCEL_INTERVAL = 4096;
+public static boolean WRITE_SUB_SECTIONS = false;
 
 Review comment:
   Should `WRITE_SUB_SECTIONS` and `INODES_PER_SUB_SECTION` be instance attributes of the Saver class rather than static constants?
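
   For illustration, as instance fields of Saver it could look roughly like this; the camelCase names are just placeholders:

```java
public static final class Saver {
  public static final int CHECK_CANCEL_INTERVAL = 4096;
  // Per-instance settings instead of mutable static fields, so two
  // concurrent Saver instances cannot interfere with each other.
  private boolean writeSubSections = false;
  private long inodesPerSubSection = Long.MAX_VALUE;
  // ...
}
```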



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301355625
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
 ##
 @@ -462,6 +619,60 @@ long save(File file, FSImageCompression compression) 
throws IOException {
   }
 }
 
+private void enableSubSectionsIfRequired() {
+  boolean parallelEnabled = conf.getBoolean(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
+  int inodeThreshold = conf.getInt(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
+  int targetSections = conf.getInt(
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
+  DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
+  boolean compressionEnabled = conf.getBoolean(
+  DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+  DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+
 
 Review comment:
   Redundant empty lines.



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301360582
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws 
IOException {
 INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
 for (long id : e.getChildrenList()) {
   INode child = dir.getInode(id);
-  addToParent(p, child);
+  if (addToParent(p, child)) {
+if (child.isFile()) {
+  inodeList.add(child);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
+
 
 Review comment:
   empty line.



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301363874
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws 
IOException {
 INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
 for (long id : e.getChildrenList()) {
   INode child = dir.getInode(id);
-  addToParent(p, child);
+  if (addToParent(p, child)) {
+if (child.isFile()) {
+  inodeList.add(child);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
+
 }
+
 for (int refId : e.getRefChildrenList()) {
   INodeReference ref = refList.get(refId);
-  addToParent(p, ref);
+  if (addToParent(p, ref)) {
+if (ref.isFile()) {
+  inodeList.add(ref);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
 }
   }
+  addToCacheAndBlockMap(inodeList);
+}
+
+private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+  try {
+cacheNameMapLock.lock();
+for (INode i : inodeList) {
+  dir.cacheName(i);
+}
+  } finally {
+cacheNameMapLock.unlock();
+  }
+
+  try {
+blockMapLock.lock();
+for (INode i : inodeList) {
+  updateBlocksMap(i.asFile(), fsn.getBlockManager());
+}
+  } finally {
+blockMapLock.unlock();
+  }
 }
 
 void loadINodeSection(InputStream in, StartupProgress prog,
 Step currentStep) throws IOException {
-  INodeSection s = INodeSection.parseDelimitedFrom(in);
-  fsn.dir.resetLastInodeId(s.getLastInodeId());
-  long numInodes = s.getNumInodes();
-  LOG.info("Loading " + numInodes + " INodes.");
-  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+  loadINodeSectionHeader(in, prog, currentStep);
   Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-  for (int i = 0; i < numInodes; ++i) {
+  int totalLoaded = loadINodesInSection(in, counter);
+  LOG.info("Successfully loaded {} inodes", totalLoaded);
+}
+
+private int loadINodesInSection(InputStream in, Counter counter)
+throws IOException {
+  // As the input stream is a LimitInputStream, the reading will stop when
+  // EOF is encountered at the end of the stream.
+  int cntr = 0;
+  while (true) {
 INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+if (p == null) {
+  break;
+}
 if (p.getId() == INodeId.ROOT_INODE_ID) {
-  loadRootINode(p);
+  synchronized(this) {
 
 Review comment:
   Should `synchronized` take effect only in parallel mode, rather than also in the serial loading mode?
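
   As a rough sketch of what I mean, the lock could be taken only when loading in parallel; the `isParallel` flag and how it is plumbed through are just an illustration:

```java
if (isParallel) {
  synchronized (this) {
    dir.addToInodeMap(n);
  }
} else {
  // Serial loading: single thread, no need to synchronize.
  dir.addToInodeMap(n);
}
```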



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301369183
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
 ##
 @@ -255,14 +345,28 @@ public int compare(FileSummary.Section s1, 
FileSummary.Section s2) {
 case INODE: {
   currentStep = new Step(StepType.INODES);
   prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
-  inodeLoader.loadINodeSection(in, prog, currentStep);
+  stageSubSections = getSubSectionsOfName(
+  subSections, SectionName.INODE_SUB);
+  if (loadInParallel && (stageSubSections.size() > 0)) {
+inodeLoader.loadINodeSectionInParallel(executorService,
+stageSubSections, summary.getCodec(), prog, currentStep);
+  } else {
+inodeLoader.loadINodeSection(in, prog, currentStep);
+  }
 }
   break;
 case INODE_REFERENCE:
   snapshotLoader.loadINodeReferenceSection(in);
   break;
 case INODE_DIR:
-  inodeLoader.loadINodeDirectorySection(in);
+  stageSubSections = getSubSectionsOfName(
+  subSections, SectionName.INODE_DIR_SUB);
+  if (loadInParallel && stageSubSections.size() > 0) {
 
 Review comment:
   Would you like to add some unit tests to cover both serial and parallel loading, for both the old and the new fsimage format?
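
   A minimal sketch of the kind of test I have in mind, using MiniDFSCluster; the test name and assertions are only illustrative, and a real test should also cover an fsimage written without sub-sections (the old format):

```java
@Test
public void testImageLoadsWithAndWithoutParallelism() throws Exception {
  for (boolean parallel : new boolean[] {false, true}) {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, parallel);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      for (int i = 0; i < 10; i++) {
        DFSTestUtil.createFile(fs, new Path("/dir/file" + i), 1024, (short) 1, 0);
      }
      // Write a new fsimage (with or without sub-sections), then restart
      // the NameNode so that the image is loaded back.
      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
      fs.saveNamespace();
      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
      cluster.restartNameNode();
      assertEquals(10,
          cluster.getFileSystem().listStatus(new Path("/dir")).length);
    } finally {
      cluster.shutdown();
    }
  }
}
```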



[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301364827
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws 
IOException {
 INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
 for (long id : e.getChildrenList()) {
   INode child = dir.getInode(id);
-  addToParent(p, child);
+  if (addToParent(p, child)) {
+if (child.isFile()) {
+  inodeList.add(child);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
+
 }
+
 for (int refId : e.getRefChildrenList()) {
   INodeReference ref = refList.get(refId);
-  addToParent(p, ref);
+  if (addToParent(p, ref)) {
+if (ref.isFile()) {
+  inodeList.add(ref);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
 }
   }
+  addToCacheAndBlockMap(inodeList);
+}
+
+private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+  try {
+cacheNameMapLock.lock();
+for (INode i : inodeList) {
+  dir.cacheName(i);
+}
+  } finally {
+cacheNameMapLock.unlock();
+  }
+
+  try {
+blockMapLock.lock();
+for (INode i : inodeList) {
+  updateBlocksMap(i.asFile(), fsn.getBlockManager());
+}
+  } finally {
+blockMapLock.unlock();
+  }
 }
 
 void loadINodeSection(InputStream in, StartupProgress prog,
 Step currentStep) throws IOException {
-  INodeSection s = INodeSection.parseDelimitedFrom(in);
-  fsn.dir.resetLastInodeId(s.getLastInodeId());
-  long numInodes = s.getNumInodes();
-  LOG.info("Loading " + numInodes + " INodes.");
-  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+  loadINodeSectionHeader(in, prog, currentStep);
   Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-  for (int i = 0; i < numInodes; ++i) {
+  int totalLoaded = loadINodesInSection(in, counter);
+  LOG.info("Successfully loaded {} inodes", totalLoaded);
+}
+
+private int loadINodesInSection(InputStream in, Counter counter)
+throws IOException {
+  // As the input stream is a LimitInputStream, the reading will stop when
+  // EOF is encountered at the end of the stream.
+  int cntr = 0;
+  while (true) {
 INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+if (p == null) {
+  break;
+}
 if (p.getId() == INodeId.ROOT_INODE_ID) {
-  loadRootINode(p);
+  synchronized(this) {
+loadRootINode(p);
+  }
 } else {
   INode n = loadINode(p);
-  dir.addToInodeMap(n);
+  synchronized(this) {
+dir.addToInodeMap(n);
+  }
+}
+cntr ++;
+if (counter != null) {
+  counter.increment();
 }
-counter.increment();
   }
+  return cntr;
+}
+
+
+private void loadINodeSectionHeader(InputStream in, StartupProgress prog,
+Step currentStep) throws IOException {
+  INodeSection s = INodeSection.parseDelimitedFrom(in);
+  fsn.dir.resetLastInodeId(s.getLastInodeId());
+  long numInodes = s.getNumInodes();
+  LOG.info("Loading " + numInodes + " INodes.");
+  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+}
+
+void loadINodeSectionInParallel(ExecutorService service,
+ArrayList<FileSummary.Section> sections,
+String compressionCodec, StartupProgress prog,
+Step currentStep) throws IOException {
+  LOG.info("Loading the INode section in parallel with {} sub-sections",
+  sections.size());
+  CountDownLatch latch = new CountDownLatch(sections.size());
+  AtomicInteger totalLoaded = new AtomicInteger(0);
+  final CopyOnWriteArrayList<IOException> exceptions =
+  new CopyOnWriteArrayList<>();
+
+  for (int i=0; i < sections.size(); i++) {
+FileSummary.Section s = sections.get(i);
+InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
+if (i == 0) {
+  // The first inode section has a header which must be processed first
+  loadINodeSectionHeader(ins, prog, currentStep);
+}
+
+service.submit(new Runnable() {
+   public void run() {
+try {
+   totalLoaded.addAndGet(loadINodesInSection(ins, null));
+   prog.setCount(Phase.

[GitHub] [hadoop] Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index

2019-07-08 Thread GitBox
Hexiaoqiao commented on a change in pull request #1028: HDFS-14617 - Improve 
fsimage load time by writing sub-sections to the fsimage index
URL: https://github.com/apache/hadoop/pull/1028#discussion_r301364041
 
 

 ##
 File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
 ##
 @@ -217,33 +273,151 @@ void loadINodeDirectorySection(InputStream in) throws 
IOException {
 INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
 for (long id : e.getChildrenList()) {
   INode child = dir.getInode(id);
-  addToParent(p, child);
+  if (addToParent(p, child)) {
+if (child.isFile()) {
+  inodeList.add(child);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
+
 }
+
 for (int refId : e.getRefChildrenList()) {
   INodeReference ref = refList.get(refId);
-  addToParent(p, ref);
+  if (addToParent(p, ref)) {
+if (ref.isFile()) {
+  inodeList.add(ref);
+}
+if (inodeList.size() >= 1000) {
+  addToCacheAndBlockMap(inodeList);
+  inodeList.clear();
+}
+  }
 }
   }
+  addToCacheAndBlockMap(inodeList);
+}
+
+private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+  try {
+cacheNameMapLock.lock();
+for (INode i : inodeList) {
+  dir.cacheName(i);
+}
+  } finally {
+cacheNameMapLock.unlock();
+  }
+
+  try {
+blockMapLock.lock();
+for (INode i : inodeList) {
+  updateBlocksMap(i.asFile(), fsn.getBlockManager());
+}
+  } finally {
+blockMapLock.unlock();
+  }
 }
 
 void loadINodeSection(InputStream in, StartupProgress prog,
 Step currentStep) throws IOException {
-  INodeSection s = INodeSection.parseDelimitedFrom(in);
-  fsn.dir.resetLastInodeId(s.getLastInodeId());
-  long numInodes = s.getNumInodes();
-  LOG.info("Loading " + numInodes + " INodes.");
-  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+  loadINodeSectionHeader(in, prog, currentStep);
   Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-  for (int i = 0; i < numInodes; ++i) {
+  int totalLoaded = loadINodesInSection(in, counter);
+  LOG.info("Successfully loaded {} inodes", totalLoaded);
+}
+
+private int loadINodesInSection(InputStream in, Counter counter)
+throws IOException {
+  // As the input stream is a LimitInputStream, the reading will stop when
+  // EOF is encountered at the end of the stream.
+  int cntr = 0;
+  while (true) {
 INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+if (p == null) {
+  break;
+}
 if (p.getId() == INodeId.ROOT_INODE_ID) {
-  loadRootINode(p);
+  synchronized(this) {
+loadRootINode(p);
+  }
 } else {
   INode n = loadINode(p);
-  dir.addToInodeMap(n);
+  synchronized(this) {
+dir.addToInodeMap(n);
+  }
+}
+cntr ++;
+if (counter != null) {
+  counter.increment();
 }
-counter.increment();
   }
+  return cntr;
+}
+
+
+private void loadINodeSectionHeader(InputStream in, StartupProgress prog,
+Step currentStep) throws IOException {
+  INodeSection s = INodeSection.parseDelimitedFrom(in);
+  fsn.dir.resetLastInodeId(s.getLastInodeId());
+  long numInodes = s.getNumInodes();
+  LOG.info("Loading " + numInodes + " INodes.");
+  prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+}
+
+void loadINodeSectionInParallel(ExecutorService service,
+ArrayList<FileSummary.Section> sections,
+String compressionCodec, StartupProgress prog,
+Step currentStep) throws IOException {
+  LOG.info("Loading the INode section in parallel with {} sub-sections",
+  sections.size());
+  CountDownLatch latch = new CountDownLatch(sections.size());
+  AtomicInteger totalLoaded = new AtomicInteger(0);
+  final CopyOnWriteArrayList<IOException> exceptions =
+  new CopyOnWriteArrayList<>();
+
+  for (int i=0; i < sections.size(); i++) {
+FileSummary.Section s = sections.get(i);
+InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
+if (i == 0) {
+  // The first inode section has a header which must be processed first
+  loadINodeSectionHeader(ins, prog, currentStep);
+}
+
+service.submit(new Runnable() {
+   public void run() {
+try {
+   totalLoaded.addAndGet(loadINodesInSection(ins, null));
+   prog.setCount(Phase.