This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1147120  IMPALA-8454 (part 1): Refactor file descriptor loading code
1147120 is described below

commit 114712031f2293af5b3d9509776f88c23d3fa0fc
Author: Todd Lipcon <t...@apache.org>
AuthorDate: Fri Apr 5 16:26:19 2019 -0700

    IMPALA-8454 (part 1): Refactor file descriptor loading code
    
    This refactors various file-descriptor loading code out of HdfsTable
    into new standalone classes. In order to support ACID tables, we'll need
    to make various changes to these bits of code, and having them extracted
    and cleaned up will make that easier.
    
    This consolidates all of the places in which we list partition
    directories into one method that does the appropriate thing in every
    situation.
    
    This has a small behavior change related to IMPALA-8406: previously, we
    had a bug where, while refreshing a table, if one or more partitions
    failed to refresh, the other partitions might still be refreshed even
    though an error was returned. Those partitions would not become visible
    until some other operation caused the table's catalog version number to
    increase.
    
    Rather than tackle that problem in this "refactor" patch, this patch
    just slightly improves the behavior: we now either update all partitions
    atomically or update none of them, but the REFRESH might still add newly
    noticed partitions and might still update other HMS metadata.
    
    This patch may end up slightly improving various other code paths that
    refresh file descriptor lists. We used to do this in slightly different
    ways in three different places, with different sets of optimizations.
    Now we do it in one place and apply all of the known optimizations.
    
    Change-Id: I59edf493b9ba38be5f556b4795a7684d9c9e3a07
    Reviewed-on: http://gerrit.cloudera.org:8080/12950
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../apache/impala/catalog/FileMetadataLoader.java  | 229 ++++++++++
 .../org/apache/impala/catalog/HdfsPartition.java   |   2 +
 .../java/org/apache/impala/catalog/HdfsTable.java  | 482 ++++-----------------
 .../impala/catalog/ParallelFileMetadataLoader.java | 148 +++++++
 .../main/java/org/apache/impala/catalog/Table.java |   1 -
 .../impala/catalog/local/DirectMetaProvider.java   |  55 +--
 .../org/apache/impala/common/FileSystemUtil.java   |   5 +-
 .../org/apache/impala/catalog/CatalogTest.java     |   5 +-
 .../apache/impala/catalog/HdfsPartitionTest.java   |  14 +-
 .../apache/impala/planner/TestCaseLoaderTest.java  |   4 +-
 10 files changed, 479 insertions(+), 466 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
new file mode 100644
index 0000000..d0f71b5
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.catalog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Reference;
+import org.apache.impala.compat.HdfsShim;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.apache.impala.util.ThreadNameAnnotator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Utility for loading file metadata within a partition directory.
+ */
+public class FileMetadataLoader {
+  private final static Logger LOG = LoggerFactory.getLogger(FileMetadataLoader.class);
+  private static final Configuration CONF = new Configuration();
+
+  private final Path partDir_;
+  private final ImmutableMap<String, FileDescriptor> oldFdsByName_;
+  private final ListMap<TNetworkAddress> hostIndex_;
+
+  private boolean forceRefreshLocations = false;
+
+  private List<FileDescriptor> loadedFds_;
+  private LoadStats loadStats_;
+
+  /**
+   * @param partDir the dir for which to fetch file metadata
+   * @param oldFds any pre-existing file descriptors loaded for this table, used
+   *   to optimize refresh if available.
+   * @param hostIndex the host index with which to associate the file descriptors
+   */
+  public FileMetadataLoader(Path partDir, List<FileDescriptor> oldFds,
+      ListMap<TNetworkAddress> hostIndex) {
+    partDir_ = Preconditions.checkNotNull(partDir);
+    hostIndex_ = Preconditions.checkNotNull(hostIndex);
+    oldFdsByName_ = Maps.uniqueIndex(oldFds, FileDescriptor::getFileName);
+  }
+
+  /**
+   * If 'refresh' is true, force re-fetching block locations even if a file does not
+   * appear to have changed.
+   */
+  public void setForceRefreshBlockLocations(boolean refresh) {
+    forceRefreshLocations = refresh;
+  }
+
+  /**
+   * @return the file descriptors that were loaded after an invocation of load()
+   */
+  public List<FileDescriptor> getLoadedFds() {
+    Preconditions.checkState(loadedFds_ != null,
+        "Must have successfully loaded first");
+    return loadedFds_;
+  }
+
+  /**
+   * @return statistics about the descriptor loading process, after an invocation of
+   * load()
+   */
+  public LoadStats getStats() {
+    Preconditions.checkState(loadedFds_ != null,
+        "Must have successfully loaded first");
+    return loadStats_;
+  }
+
+  Path getPartDir() { return partDir_; }
+
+  /**
+   * Load the file descriptors, which may later be fetched using {@link #getLoadedFds()}.
+   * After a successful load, stats may be fetched using {@link #getStats()}.
+   *
+   * If the directory does not exist, this succeeds and yields an empty list of
+   * descriptors.
+   *
+   * @throws IOException if listing fails.
+   */
+  public void load() throws IOException {
+    Preconditions.checkState(loadStats_ == null, "already loaded");
+    loadStats_ = new LoadStats();
+    FileSystem fs = partDir_.getFileSystem(CONF);
+
+    // If we don't have any prior FDs from which we could re-use old block location info,
+    // we'll need to fetch info for every returned file. In this case we can inline
+    // that request with the 'list' call and save a round-trip per file.
+    //
+    // In the case that we _do_ have existing FDs which we can reuse, we'll optimistically
+    // assume that most _can_ be reused, in which case it's faster to _not_ prefetch
+    // the locations.
+    boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) &&
+        (oldFdsByName_.isEmpty() || forceRefreshLocations);
+
+    String msg = String.format("%s file metadata%s from path %s",
+          oldFdsByName_.isEmpty() ? "Loading" : "Refreshing",
+          listWithLocations ? " with eager location-fetching" : "",
+          partDir_);
+    LOG.trace(msg);
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
+      RemoteIterator<? extends FileStatus> fileStatuses;
+      if (listWithLocations) {
+        fileStatuses = FileSystemUtil.listFiles(fs, partDir_, /*recursive=*/false);
+      } else {
+        fileStatuses = FileSystemUtil.listStatus(fs, partDir_);
+
+        // TODO(todd): we could look at the result of listing without locations, and if
+        // we see that a substantial number of the files have changed, it may be better
+        // to go back and re-list with locations vs doing an RPC per file.
+      }
+      loadedFds_ = new ArrayList<>();
+      if (fileStatuses == null) return;
+
+      Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
+      while (fileStatuses.hasNext()) {
+        FileStatus fileStatus = fileStatuses.next();
+        if (!FileSystemUtil.isValidDataFile(fileStatus)) {
+          ++loadStats_.hiddenFiles;
+          continue;
+        }
+        // TODO(todd): this logic will have to change when we support recursive partition
+        // listing -- we need to index the old FDs by their relative path to the partition
+        // directory, not just the file name (last path component)
+        String fileName = fileStatus.getPath().getName().toString();
+        FileDescriptor fd = oldFdsByName_.get(fileName);
+        if (listWithLocations || forceRefreshLocations ||
+            hasFileChanged(fd, fileStatus)) {
+          fd = createFd(fs, fileStatus, numUnknownDiskIds);
+          ++loadStats_.loadedFiles;
+        } else {
+          ++loadStats_.skippedFiles;
+        }
+        loadedFds_.add(Preconditions.checkNotNull(fd));
+      }
+      loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(loadStats_.debugString());
+      }
+    }
+  }
+
+  /**
+   * Create a FileDescriptor for the given FileStatus. If the FS supports block locations,
+   * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) this uses
+   * the already-loaded information; otherwise, this may have to remotely look up the
+   * locations.
+   */
+  private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
+      Reference<Long> numUnknownDiskIds) throws IOException {
+    if (!FileSystemUtil.supportsStorageIds(fs)) {
+      return FileDescriptor.createWithNoBlocks(fileStatus);
+    }
+    BlockLocation[] locations;
+    if (fileStatus instanceof LocatedFileStatus) {
+      locations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+    } else {
+      locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    }
+    return FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+        HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
+  }
+
+  /**
+   * Compares the modification time and file size between the FileDescriptor and the
+   * FileStatus to determine if the file has changed. Returns true if the file has changed
+   * and false otherwise.
+   */
+  private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
+    return (fd == null) || (fd.getFileLength() != status.getLen()) ||
+      (fd.getModificationTime() != status.getModificationTime());
+  }
+
+  // File/Block metadata loading stats for a single HDFS path.
+  public class LoadStats {
+    // Number of files for which the metadata was loaded.
+    public int loadedFiles = 0;
+
+    // Number of hidden files excluded from file metadata loading. More details at
+    // isValidDataFile().
+    public int hiddenFiles = 0;
+
+    // Number of files skipped from file metadata loading because the files have not
+    // changed since the last load. More details at hasFileChanged().
+    //
+    // TODO(todd) rename this to something indicating it was fast-pathed, not skipped
+    public int skippedFiles = 0;
+
+    // Number of unknown disk IDs encountered while loading block
+    // metadata for this path.
+    public long unknownDiskIds = 0;
+
+    public String debugString() {
+      return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
+          "Skipped files: %s Unknown diskIDs: %s", partDir_, loadedFiles, 
hiddenFiles,
+          skippedFiles, unknownDiskIds);
+    }
+  }
+}
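
As a quick orientation to the new API, here is a minimal sketch of how a caller might drive FileMetadataLoader, using only the methods added above. The wrapper class and its main() method are illustrative, and the HDFS path is borrowed from the test changes later in this patch:

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.impala.catalog.FileMetadataLoader;
    import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
    import org.apache.impala.thrift.TNetworkAddress;
    import org.apache.impala.util.ListMap;

    public class FileMetadataLoaderExample {
      public static void main(String[] args) throws Exception {
        // Host index shared across file descriptors; the loader adds any hosts it sees.
        ListMap<TNetworkAddress> hostIndex = new ListMap<>();

        // With no pre-existing descriptors the loader lists the directory with
        // eager location fetching (the "load from scratch" path).
        FileMetadataLoader loader = new FileMetadataLoader(
            new Path("hdfs://localhost:20500/test-warehouse/schemas"),
            Collections.emptyList(), hostIndex);
        loader.load();

        List<FileDescriptor> fds = loader.getLoadedFds();
        System.out.println(loader.getStats().debugString());
        System.out.println("Loaded " + fds.size() + " file descriptors");
      }
    }

Passing a non-empty oldFds list instead would enable the fast path in which unchanged files reuse their previously loaded block locations.
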
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index c27d347..8b6f629 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -175,6 +175,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
      */
     private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
         FileStatus fileStatus, int[] fbFileBlockOffets, boolean isEc) {
+      // TODO(todd): need to use path relative to the partition dir, not the
+      // filename here.
       int fileNameOffset = fbb.createString(fileStatus.getPath().getName());
       // A negative block vector offset is used when no block offsets are specified.
       int blockVectorOffset = -1;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 49f0e9a..de190d9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -31,18 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -61,8 +54,6 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.Reference;
-import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -98,10 +89,9 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -128,9 +118,6 @@ public class HdfsTable extends Table implements FeFsTable {
   // Number of times to retry fetching the partitions from the HMS should an error occur.
   private final static int NUM_PARTITION_FETCH_RETRIES = 5;
 
-  // Maximum number of errors logged when loading partitioned tables.
-  private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
-
   // Table property key for skip.header.line.count
   public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
@@ -283,77 +270,6 @@ public class HdfsTable extends Table implements FeFsTable {
   // and its usage in getFileSystem suggests it should be.
   private static final Configuration CONF = new Configuration();
 
-  private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
-      BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
-
-  private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
-      BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();
-
-  // File/Block metadata loading stats for a single HDFS path.
-  public static class FileMetadataLoadStats {
-    // Path corresponding to this metadata load request.
-    private final Path hdfsPath;
-
-    // Number of files for which the metadata was loaded.
-    public int loadedFiles = 0;
-
-    // Number of hidden files excluded from file metadata loading. More 
details at
-    // isValidDataFile().
-    public int hiddenFiles = 0;
-
-    // Number of files skipped from file metadata loading because the files 
have not
-    // changed since the last load. More details at hasFileChanged().
-    public int skippedFiles = 0;
-
-    // Number of unknown disk IDs encountered while loading block
-    // metadata for this path.
-    public long unknownDiskIds = 0;
-
-    public FileMetadataLoadStats(Path path) { hdfsPath = path; }
-
-    public String debugString() {
-      Preconditions.checkNotNull(hdfsPath);
-      return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
-          "Skipped files: %s Unknown diskIDs: %s", hdfsPath, loadedFiles, 
hiddenFiles,
-          skippedFiles, unknownDiskIds);
-    }
-  }
-
-  // A callable implementation of file metadata loading request for a given
-  // HDFS path.
-  public class FileMetadataLoadRequest
-      implements Callable<FileMetadataLoadStats> {
-    private final Path hdfsPath_;
-    // All the partitions mapped to the above path
-    private final List<HdfsPartition> partitionList_;
-    // If set to true, reloads the file metadata only when the files in this 
path
-    // have changed since last load (more details in hasFileChanged()).
-    private final boolean reuseFileMd_;
-
-    public FileMetadataLoadRequest(
-        Path path, List<HdfsPartition> partitions, boolean reuseFileMd) {
-      hdfsPath_ = path;
-      partitionList_ = partitions;
-      reuseFileMd_ = reuseFileMd;
-    }
-
-    @Override
-    public FileMetadataLoadStats call() throws IOException {
-      try (ThreadNameAnnotator tna = new ThreadNameAnnotator(debugString())) {
-        FileMetadataLoadStats loadingStats =
-            reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
-            resetAndLoadFileMetadata(hdfsPath_, partitionList_);
-        return loadingStats;
-      }
-    }
-
-    private String debugString() {
-      String loadType = reuseFileMd_ ? "Refreshing" : "Loading";
-      return String.format("%s file metadata for path: %s", loadType,
-          hdfsPath_.toString());
-    }
-  }
-
   public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       Db db, String name, String owner) {
     super(msTbl, db, name, owner);
@@ -394,196 +310,6 @@ public class HdfsTable extends Table implements FeFsTable {
     }
   }
 
-  /**
-   * Drops and re-loads the file metadata of all the partitions in 
'partitions' that
-   * map to the path 'partDir'. 'partDir' may belong to any file system that
-   * implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). 
It involves
-   * the following steps:
-   * - Clear the current file metadata of the partitions.
-   * - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and 
BlockLocations
-   *   for each file under it.
-   * - For every valid data file, enumerate all its blocks and their 
corresponding hosts
-   *   and disk IDs if the underlying file system supports the block locations 
API
-   *   (for ex: HDFS). For other file systems (like S3), per-block information 
is not
-   *   available so it's not stored.
-   */
-  private FileMetadataLoadStats resetAndLoadFileMetadata(
-      Path partDir, List<HdfsPartition> partitions) throws IOException {
-    FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
-    // No need to load blocks for empty partitions list.
-    if (partitions == null || partitions.isEmpty()) return loadStats;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Loading block md for " + getFullName() + " path: " + 
partDir.toString());
-    }
-
-    FileSystem fs = partDir.getFileSystem(CONF);
-
-    RemoteIterator<LocatedFileStatus> fileStatusIter =
-        FileSystemUtil.listFiles(fs, partDir, false);
-    if (fileStatusIter == null) return loadStats;
-
-    List<FileDescriptor> newFileDescs = createFileDescriptors(
-        fs, fileStatusIter, hostIndex_, loadStats);
-    for (HdfsPartition partition: partitions) {
-      partition.setFileDescriptors(newFileDescs);
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Loaded file metadata for " + getFullName() + " " +
-          loadStats.debugString());
-    }
-    return loadStats;
-  }
-
-  /**
-   * Convert LocatedFileStatuses to FileDescriptors.
-   *
-   * If 'fs' is a FileSystem that supports block locations, the resulting
-   * descriptors include location information, and 'hostIndex' is updated
-   * to include all of the hosts referred to by the locations.
-   *
-   * 'loadStats' is updated to reflect this loading operation.
-   *
-   * May throw IOException if the provided RemoteIterator throws.
-   */
-  public static List<FileDescriptor> createFileDescriptors(
-      FileSystem fs,
-      RemoteIterator<LocatedFileStatus> fileStatusIter,
-      ListMap<TNetworkAddress> hostIndex,
-      FileMetadataLoadStats loadStats) throws IOException {
-    boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs);
-    Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-    List<FileDescriptor> newFileDescs = new ArrayList<>();
-    while (fileStatusIter.hasNext()) {
-      LocatedFileStatus fileStatus = fileStatusIter.next();
-      if (!FileSystemUtil.isValidDataFile(fileStatus)) {
-        ++loadStats.hiddenFiles;
-        continue;
-      }
-      FileDescriptor fd;
-      if (supportsBlocks) {
-        fd = FileDescriptor.create(fileStatus, fileStatus.getBlockLocations(), 
fs,
-            hostIndex, HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
-      } else {
-        fd = FileDescriptor.createWithNoBlocks(fileStatus);
-      }
-      newFileDescs.add(fd);
-      ++loadStats.loadedFiles;
-    }
-    loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
-    return newFileDescs;
-  }
-
-  /**
-   * Refreshes file metadata information for 'path'. This method is optimized 
for
-   * the case where the files in the partition have not changed dramatically. 
It first
-   * uses a listStatus() call on the partition directory to detect the 
modified files
-   * (look at hasFileChanged()) and fetches their block locations using the
-   * getFileBlockLocations() method. Our benchmarks suggest that the 
listStatus() call
-   * is much faster then the listFiles() (up to ~40x faster in some cases).
-   */
-  private FileMetadataLoadStats refreshFileMetadata(
-      Path partDir, List<HdfsPartition> partitions) throws IOException {
-    FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
-    // No need to load blocks for empty partitions list.
-    if (partitions == null || partitions.isEmpty()) return loadStats;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Refreshing block md for " + getFullName() + " path: " +
-          partDir.toString());
-    }
-
-    // Index the partition file descriptors by their file names for O(1) look 
ups.
-    // We just pick the first partition to generate the fileDescByName lookup 
table
-    // since all the partitions map to the same partDir.
-    ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
-          partitions.get(0).getFileDescriptors(), new Function<FileDescriptor, 
String>() {
-            @Override
-            public String apply(FileDescriptor desc) { return 
desc.getFileName(); }
-          });
-
-    FileSystem fs = partDir.getFileSystem(CONF);
-    FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir);
-    if (fileStatuses == null) return loadStats;
-    boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs);
-    Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-    List<FileDescriptor> newFileDescs = new ArrayList<>();
-    // If there is a cached partition mapped to this path, we recompute the 
block
-    // locations even if the underlying files have not changed 
(hasFileChanged()).
-    // This is done to keep the cached block metadata up to date.
-    boolean isPartitionMarkedCached = false;
-    for (HdfsPartition partition: partitions) {
-      if (partition.isMarkedCached()) {
-        isPartitionMarkedCached = true;
-        break;
-      }
-    }
-    for (FileStatus fileStatus: fileStatuses) {
-      if (!FileSystemUtil.isValidDataFile(fileStatus)) {
-        ++loadStats.hiddenFiles;
-        continue;
-      }
-      String fileName = fileStatus.getPath().getName().toString();
-      FileDescriptor fd = fileDescsByName.get(fileName);
-      if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) {
-        if (supportsBlocks) {
-          BlockLocation[] locations =
-              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-              HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
-        } else {
-          fd = FileDescriptor.createWithNoBlocks(fileStatus);
-        }
-        ++loadStats.loadedFiles;
-      } else {
-        ++loadStats.skippedFiles;
-      }
-      Preconditions.checkNotNull(fd);
-      newFileDescs.add(fd);
-    }
-    loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
-    for (HdfsPartition partition: partitions) 
partition.setFileDescriptors(newFileDescs);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Refreshed file metadata for " + getFullName() + " "
-          + loadStats.debugString());
-    }
-    return loadStats;
-  }
-
-  /**
-   * Compares the modification time and file size between the FileDescriptor 
and the
-   * FileStatus to determine if the file has changed. Returns true if the file 
has changed
-   * and false otherwise.
-   */
-  private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
-    return (fd == null) || (fd.getFileLength() != status.getLen()) ||
-      (fd.getModificationTime() != status.getModificationTime());
-  }
-
-  /**
-   * Helper method to reload the file metadata of a single partition.
-   */
-  private void refreshPartitionFileMetadata(HdfsPartition partition)
-      throws CatalogException {
-    String annotation = String.format("refreshing table %s partition %s",
-        getFullName(), partition.getPartitionName());
-    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
-      Path partDir = partition.getLocationPath();
-      // If there are no existing files in the partition, use the 
non-incremental path.
-      boolean useExistingFds = partition.hasFileDescriptors();
-      FileMetadataLoadStats stats;
-      if (useExistingFds) {
-        stats = refreshFileMetadata(partDir, 
Collections.singletonList(partition));
-      } else {
-        stats = resetAndLoadFileMetadata(partDir, 
Collections.singletonList(partition));
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} file metadata for {} {}", useExistingFds ? "Refreshed" : 
"Loaded",
-            getFullName(), stats.debugString());
-      }
-    } catch (IOException e) {
-      throw new CatalogException("Encountered invalid partition path", e);
-    }
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
@@ -783,10 +509,6 @@ public class HdfsTable extends Table implements FeFsTable {
       CatalogException {
     Preconditions.checkNotNull(msTbl);
     initializePartitionMetadata(msTbl);
-    // Map of partition paths to their corresponding HdfsPartition objects. 
Populated
-    // using createPartition() calls. A single partition path can correspond 
to multiple
-    // partitions.
-    Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>();
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
 
     Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
@@ -799,7 +521,6 @@ public class HdfsTable extends Table implements FeFsTable {
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
       HdfsPartition part = createPartition(msTbl.getSd(), null, permCache);
-      partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
       addPartition(part);
     } else {
@@ -810,102 +531,80 @@ public class HdfsTable extends Table implements FeFsTable {
         // If the partition is null, its HDFS path does not exist, and it was not added
         // to this table's partition list. Skip the partition.
         if (partition == null) continue;
-        Path partDir = FileSystemUtil.createFullyQualifiedPath(
-            new Path(msPartition.getSd().getLocation()));
-        List<HdfsPartition> parts = partsByPath.get(partDir);
-        if (parts == null) {
-          partsByPath.put(partDir, Lists.newArrayList(partition));
-        } else {
-          parts.add(partition);
-        }
       }
     }
     // Load the file metadata from scratch.
-    loadMetadataAndDiskIds(partsByPath, false);
+    loadFileMetadataForPartitions(partitionMap_.values(), /*isRefresh=*/false);
   }
 
+
   /**
-   * Returns the thread pool size to load the file metadata of this table.
-   * 'numPaths' is the number of paths for which the file metadata should be 
loaded.
+   * Helper method to load the block locations for each partition in 'parts'.
+   * New file descriptor lists are loaded and the partitions are updated in place.
    *
-   * We use different thread pool sizes for HDFS and non-HDFS tables since the 
latter
-   * supports much higher throughput of RPC calls for listStatus/listFiles. For
-   * simplicity, the filesystem type is determined based on the table's root 
path and
-   * not for each partition individually. Based on our experiments, S3 showed 
a linear
-   * speed up (up to ~100x) with increasing number of loading threads where as 
the HDFS
-   * throughput was limited to ~5x in un-secure clusters and up to ~3.7x in 
secure
-   * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC 
implementation
-   * (HADOOP-14558) on both the server and the client side.
+   * @param isRefresh whether this is a refresh operation or an initial load. This only
+   * affects logging.
    */
-  private int getLoadingThreadPoolSize(int numPaths) throws CatalogException {
-    Preconditions.checkState(numPaths > 0);
+  private void loadFileMetadataForPartitions(Iterable<HdfsPartition> parts,
+      boolean isRefresh) throws CatalogException {
+    // Group the partitions by their path (multiple partitions may point to the same
+    // path).
+    Map<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    for (HdfsPartition p : parts) {
+      Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
+      partsByPath.computeIfAbsent(partPath, (path) -> new ArrayList<HdfsPartition>())
+          .add(p);
+    }
+
+    // Create a FileMetadataLoader for each path.
+    Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
+    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
+      List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
+      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), oldFds, hostIndex_);
+      // If there is a cached partition mapped to this path, we recompute the block
+      // locations even if the underlying files have not changed.
+      // This is done to keep the cached block metadata up to date.
+      boolean hasCachedPartition = Iterables.any(e.getValue(),
+          HdfsPartition::isMarkedCached);
+      loader.setForceRefreshBlockLocations(hasCachedPartition);
+      loadersByPath.put(e.getKey(), loader);
+    }
+
+    String logPrefix = String.format(
+        "%s file and block metadata for %s paths for table %s",
+        isRefresh ? "Refreshing" : "Loading", partsByPath.size(),
+        getFullName());
     FileSystem tableFs;
     try {
       tableFs = (new Path(getLocation())).getFileSystem(CONF);
     } catch (IOException e) {
       throw new CatalogException("Invalid table path for table: " + 
getFullName(), e);
     }
-    int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
-        MAX_HDFS_PARTITIONS_PARALLEL_LOAD : 
MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
-    // Thread pool size need not exceed the number of paths to be loaded.
-    return Math.min(numPaths, threadPoolSize);
-  }
 
-  /**
-   * Helper method to load the block locations for each partition directory in
-   * partsByPath using a thread pool. 'partsByPath' maps each partition 
directory to
-   * the corresponding HdfsPartition objects. If 'reuseFileMd' is true, the 
block
-   * metadata is incrementally refreshed, else it is reloaded from scratch.
-   */
-  private void loadMetadataAndDiskIds(Map<Path, List<HdfsPartition>> 
partsByPath,
-      boolean reuseFileMd) throws CatalogException {
-    int numPathsToLoad = partsByPath.size();
-    // For tables without partitions we have no file metadata to load.
-    if (numPathsToLoad == 0) return;
-
-    int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad);
-    LOG.info("{} file and block metadata for {} paths for table {} " +
-        "using a thread pool of size {}",
-        reuseFileMd ? "Refreshing" : "Loading", numPathsToLoad, getFullName(),
-        threadPoolSize);
-    ExecutorService partitionLoadingPool = 
Executors.newFixedThreadPool(threadPoolSize);
-    try {
-      List<Future<FileMetadataLoadStats>> pendingMdLoadTasks = new 
ArrayList<>();
-      for (Path p: partsByPath.keySet()) {
-        FileMetadataLoadRequest blockMdLoadReq =
-            new FileMetadataLoadRequest(p, partsByPath.get(p), reuseFileMd);
-        pendingMdLoadTasks.add(partitionLoadingPool.submit(blockMdLoadReq));
-      }
-      // Wait for the partition load tasks to finish.
-      int failedLoadTasks = 0;
-      for (Future<FileMetadataLoadStats> task: pendingMdLoadTasks) {
-        try {
-          FileMetadataLoadStats loadStats = task.get();
-          if (LOG.isTraceEnabled()) LOG.trace(loadStats.debugString());
-        } catch (ExecutionException | InterruptedException e) {
-          ++failedLoadTasks;
-          if (failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
-            LOG.error("Encountered an error loading block metadata for table: 
" +
-                getFullName(), e);
-          }
-        }
-      }
-      if (failedLoadTasks > 0) {
-        int errorsNotLogged = failedLoadTasks - 
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
-        if (errorsNotLogged > 0) {
-          LOG.error(String.format("Error loading file metadata for %s paths 
for table " +
-              "%s. Only the first %s errors were logged.", failedLoadTasks, 
getFullName(),
-              MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG));
-        }
-        throw new TableLoadingException(String.format("Failed to load file 
metadata "
-            + "for %s paths for table %s. Table's file metadata could be 
partially "
-            + "loaded. Check the Catalog server log for more details.", 
failedLoadTasks,
-            getFullName()));
+    // Actually load the partitions.
+    // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then
+    // we'll throw an exception here and end up bailing out of whatever catalog operation
+    // we're in the middle of. This could cause a partial metadata update -- eg we may
+    // have refreshed the top-level table properties without refreshing the files.
+    new ParallelFileMetadataLoader(logPrefix, tableFs, loadersByPath.values())
+        .load();
+
+    // Store the loaded FDs into the partitions.
+    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
+      Path p = e.getKey();
+      FileMetadataLoader loader = loadersByPath.get(p);
+
+      for (HdfsPartition part : e.getValue()) {
+        part.setFileDescriptors(loader.getLoadedFds());
       }
-    } finally {
-      partitionLoadingPool.shutdown();
     }
-    LOG.info(String.format("Loaded file and block metadata for %s", 
getFullName()));
+
+    // TODO(todd): would be good to log a summary of the loading process:
+    // - how long did it take?
+    // - how many block locations did we reuse/load individually/load via batch
+    // - how many partitions did we read metadata for
+    // - etc...
+    LOG.info("Loaded file and block metadata for {}", getFullName());
   }
 
   /**
@@ -967,7 +666,6 @@ public class HdfsTable extends Table implements FeFsTable {
   public List<HdfsPartition> createAndLoadPartitions(
       List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
       throws CatalogException {
-    Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>();
     List<HdfsPartition> addedParts = new ArrayList<>();
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
     for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
@@ -975,15 +673,8 @@ public class HdfsTable extends Table implements FeFsTable {
           permCache);
       Preconditions.checkNotNull(hdfsPartition);
       addedParts.add(hdfsPartition);
-      Path partitionPath = hdfsPartition.getLocationPath();
-      List<HdfsPartition> hdfsPartitions = partsByPath.get(partitionPath);
-      if (hdfsPartitions == null) {
-        partsByPath.put(partitionPath, Lists.newArrayList(hdfsPartition));
-      } else {
-        hdfsPartitions.add(hdfsPartition);
-      }
     }
-    loadMetadataAndDiskIds(partsByPath, false);
+    loadFileMetadataForPartitions(addedParts, /* isRefresh = */ false);
     return addedParts;
   }
 
@@ -1306,7 +997,7 @@ public class HdfsTable extends Table implements FeFsTable {
     part.setFileDescriptors(oldPartition.getFileDescriptors());
     addPartition(part);
     if (isMarkedCached_) part.markCached();
-    refreshPartitionFileMetadata(part);
+    loadFileMetadataForPartitions(ImmutableList.of(part), /*isRefresh=*/true);
   }
 
   /**
@@ -1335,8 +1026,8 @@ public class HdfsTable extends Table implements FeFsTable {
     msPartitionNames.addAll(client.listPartitionNames(db_.getName(), name_, (short) -1));
     // Names of loaded partitions in this table
     Set<String> partitionNames = new HashSet<>();
-    // Partitions for which file metadata must be loaded, grouped by partition 
paths.
-    Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = new 
HashMap<>();
+    // Partitions for which file metadata must be loaded
+    List<HdfsPartition> partitionsToLoadFiles = Lists.newArrayList();
     // Partitions that need to be dropped and recreated from scratch
     List<HdfsPartition> dirtyPartitions = new ArrayList<>();
     // Partitions removed from the Hive Metastore.
@@ -1355,15 +1046,7 @@ public class HdfsTable extends Table implements FeFsTable {
         dirtyPartitions.add(partition);
       } else {
         if (partitionsToUpdate == null && loadPartitionFileMetadata) {
-          Path partitionPath = partition.getLocationPath();
-          List<HdfsPartition> partitions =
-            partitionsToUpdateFileMdByPath.get(partitionPath);
-          if (partitions == null) {
-            partitionsToUpdateFileMdByPath.put(
-                partitionPath, Lists.newArrayList(partition));
-          } else {
-            partitions.add(partition);
-          }
+          partitionsToLoadFiles.add(partition);
         }
       }
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
@@ -1373,6 +1056,7 @@ public class HdfsTable extends Table implements FeFsTable {
     // dirtyPartitions are reloaded and hence cache directives are not dropped.
     dropPartitions(dirtyPartitions, false);
     // Load dirty partitions from Hive Metastore
+    // TODO(todd): the logic around "dirty partitions" is highly suspicious.
     loadPartitionsFromMetastore(dirtyPartitions, client);
 
     // Identify and load partitions that were added in the Hive Metastore but don't
@@ -1391,21 +1075,20 @@ public class HdfsTable extends Table implements FeFsTable {
     // descriptors and block metadata of a table (e.g. REFRESH statement).
     if (loadPartitionFileMetadata) {
       if (partitionsToUpdate != null) {
+        Preconditions.checkState(partitionsToLoadFiles.isEmpty());
         // Only reload file metadata of partitions specified in 'partitionsToUpdate'
-        Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty());
-        partitionsToUpdateFileMdByPath = 
getPartitionsByPath(partitionsToUpdate);
+        partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate);
       }
-      loadMetadataAndDiskIds(partitionsToUpdateFileMdByPath, true);
+      loadFileMetadataForPartitions(partitionsToLoadFiles, /* isRefresh=*/true);
     }
   }
 
   /**
    * Given a set of partition names, returns the corresponding HdfsPartition
-   * objects grouped by their base directory path.
+   * objects.
    */
-  private Map<Path, List<HdfsPartition>> getPartitionsByPath(
-      Collection<String> partitionNames) {
-    Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>();
+  private List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) {
+    List<HdfsPartition> parts = Lists.newArrayListWithCapacity(partitionNames.size());
     for (String partitionName: partitionNames) {
       String partName = DEFAULT_PARTITION_NAME;
       if (partitionName.length() > 0) {
@@ -1414,15 +1097,9 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       HdfsPartition partition = nameToPartitionMap_.get(partName);
       Preconditions.checkNotNull(partition, "Invalid partition name: " + partName);
-      Path partitionPath = partition.getLocationPath();
-      List<HdfsPartition> partitions = partsByPath.get(partitionPath);
-      if (partitions == null) {
-        partsByPath.put(partitionPath, Lists.newArrayList(partition));
-      } else {
-        partitions.add(partition);
-      }
+      parts.add(partition);
     }
-    return partsByPath;
+    return parts;
   }
 
   @Override
@@ -1591,16 +1268,17 @@ public class HdfsTable extends Table implements FeFsTable {
         Lists.newArrayList(partitionNames), db_.getName(), name_));
 
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
-
+    List<HdfsPartition> partitions = new ArrayList<>(msPartitions.size());
     for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
       HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
           permCache);
-      addPartition(partition);
       // If the partition is null, its HDFS path does not exist, and it was not added to
       // this table's partition list. Skip the partition.
       if (partition == null) continue;
-      refreshPartitionFileMetadata(partition);
+      partitions.add(partition);
     }
+    loadFileMetadataForPartitions(partitions, /* isRefresh=*/false);
+    for (HdfsPartition partition : partitions) addPartition(partition);
   }
 
   /**
@@ -1967,9 +1645,10 @@ public class HdfsTable extends Table implements FeFsTable {
       return;
     }
 
-    FileStatus[] statuses = FileSystemUtil.listStatus(fs, path);
+    RemoteIterator<FileStatus> statuses = FileSystemUtil.listStatus(fs, path);
     if (statuses == null) return;
-    for (FileStatus status: statuses) {
+    while (statuses.hasNext()) {
+      FileStatus status = statuses.next();
       if (!status.isDirectory()) continue;
       Pair<String, LiteralExpr> keyValues =
           getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth));
@@ -2192,7 +1871,8 @@ public class HdfsTable extends Table implements FeFsTable {
     if (oldPartition != null) {
       refreshedPartition.setFileDescriptors(oldPartition.getFileDescriptors());
     }
-    refreshPartitionFileMetadata(refreshedPartition);
+    loadFileMetadataForPartitions(ImmutableList.of(refreshedPartition),
+        /*isRefresh=*/true);
     dropPartition(oldPartition, false);
     addPartition(refreshedPartition);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
new file mode 100644
index 0000000..03fbf7f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.catalog;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.util.ThreadNameAnnotator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+
+
+/**
+ * Utility to coordinate the issuing of parallel metadata loading requests
+ * on a thread pool.
+ *
+ * This may safely be used even to load a single path: if only one path is to
+ * be loaded, this avoids creating any extra threads and uses the current thread
+ * instead.
+ */
+public class ParallelFileMetadataLoader {
+  private final static Logger LOG = LoggerFactory.getLogger(
+      ParallelFileMetadataLoader.class);
+
+  private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
+      BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
+  private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
+      BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();
+
+  // Maximum number of errors logged when loading partitioned tables.
+  private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
+
+  private final String logPrefix_;
+  private List<FileMetadataLoader> loaders_;
+  private final FileSystem fs_;
+
+  /**
+   * @param logPrefix informational prefix for log messages
+   * @param fs the filesystem to load from (used to determine appropriate parallelism)
+   * @param loaders the metadata loaders to execute in parallel.
+   */
+  public ParallelFileMetadataLoader(String logPrefix, FileSystem fs,
+      Collection<FileMetadataLoader> loaders) {
+    logPrefix_ = logPrefix;
+    loaders_ = ImmutableList.copyOf(loaders);
+
+    // TODO(todd) in actuality, different partitions could be on different file systems.
+    // We probably should create one pool per filesystem type, and size each of those
+    // pools based on that particular filesystem, so if there's a mixed S3+HDFS table
+    // we do the right thing.
+    fs_ = fs;
+  }
+
+  /**
+   * Call 'load()' in parallel on all of the loaders. If any loaders fail, throws
+   * an exception. However, any successful loaders are guaranteed to complete
+   * before any exception is thrown.
+   */
+  void load() throws TableLoadingException {
+    if (loaders_.isEmpty()) return;
+
+    int failedLoadTasks = 0;
+    ExecutorService pool = createPool();
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
+      List<Future<Void>> futures = new ArrayList<>(loaders_.size());
+      for (FileMetadataLoader loader : loaders_) {
+        futures.add(pool.submit(() -> { loader.load(); return null; }));
+      }
+
+      // Wait for the loaders to finish.
+      for (int i = 0; i < futures.size(); i++) {
+        try {
+          futures.get(i).get();
+        } catch (ExecutionException | InterruptedException e) {
+          if (++failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
+            LOG.error(logPrefix_ + " encountered an error loading data for path " +
+                loaders_.get(i).getPartDir(), e);
+          }
+        }
+      }
+    } finally {
+      pool.shutdown();
+    }
+    if (failedLoadTasks > 0) {
+      int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
+      if (errorsNotLogged > 0) {
+        LOG.error(logPrefix_ + " error loading {} paths. Only the first {} errors " +
+            "were logged", failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
+      }
+      throw new TableLoadingException(logPrefix_ + ": failed to load " + failedLoadTasks
+          + " paths. Check the catalog server log for more details.");
+    }
+  }
+
+  /**
+   * Returns the thread pool to load the file metadata.
+   *
+   * We use different thread pool sizes for HDFS and non-HDFS tables since the latter
+   * supports much higher throughput of RPC calls for listStatus/listFiles. For
+   * simplicity, the filesystem type is determined based on the table's root path and
+   * not for each partition individually. Based on our experiments, S3 showed a linear
+   * speed up (up to ~100x) with increasing number of loading threads where as the HDFS
+   * throughput was limited to ~5x in un-secure clusters and up to ~3.7x in secure
+   * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation
+   * (HADOOP-14558) on both the server and the client side.
+   */
+  private ExecutorService createPool() {
+    int numLoaders = loaders_.size();
+    Preconditions.checkState(numLoaders > 0);
+    int poolSize = FileSystemUtil.supportsStorageIds(fs_) ?
+        MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
+    // Thread pool size need not exceed the number of paths to be loaded.
+    poolSize = Math.min(numLoaders, poolSize);
+
+    if (poolSize == 1) {
+      return MoreExecutors.sameThreadExecutor();
+    } else {
+      LOG.info(logPrefix_ + " using a thread pool of size {}", poolSize);
+      return Executors.newFixedThreadPool(poolSize);
+    }
+  }
+}
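
To see how the two new classes compose (mirroring HdfsTable.loadFileMetadataForPartitions earlier in this diff), a hedged sketch follows. The partition paths and table name are hypothetical, and the sketch sits in org.apache.impala.catalog because ParallelFileMetadataLoader.load() and FileMetadataLoader.getPartDir() are package-private:

    package org.apache.impala.catalog;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.impala.thrift.TNetworkAddress;
    import org.apache.impala.util.ListMap;

    public class ParallelLoadExample {
      public static void main(String[] args) throws Exception {
        ListMap<TNetworkAddress> hostIndex = new ListMap<>();

        // One FileMetadataLoader per partition directory (hypothetical paths).
        List<FileMetadataLoader> loaders = new ArrayList<>();
        for (String dir : new String[] {
            "hdfs://localhost:20500/test-warehouse/tbl/p=1",
            "hdfs://localhost:20500/test-warehouse/tbl/p=2"}) {
          loaders.add(new FileMetadataLoader(
              new Path(dir), Collections.emptyList(), hostIndex));
        }

        // The filesystem of the table root decides the pool size; with a single
        // loader the work would run on the calling thread instead of a pool.
        FileSystem fs = new Path("hdfs://localhost:20500/test-warehouse/tbl")
            .getFileSystem(new Configuration());
        new ParallelFileMetadataLoader(
            "Loading file and block metadata for 2 paths for table example_db.tbl",
            fs, loaders).load();

        for (FileMetadataLoader loader : loaders) {
          System.out.println(loader.getPartDir() + ": "
              + loader.getLoadedFds().size() + " files");
        }
      }
    }

Only after load() returns without throwing would a caller such as HdfsTable copy each loader's descriptors into its partitions, which is what gives the all-or-nothing partition update described in the commit message.
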
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 44b680d..24d9917 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
-import org.apache.impala.catalog.events.MetastoreEvents;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 91deec6..bee5ada 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -19,18 +19,12 @@ package org.apache.impala.catalog.local;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -40,9 +34,9 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.Pair;
@@ -67,7 +61,6 @@ import com.google.errorprone.annotations.Immutable;
  */
 class DirectMetaProvider implements MetaProvider {
   private static MetaStoreClientPool msClientPool_;
-  private static Configuration CONF = new Configuration();
 
   DirectMetaProvider() {
     initMsClientPool();
@@ -292,12 +285,12 @@ class DirectMetaProvider implements MetaProvider {
   private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
       String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
     Path partDir = new Path(msPartition.getSd().getLocation());
+    FileMetadataLoader fml = new FileMetadataLoader(partDir,
+        /* oldFds= */Collections.emptyList(),
+        hostIndex);
 
-    List<LocatedFileStatus> stats = new ArrayList<>();
     try {
-      FileSystem fs = partDir.getFileSystem(CONF);
-      RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, 
/*recursive=*/false);
-      while (it.hasNext()) stats.add(it.next());
+      fml.load();
     } catch (FileNotFoundException fnf) {
       // If the partition directory isn't found, this is treated as having no
       // files.
@@ -308,19 +301,7 @@ class DirectMetaProvider implements MetaProvider {
           partName, fullTableName), ioe);
     }
 
-    HdfsTable.FileMetadataLoadStats loadStats =
-        new HdfsTable.FileMetadataLoadStats(partDir);
-
-    try {
-      FileSystem fs = partDir.getFileSystem(CONF);
-      return ImmutableList.copyOf(
-          HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats),
-              hostIndex, loadStats));
-    } catch (IOException e) {
-        throw new LocalCatalogException(String.format(
-            "Could not convert files to descriptors for partition %s of table 
%s",
-            partName, fullTableName), e);
-    }
+    return ImmutableList.copyOf(fml.getLoadedFds());
   }
 
   @Immutable
@@ -386,28 +367,4 @@ class DirectMetaProvider implements MetaProvider {
       return msTable_.getPartitionKeysSize() != 0;
     }
   }
-
-
-  /**
-   * Wrapper for a normal Iterable<T> to appear like a Hadoop 
RemoteIterator<T>.
-   * This is necessary because the existing code to convert file statuses to
-   * descriptors consumes the remote iterator directly and thus avoids 
materializing
-   * all of the LocatedFileStatus objects in memory at the same time.
-   */
-  private static class FakeRemoteIterator<T> implements RemoteIterator<T> {
-    private final Iterator<T> it_;
-
-    FakeRemoteIterator(Iterable<T> it) {
-      this.it_ = it.iterator();
-    }
-    @Override
-    public boolean hasNext() throws IOException {
-      return it_.hasNext();
-    }
-
-    @Override
-    public T next() throws IOException {
-      return it_.next();
-    }
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 10f4c69..c326d23 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -539,9 +539,10 @@ public class FileSystemUtil {
    * the file does not exist and also saves an RPC as the caller need not do a separate
    * exists check for the path. Returns null if the path does not exist.
    */
-  public static FileStatus[] listStatus(FileSystem fs, Path p) throws 
IOException {
+  public static RemoteIterator<FileStatus> listStatus(FileSystem fs, Path p)
+      throws IOException {
     try {
-      return fs.listStatus(p);
+      return fs.listStatusIterator(p);
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
       return null;
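
Because listStatus() now returns a RemoteIterator rather than a FileStatus[], callers iterate instead of indexing, as the HdfsTable hunk above already shows. A minimal sketch of the new calling pattern, with a hypothetical path:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.impala.common.FileSystemUtil;

    public class ListStatusExample {
      public static void main(String[] args) throws IOException {
        Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
        FileSystem fs = p.getFileSystem(new Configuration());

        // Returns null (rather than throwing) when the path does not exist.
        RemoteIterator<FileStatus> statuses = FileSystemUtil.listStatus(fs, p);
        if (statuses == null) return;

        // RemoteIterator is not a java.util.Iterator, so a plain while loop is used.
        while (statuses.hasNext()) {
          FileStatus status = statuses.next();
          System.out.println(status.getPath() + " dir=" + status.isDirectory());
        }
      }
    }

Note that RemoteIterator.hasNext() and next() can themselves throw IOException, so callers that previously looped over an array now need to handle that.
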
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index e431baa..80a33fc 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -378,8 +378,9 @@ public class CatalogTest {
     assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS));
     // REFRESH calls listStatus on each of the partitions, but doesn't re-check
     // the permissions of the partition directories themselves.
-    assertEquals(table.getPartitionIds().size(),
-        (long)opsCounts.getLong(LIST_STATUS));
+    seenCalls = opsCounts.getLong(LIST_LOCATED_STATUS) +
+        opsCounts.getLong(LIST_STATUS);
+    assertEquals(table.getPartitionIds().size(), seenCalls);
     // None of the underlying files changed so we should not do any ops for the files.
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
index 8798fa7..5e518fc 100644
--- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
@@ -22,20 +22,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsTable.FileMetadataLoadStats;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
@@ -153,11 +149,11 @@ public class HdfsPartitionTest {
   public void testCloneWithNewHostIndex() throws Exception {
     // Fetch some metadata from a directory in HDFS.
     Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
-    FileSystem fs = p.getFileSystem(new Configuration());
-    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(p);
     ListMap<TNetworkAddress> origIndex = new ListMap<>();
-    List<FileDescriptor> fileDescriptors = HdfsTable.createFileDescriptors(fs, 
iter,
-        origIndex, new FileMetadataLoadStats(p));
+    FileMetadataLoader fml = new FileMetadataLoader(p, Collections.emptyList(),
+        origIndex);
+    fml.load();
+    List<FileDescriptor> fileDescriptors = fml.getLoadedFds();
     assertTrue(!fileDescriptors.isEmpty());
 
     FileDescriptor fd = fileDescriptors.get(0);
diff --git a/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java b/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java
index ff289c4..216daef 100644
--- a/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java
@@ -48,8 +48,8 @@ public class TestCaseLoaderTest {
    */
   @Test
   public void testTestCaseImport() throws Exception {
-    FileStatus[] testCaseFiles = FileSystemUtil.listStatus(FileSystemUtil
-        .getFileSystemForPath(TESTCASE_DATA_DIR), TESTCASE_DATA_DIR);
+    FileStatus[] testCaseFiles = FileSystemUtil.getFileSystemForPath(TESTCASE_DATA_DIR)
+        .listStatus(TESTCASE_DATA_DIR);
     // Randomly pick testcases and try to replay them.
     Random rand = new Random();
     int maxIterations = 10;
