This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new 1147120 IMPALA-8454 (part 1): Refactor file descriptor loading code 1147120 is described below commit 114712031f2293af5b3d9509776f88c23d3fa0fc Author: Todd Lipcon <t...@apache.org> AuthorDate: Fri Apr 5 16:26:19 2019 -0700 IMPALA-8454 (part 1): Refactor file descriptor loading code This refactors various file-descriptor loading code out of HdfsTable into new standalone classes. In order to support ACID tables, we'll need to make various changes to these bits of code, and having them extracted and cleaned up will make that easier. This consolidates all of the places in which we list partition directories into one method which does the appropriate thing regardless of situation. This has a small behavior change related to IMPALA-8406: previously, we had a bug where, while refreshing a table, if one or more partitions failed to refresh, the other partitions might still get refreshed despite an error being returned. Those other partitions wouldn't become available until some other operation caused the table's catalog version number to increase. This was buggy behavior. Rather than tackle that problem in this "refactor" patch, this patch just slightly improves the behavior: we'll atomically either update all partitions or update none of them, but we might still add new partitions noticed by the REFRESH, and might still update other HMS metadata. This patch may end up slightly improving various other code paths that refresh file descriptor lists. We used to have slightly different ways of doing this in three different places, with different sets of optimizations. Now we do it all in one place, and we pull all the known tricks. 
Change-Id: I59edf493b9ba38be5f556b4795a7684d9c9e3a07 Reviewed-on: http://gerrit.cloudera.org:8080/12950 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../apache/impala/catalog/FileMetadataLoader.java | 229 ++++++++++ .../org/apache/impala/catalog/HdfsPartition.java | 2 + .../java/org/apache/impala/catalog/HdfsTable.java | 482 ++++----------------- .../impala/catalog/ParallelFileMetadataLoader.java | 148 +++++++ .../main/java/org/apache/impala/catalog/Table.java | 1 - .../impala/catalog/local/DirectMetaProvider.java | 55 +-- .../org/apache/impala/common/FileSystemUtil.java | 5 +- .../org/apache/impala/catalog/CatalogTest.java | 5 +- .../apache/impala/catalog/HdfsPartitionTest.java | 14 +- .../apache/impala/planner/TestCaseLoaderTest.java | 4 +- 10 files changed, 479 insertions(+), 466 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java new file mode 100644 index 0000000..d0f71b5 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.impala.catalog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.common.Reference; +import org.apache.impala.compat.HdfsShim; +import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.util.ListMap; +import org.apache.impala.util.ThreadNameAnnotator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +/** + * Utility for loading file metadata within a partition directory. + */ +public class FileMetadataLoader { + private final static Logger LOG = LoggerFactory.getLogger(FileMetadataLoader.class); + private static final Configuration CONF = new Configuration(); + + private final Path partDir_; + private final ImmutableMap<String, FileDescriptor> oldFdsByName_; + private final ListMap<TNetworkAddress> hostIndex_; + + private boolean forceRefreshLocations = false; + + private List<FileDescriptor> loadedFds_; + private LoadStats loadStats_; + + /** + * @param partDir the dir for which to fetch file metadata + * @param oldFds any pre-existing file descriptors loaded for this table, used + * to optimize refresh if available. 
+ * @param hostIndex the host index with which to associate the file descriptors + */ + public FileMetadataLoader(Path partDir, List<FileDescriptor> oldFds, + ListMap<TNetworkAddress> hostIndex) { + partDir_ = Preconditions.checkNotNull(partDir); + hostIndex_ = Preconditions.checkNotNull(hostIndex); + oldFdsByName_ = Maps.uniqueIndex(oldFds, FileDescriptor::getFileName); + } + + /** + * If 'refresh' is true, force re-fetching block locations even if a file does not + * appear to have changed. + */ + public void setForceRefreshBlockLocations(boolean refresh) { + forceRefreshLocations = refresh; + } + + /** + * @return the file descriptors that were loaded after an invocation of load() + */ + public List<FileDescriptor> getLoadedFds() { + Preconditions.checkState(loadedFds_ != null, + "Must have successfully loaded first"); + return loadedFds_; + } + + /** + * @return statistics about the descriptor loading process, after an invocation of + * load() + */ + public LoadStats getStats() { + Preconditions.checkState(loadedFds_ != null, + "Must have successfully loaded first"); + return loadStats_; + } + + Path getPartDir() { return partDir_; } + + /** + * Load the file descriptors, which may later be fetched using {@link #getLoadedFds()}. + * After a successful load, stats may be fetched using {@link #getStats()}. + * + * If the directory does not exist, this succeeds and yields an empty list of + * descriptors. + * + * @throws IOException if listing fails. + */ + public void load() throws IOException { + Preconditions.checkState(loadStats_ == null, "already loaded"); + loadStats_ = new LoadStats(); + FileSystem fs = partDir_.getFileSystem(CONF); + + // If we don't have any prior FDs from which we could re-use old block location info, + // we'll need to fetch info for every returned file. In this case we can inline + // that request with the 'list' call and save a round-trip per file. 
+ // + // In the case that we _do_ have existing FDs which we can reuse, we'll optimistically + // assume that most _can_ be reused, in which case it's faster to _not_ prefetch + // the locations. + boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) && + (oldFdsByName_.isEmpty() || forceRefreshLocations); + + String msg = String.format("%s file metadata%s from path %s", + oldFdsByName_.isEmpty() ? "Loading" : "Refreshing", + listWithLocations ? " with eager location-fetching" : "", + partDir_); + LOG.trace(msg); + try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) { + RemoteIterator<? extends FileStatus> fileStatuses; + if (listWithLocations) { + fileStatuses = FileSystemUtil.listFiles(fs, partDir_, /*recursive=*/false); + } else { + fileStatuses = FileSystemUtil.listStatus(fs, partDir_); + + // TODO(todd): we could look at the result of listing without locations, and if + // we see that a substantial number of the files have changed, it may be better + // to go back and re-list with locations vs doing an RPC per file. 
+ } + loadedFds_ = new ArrayList<>(); + if (fileStatuses == null) return; + + Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0)); + while (fileStatuses.hasNext()) { + FileStatus fileStatus = fileStatuses.next(); + if (!FileSystemUtil.isValidDataFile(fileStatus)) { + ++loadStats_.hiddenFiles; + continue; + } + // TODO(todd): this logic will have to change when we support recursive partition + // listing -- we need to index the old FDs by their relative path to the partition + // directory, not just the file name (last path component) + String fileName = fileStatus.getPath().getName().toString(); + FileDescriptor fd = oldFdsByName_.get(fileName); + if (listWithLocations || forceRefreshLocations || + hasFileChanged(fd, fileStatus)) { + fd = createFd(fs, fileStatus, numUnknownDiskIds); + ++loadStats_.loadedFiles; + } else { + ++loadStats_.skippedFiles; + } + loadedFds_.add(Preconditions.checkNotNull(fd));; + } + loadStats_.unknownDiskIds += numUnknownDiskIds.getRef(); + if (LOG.isTraceEnabled()) { + LOG.trace(loadStats_.debugString()); + } + } + } + + /** + * Create a FileDescriptor for the given FileStatus. If the FS supports block locations, + * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) this uses + * the already-loaded information; otherwise, this may have to remotely look up the + * locations. 
+ */ + private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus, + Reference<Long> numUnknownDiskIds) throws IOException { + if (!FileSystemUtil.supportsStorageIds(fs)) { + return FileDescriptor.createWithNoBlocks(fileStatus); + } + BlockLocation[] locations; + if (fileStatus instanceof LocatedFileStatus) { + locations = ((LocatedFileStatus)fileStatus).getBlockLocations(); + } else { + locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + } + return FileDescriptor.create(fileStatus, locations, fs, hostIndex_, + HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); + } + + /** + * Compares the modification time and file size between the FileDescriptor and the + * FileStatus to determine if the file has changed. Returns true if the file has changed + * and false otherwise. + */ + private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) { + return (fd == null) || (fd.getFileLength() != status.getLen()) || + (fd.getModificationTime() != status.getModificationTime()); + } + + // File/Block metadata loading stats for a single HDFS path. + public class LoadStats { + // Number of files for which the metadata was loaded. + public int loadedFiles = 0; + + // Number of hidden files excluded from file metadata loading. More details at + // isValidDataFile(). + public int hiddenFiles = 0; + + // Number of files skipped from file metadata loading because the files have not + // changed since the last load. More details at hasFileChanged(). + // + // TODO(todd) rename this to something indicating it was fast-pathed, not skipped + public int skippedFiles = 0; + + // Number of unknown disk IDs encountered while loading block + // metadata for this path. 
+ public long unknownDiskIds = 0; + + public String debugString() { + return String.format("Path: %s: Loaded files: %s Hidden files: %s " + + "Skipped files: %s Unknown diskIDs: %s", partDir_, loadedFiles, hiddenFiles, + skippedFiles, unknownDiskIds); + } + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index c27d347..8b6f629 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -175,6 +175,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition { */ private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb, FileStatus fileStatus, int[] fbFileBlockOffets, boolean isEc) { + // TODO(todd): need to use path relative to the partition dir, not the + // filename here. int fileNameOffset = fbb.createString(fileStatus.getPath().getName()); // A negative block vector offset is used when no block offsets are specified. 
int blockVectorOffset = -1; diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 49f0e9a..de190d9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -31,18 +31,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; @@ -61,8 +54,6 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.Pair; -import org.apache.impala.common.Reference; -import org.apache.impala.compat.HdfsShim; import org.apache.impala.fb.FbFileBlock; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.CatalogLookupStatus; @@ -98,10 +89,9 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -128,9 
+118,6 @@ public class HdfsTable extends Table implements FeFsTable { // Number of times to retry fetching the partitions from the HMS should an error occur. private final static int NUM_PARTITION_FETCH_RETRIES = 5; - // Maximum number of errors logged when loading partitioned tables. - private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100; - // Table property key for skip.header.line.count public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count"; @@ -283,77 +270,6 @@ public class HdfsTable extends Table implements FeFsTable { // and its usage in getFileSystem suggests it should be. private static final Configuration CONF = new Configuration(); - private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = - BackendConfig.INSTANCE.maxHdfsPartsParallelLoad(); - - private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD = - BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad(); - - // File/Block metadata loading stats for a single HDFS path. - public static class FileMetadataLoadStats { - // Path corresponding to this metadata load request. - private final Path hdfsPath; - - // Number of files for which the metadata was loaded. - public int loadedFiles = 0; - - // Number of hidden files excluded from file metadata loading. More details at - // isValidDataFile(). - public int hiddenFiles = 0; - - // Number of files skipped from file metadata loading because the files have not - // changed since the last load. More details at hasFileChanged(). - public int skippedFiles = 0; - - // Number of unknown disk IDs encountered while loading block - // metadata for this path. 
- public long unknownDiskIds = 0; - - public FileMetadataLoadStats(Path path) { hdfsPath = path; } - - public String debugString() { - Preconditions.checkNotNull(hdfsPath); - return String.format("Path: %s: Loaded files: %s Hidden files: %s " + - "Skipped files: %s Unknown diskIDs: %s", hdfsPath, loadedFiles, hiddenFiles, - skippedFiles, unknownDiskIds); - } - } - - // A callable implementation of file metadata loading request for a given - // HDFS path. - public class FileMetadataLoadRequest - implements Callable<FileMetadataLoadStats> { - private final Path hdfsPath_; - // All the partitions mapped to the above path - private final List<HdfsPartition> partitionList_; - // If set to true, reloads the file metadata only when the files in this path - // have changed since last load (more details in hasFileChanged()). - private final boolean reuseFileMd_; - - public FileMetadataLoadRequest( - Path path, List<HdfsPartition> partitions, boolean reuseFileMd) { - hdfsPath_ = path; - partitionList_ = partitions; - reuseFileMd_ = reuseFileMd; - } - - @Override - public FileMetadataLoadStats call() throws IOException { - try (ThreadNameAnnotator tna = new ThreadNameAnnotator(debugString())) { - FileMetadataLoadStats loadingStats = - reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) : - resetAndLoadFileMetadata(hdfsPath_, partitionList_); - return loadingStats; - } - } - - private String debugString() { - String loadType = reuseFileMd_ ? "Refreshing" : "Loading"; - return String.format("%s file metadata for path: %s", loadType, - hdfsPath_.toString()); - } - } - public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl, Db db, String name, String owner) { super(msTbl, db, name, owner); @@ -394,196 +310,6 @@ public class HdfsTable extends Table implements FeFsTable { } } - /** - * Drops and re-loads the file metadata of all the partitions in 'partitions' that - * map to the path 'partDir'. 
'partDir' may belong to any file system that - * implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves - * the following steps: - * - Clear the current file metadata of the partitions. - * - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations - * for each file under it. - * - For every valid data file, enumerate all its blocks and their corresponding hosts - * and disk IDs if the underlying file system supports the block locations API - * (for ex: HDFS). For other file systems (like S3), per-block information is not - * available so it's not stored. - */ - private FileMetadataLoadStats resetAndLoadFileMetadata( - Path partDir, List<HdfsPartition> partitions) throws IOException { - FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir); - // No need to load blocks for empty partitions list. - if (partitions == null || partitions.isEmpty()) return loadStats; - if (LOG.isTraceEnabled()) { - LOG.trace("Loading block md for " + getFullName() + " path: " + partDir.toString()); - } - - FileSystem fs = partDir.getFileSystem(CONF); - - RemoteIterator<LocatedFileStatus> fileStatusIter = - FileSystemUtil.listFiles(fs, partDir, false); - if (fileStatusIter == null) return loadStats; - - List<FileDescriptor> newFileDescs = createFileDescriptors( - fs, fileStatusIter, hostIndex_, loadStats); - for (HdfsPartition partition: partitions) { - partition.setFileDescriptors(newFileDescs); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Loaded file metadata for " + getFullName() + " " + - loadStats.debugString()); - } - return loadStats; - } - - /** - * Convert LocatedFileStatuses to FileDescriptors. - * - * If 'fs' is a FileSystem that supports block locations, the resulting - * descriptors include location information, and 'hostIndex' is updated - * to include all of the hosts referred to by the locations. - * - * 'loadStats' is updated to reflect this loading operation. 
- * - * May throw IOException if the provided RemoteIterator throws. - */ - public static List<FileDescriptor> createFileDescriptors( - FileSystem fs, - RemoteIterator<LocatedFileStatus> fileStatusIter, - ListMap<TNetworkAddress> hostIndex, - FileMetadataLoadStats loadStats) throws IOException { - boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs); - Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0)); - List<FileDescriptor> newFileDescs = new ArrayList<>(); - while (fileStatusIter.hasNext()) { - LocatedFileStatus fileStatus = fileStatusIter.next(); - if (!FileSystemUtil.isValidDataFile(fileStatus)) { - ++loadStats.hiddenFiles; - continue; - } - FileDescriptor fd; - if (supportsBlocks) { - fd = FileDescriptor.create(fileStatus, fileStatus.getBlockLocations(), fs, - hostIndex, HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); - } else { - fd = FileDescriptor.createWithNoBlocks(fileStatus); - } - newFileDescs.add(fd); - ++loadStats.loadedFiles; - } - loadStats.unknownDiskIds += numUnknownDiskIds.getRef(); - return newFileDescs; - } - - /** - * Refreshes file metadata information for 'path'. This method is optimized for - * the case where the files in the partition have not changed dramatically. It first - * uses a listStatus() call on the partition directory to detect the modified files - * (look at hasFileChanged()) and fetches their block locations using the - * getFileBlockLocations() method. Our benchmarks suggest that the listStatus() call - * is much faster then the listFiles() (up to ~40x faster in some cases). - */ - private FileMetadataLoadStats refreshFileMetadata( - Path partDir, List<HdfsPartition> partitions) throws IOException { - FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir); - // No need to load blocks for empty partitions list. 
- if (partitions == null || partitions.isEmpty()) return loadStats; - if (LOG.isTraceEnabled()) { - LOG.trace("Refreshing block md for " + getFullName() + " path: " + - partDir.toString()); - } - - // Index the partition file descriptors by their file names for O(1) look ups. - // We just pick the first partition to generate the fileDescByName lookup table - // since all the partitions map to the same partDir. - ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex( - partitions.get(0).getFileDescriptors(), new Function<FileDescriptor, String>() { - @Override - public String apply(FileDescriptor desc) { return desc.getFileName(); } - }); - - FileSystem fs = partDir.getFileSystem(CONF); - FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir); - if (fileStatuses == null) return loadStats; - boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs); - Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0)); - List<FileDescriptor> newFileDescs = new ArrayList<>(); - // If there is a cached partition mapped to this path, we recompute the block - // locations even if the underlying files have not changed (hasFileChanged()). - // This is done to keep the cached block metadata up to date. 
- boolean isPartitionMarkedCached = false; - for (HdfsPartition partition: partitions) { - if (partition.isMarkedCached()) { - isPartitionMarkedCached = true; - break; - } - } - for (FileStatus fileStatus: fileStatuses) { - if (!FileSystemUtil.isValidDataFile(fileStatus)) { - ++loadStats.hiddenFiles; - continue; - } - String fileName = fileStatus.getPath().getName().toString(); - FileDescriptor fd = fileDescsByName.get(fileName); - if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) { - if (supportsBlocks) { - BlockLocation[] locations = - fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); - fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_, - HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); - } else { - fd = FileDescriptor.createWithNoBlocks(fileStatus); - } - ++loadStats.loadedFiles; - } else { - ++loadStats.skippedFiles; - } - Preconditions.checkNotNull(fd); - newFileDescs.add(fd); - } - loadStats.unknownDiskIds += numUnknownDiskIds.getRef(); - for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs); - if (LOG.isTraceEnabled()) { - LOG.trace("Refreshed file metadata for " + getFullName() + " " - + loadStats.debugString()); - } - return loadStats; - } - - /** - * Compares the modification time and file size between the FileDescriptor and the - * FileStatus to determine if the file has changed. Returns true if the file has changed - * and false otherwise. - */ - private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) { - return (fd == null) || (fd.getFileLength() != status.getLen()) || - (fd.getModificationTime() != status.getModificationTime()); - } - - /** - * Helper method to reload the file metadata of a single partition. 
- */ - private void refreshPartitionFileMetadata(HdfsPartition partition) - throws CatalogException { - String annotation = String.format("refreshing table %s partition %s", - getFullName(), partition.getPartitionName()); - try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) { - Path partDir = partition.getLocationPath(); - // If there are no existing files in the partition, use the non-incremental path. - boolean useExistingFds = partition.hasFileDescriptors(); - FileMetadataLoadStats stats; - if (useExistingFds) { - stats = refreshFileMetadata(partDir, Collections.singletonList(partition)); - } else { - stats = resetAndLoadFileMetadata(partDir, Collections.singletonList(partition)); - } - if (LOG.isDebugEnabled()) { - LOG.debug("{} file metadata for {} {}", useExistingFds ? "Refreshed" : "Loaded", - getFullName(), stats.debugString()); - } - } catch (IOException e) { - throw new CatalogException("Encountered invalid partition path", e); - } - } - @Override public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; @@ -783,10 +509,6 @@ public class HdfsTable extends Table implements FeFsTable { CatalogException { Preconditions.checkNotNull(msTbl); initializePartitionMetadata(msTbl); - // Map of partition paths to their corresponding HdfsPartition objects. Populated - // using createPartition() calls. A single partition path can correspond to multiple - // partitions. - Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>(); FsPermissionCache permCache = preloadPermissionsCache(msPartitions); Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath()); @@ -799,7 +521,6 @@ public class HdfsTable extends Table implements FeFsTable { // partition, so add a single partition with no keys which will get all the // files in the table's root directory. 
HdfsPartition part = createPartition(msTbl.getSd(), null, permCache); - partsByPath.put(tblLocation, Lists.newArrayList(part)); if (isMarkedCached_) part.markCached(); addPartition(part); } else { @@ -810,102 +531,80 @@ public class HdfsTable extends Table implements FeFsTable { // If the partition is null, its HDFS path does not exist, and it was not added // to this table's partition list. Skip the partition. if (partition == null) continue; - Path partDir = FileSystemUtil.createFullyQualifiedPath( - new Path(msPartition.getSd().getLocation())); - List<HdfsPartition> parts = partsByPath.get(partDir); - if (parts == null) { - partsByPath.put(partDir, Lists.newArrayList(partition)); - } else { - parts.add(partition); - } } } // Load the file metadata from scratch. - loadMetadataAndDiskIds(partsByPath, false); + loadFileMetadataForPartitions(partitionMap_.values(), /*isRefresh=*/false); } + /** - * Returns the thread pool size to load the file metadata of this table. - * 'numPaths' is the number of paths for which the file metadata should be loaded. + * Helper method to load the block locations for each partition in 'parts'. + * New file descriptor lists are loaded and the partitions are updated in place. * - * We use different thread pool sizes for HDFS and non-HDFS tables since the latter - * supports much higher throughput of RPC calls for listStatus/listFiles. For - * simplicity, the filesystem type is determined based on the table's root path and - * not for each partition individually. Based on our experiments, S3 showed a linear - * speed up (up to ~100x) with increasing number of loading threads where as the HDFS - * throughput was limited to ~5x in un-secure clusters and up to ~3.7x in secure - * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation - * (HADOOP-14558) on both the server and the client side. + * @param isRefresh whether this is a refresh operation or an initial load. This only + * affects logging. 
*/ - private int getLoadingThreadPoolSize(int numPaths) throws CatalogException { - Preconditions.checkState(numPaths > 0); + private void loadFileMetadataForPartitions(Iterable<HdfsPartition> parts, + boolean isRefresh) throws CatalogException { + // Group the partitions by their path (multiple partitions may point to the same + // path). + Map<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); + for (HdfsPartition p : parts) { + Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation())); + partsByPath.computeIfAbsent(partPath, (path) -> new ArrayList<HdfsPartition>()) + .add(p); + } + + // Create a FileMetadataLoader for each path. + Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap(); + for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) { + List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors(); + FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), oldFds, hostIndex_); + // If there is a cached partition mapped to this path, we recompute the block + // locations even if the underlying files have not changed. + // This is done to keep the cached block metadata up to date. + boolean hasCachedPartition = Iterables.any(e.getValue(), + HdfsPartition::isMarkedCached); + loader.setForceRefreshBlockLocations(hasCachedPartition); + loadersByPath.put(e.getKey(), loader); + } + + String logPrefix = String.format( + "%s file and block metadata for %s paths for table %s", + isRefresh ? "Refreshing" : "Loading", partsByPath.size(), + getFullName()); FileSystem tableFs; try { tableFs = (new Path(getLocation())).getFileSystem(CONF); } catch (IOException e) { throw new CatalogException("Invalid table path for table: " + getFullName(), e); } - int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ? - MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD; - // Thread pool size need not exceed the number of paths to be loaded. 
- return Math.min(numPaths, threadPoolSize); - } - /** - * Helper method to load the block locations for each partition directory in - * partsByPath using a thread pool. 'partsByPath' maps each partition directory to - * the corresponding HdfsPartition objects. If 'reuseFileMd' is true, the block - * metadata is incrementally refreshed, else it is reloaded from scratch. - */ - private void loadMetadataAndDiskIds(Map<Path, List<HdfsPartition>> partsByPath, - boolean reuseFileMd) throws CatalogException { - int numPathsToLoad = partsByPath.size(); - // For tables without partitions we have no file metadata to load. - if (numPathsToLoad == 0) return; - - int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad); - LOG.info("{} file and block metadata for {} paths for table {} " + - "using a thread pool of size {}", - reuseFileMd ? "Refreshing" : "Loading", numPathsToLoad, getFullName(), - threadPoolSize); - ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize); - try { - List<Future<FileMetadataLoadStats>> pendingMdLoadTasks = new ArrayList<>(); - for (Path p: partsByPath.keySet()) { - FileMetadataLoadRequest blockMdLoadReq = - new FileMetadataLoadRequest(p, partsByPath.get(p), reuseFileMd); - pendingMdLoadTasks.add(partitionLoadingPool.submit(blockMdLoadReq)); - } - // Wait for the partition load tasks to finish. 
- int failedLoadTasks = 0; - for (Future<FileMetadataLoadStats> task: pendingMdLoadTasks) { - try { - FileMetadataLoadStats loadStats = task.get(); - if (LOG.isTraceEnabled()) LOG.trace(loadStats.debugString()); - } catch (ExecutionException | InterruptedException e) { - ++failedLoadTasks; - if (failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) { - LOG.error("Encountered an error loading block metadata for table: " + - getFullName(), e); - } - } - } - if (failedLoadTasks > 0) { - int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG; - if (errorsNotLogged > 0) { - LOG.error(String.format("Error loading file metadata for %s paths for table " + - "%s. Only the first %s errors were logged.", failedLoadTasks, getFullName(), - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG)); - } - throw new TableLoadingException(String.format("Failed to load file metadata " - + "for %s paths for table %s. Table's file metadata could be partially " - + "loaded. Check the Catalog server log for more details.", failedLoadTasks, - getFullName())); + // Actually load the partitions. + // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then + // we'll throw an exception here and end up bailing out of whatever catalog operation + // we're in the middle of. This could cause a partial metadata update -- eg we may + // have refreshed the top-level table properties without refreshing the files. + new ParallelFileMetadataLoader(logPrefix, tableFs, loadersByPath.values()) + .load(); + + // Store the loaded FDs into the partitions. 
+ for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) { + Path p = e.getKey(); + FileMetadataLoader loader = loadersByPath.get(p); + + for (HdfsPartition part : e.getValue()) { + part.setFileDescriptors(loader.getLoadedFds()); } - } finally { - partitionLoadingPool.shutdown(); } - LOG.info(String.format("Loaded file and block metadata for %s", getFullName())); + + // TODO(todd): would be good to log a summary of the loading process: + // - how long did it take? + // - how many block locations did we reuse/load individually/load via batch + // - how many partitions did we read metadata for + // - etc... + LOG.info("Loaded file and block metadata for {}", getFullName()); } /** @@ -967,7 +666,6 @@ public class HdfsTable extends Table implements FeFsTable { public List<HdfsPartition> createAndLoadPartitions( List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions) throws CatalogException { - Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>(); List<HdfsPartition> addedParts = new ArrayList<>(); FsPermissionCache permCache = preloadPermissionsCache(msPartitions); for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) { @@ -975,15 +673,8 @@ public class HdfsTable extends Table implements FeFsTable { permCache); Preconditions.checkNotNull(hdfsPartition); addedParts.add(hdfsPartition); - Path partitionPath = hdfsPartition.getLocationPath(); - List<HdfsPartition> hdfsPartitions = partsByPath.get(partitionPath); - if (hdfsPartitions == null) { - partsByPath.put(partitionPath, Lists.newArrayList(hdfsPartition)); - } else { - hdfsPartitions.add(hdfsPartition); - } } - loadMetadataAndDiskIds(partsByPath, false); + loadFileMetadataForPartitions(addedParts, /* isRefresh = */ false); return addedParts; } @@ -1306,7 +997,7 @@ public class HdfsTable extends Table implements FeFsTable { part.setFileDescriptors(oldPartition.getFileDescriptors()); addPartition(part); if (isMarkedCached_) part.markCached(); - 
refreshPartitionFileMetadata(part); + loadFileMetadataForPartitions(ImmutableList.of(part), /*isRefresh=*/true); } /** @@ -1335,8 +1026,8 @@ public class HdfsTable extends Table implements FeFsTable { msPartitionNames.addAll(client.listPartitionNames(db_.getName(), name_, (short) -1)); // Names of loaded partitions in this table Set<String> partitionNames = new HashSet<>(); - // Partitions for which file metadata must be loaded, grouped by partition paths. - Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = new HashMap<>(); + // Partitions for which file metadata must be loaded + List<HdfsPartition> partitionsToLoadFiles = Lists.newArrayList(); // Partitions that need to be dropped and recreated from scratch List<HdfsPartition> dirtyPartitions = new ArrayList<>(); // Partitions removed from the Hive Metastore. @@ -1355,15 +1046,7 @@ public class HdfsTable extends Table implements FeFsTable { dirtyPartitions.add(partition); } else { if (partitionsToUpdate == null && loadPartitionFileMetadata) { - Path partitionPath = partition.getLocationPath(); - List<HdfsPartition> partitions = - partitionsToUpdateFileMdByPath.get(partitionPath); - if (partitions == null) { - partitionsToUpdateFileMdByPath.put( - partitionPath, Lists.newArrayList(partition)); - } else { - partitions.add(partition); - } + partitionsToLoadFiles.add(partition); } } Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor()); @@ -1373,6 +1056,7 @@ public class HdfsTable extends Table implements FeFsTable { // dirtyPartitions are reloaded and hence cache directives are not dropped. dropPartitions(dirtyPartitions, false); // Load dirty partitions from Hive Metastore + // TODO(todd): the logic around "dirty partitions" is highly suspicious. 
loadPartitionsFromMetastore(dirtyPartitions, client); // Identify and load partitions that were added in the Hive Metastore but don't @@ -1391,21 +1075,20 @@ public class HdfsTable extends Table implements FeFsTable { // descriptors and block metadata of a table (e.g. REFRESH statement). if (loadPartitionFileMetadata) { if (partitionsToUpdate != null) { + Preconditions.checkState(partitionsToLoadFiles.isEmpty()); // Only reload file metadata of partitions specified in 'partitionsToUpdate' - Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty()); - partitionsToUpdateFileMdByPath = getPartitionsByPath(partitionsToUpdate); + partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate); } - loadMetadataAndDiskIds(partitionsToUpdateFileMdByPath, true); + loadFileMetadataForPartitions(partitionsToLoadFiles, /* isRefresh=*/true); } } /** * Given a set of partition names, returns the corresponding HdfsPartition - * objects grouped by their base directory path. + * objects. */ - private Map<Path, List<HdfsPartition>> getPartitionsByPath( - Collection<String> partitionNames) { - Map<Path, List<HdfsPartition>> partsByPath = new HashMap<>(); + private List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) { + List<HdfsPartition> parts = Lists.newArrayListWithCapacity(partitionNames.size()); for (String partitionName: partitionNames) { String partName = DEFAULT_PARTITION_NAME; if (partitionName.length() > 0) { @@ -1414,15 +1097,9 @@ public class HdfsTable extends Table implements FeFsTable { } HdfsPartition partition = nameToPartitionMap_.get(partName); Preconditions.checkNotNull(partition, "Invalid partition name: " + partName); - Path partitionPath = partition.getLocationPath(); - List<HdfsPartition> partitions = partsByPath.get(partitionPath); - if (partitions == null) { - partsByPath.put(partitionPath, Lists.newArrayList(partition)); - } else { - partitions.add(partition); - } + parts.add(partition); } - return partsByPath; + return 
parts; } @Override @@ -1591,16 +1268,17 @@ public class HdfsTable extends Table implements FeFsTable { Lists.newArrayList(partitionNames), db_.getName(), name_)); FsPermissionCache permCache = preloadPermissionsCache(msPartitions); - + List<HdfsPartition> partitions = new ArrayList<>(msPartitions.size()); for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { HdfsPartition partition = createPartition(msPartition.getSd(), msPartition, permCache); - addPartition(partition); // If the partition is null, its HDFS path does not exist, and it was not added to // this table's partition list. Skip the partition. if (partition == null) continue; - refreshPartitionFileMetadata(partition); + partitions.add(partition); } + loadFileMetadataForPartitions(partitions, /* isRefresh=*/false); + for (HdfsPartition partition : partitions) addPartition(partition); } /** @@ -1967,9 +1645,10 @@ public class HdfsTable extends Table implements FeFsTable { return; } - FileStatus[] statuses = FileSystemUtil.listStatus(fs, path); + RemoteIterator<FileStatus> statuses = FileSystemUtil.listStatus(fs, path); if (statuses == null) return; - for (FileStatus status: statuses) { + while (statuses.hasNext()) { + FileStatus status = statuses.next(); if (!status.isDirectory()) continue; Pair<String, LiteralExpr> keyValues = getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth)); @@ -2192,7 +1871,8 @@ public class HdfsTable extends Table implements FeFsTable { if (oldPartition != null) { refreshedPartition.setFileDescriptors(oldPartition.getFileDescriptors()); } - refreshPartitionFileMetadata(refreshedPartition); + loadFileMetadataForPartitions(ImmutableList.of(refreshedPartition), + /*isRefresh=*/true); dropPartition(oldPartition, false); addPartition(refreshedPartition); } diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java new file mode 100644 index 
0000000..03fbf7f --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.impala.catalog; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.service.BackendConfig; +import org.apache.impala.util.ThreadNameAnnotator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; + + +/** + * Utility to coordinate the issuing of parallel metadata loading requests + * on a thread pool. + * + * This may safely be used even to load a single path: if only one path is to + * be loaded, this avoids creating any extra threads and uses the current thread + * instead. 
+ */ +public class ParallelFileMetadataLoader { + private final static Logger LOG = LoggerFactory.getLogger( + ParallelFileMetadataLoader.class); + + private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = + BackendConfig.INSTANCE.maxHdfsPartsParallelLoad(); + private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD = + BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad(); + + // Maximum number of errors logged when loading partitioned tables. + private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100; + + private final String logPrefix_; + private List<FileMetadataLoader> loaders_; + private final FileSystem fs_; + + /** + * @param logPrefix informational prefix for log messages + * @param fs the filesystem to load from (used to determine appropriate parallelism) + * @param loaders the metadata loaders to execute in parallel. + */ + public ParallelFileMetadataLoader(String logPrefix, FileSystem fs, + Collection<FileMetadataLoader> loaders) { + logPrefix_ = logPrefix; + loaders_ = ImmutableList.copyOf(loaders); + + // TODO(todd) in actuality, different partitions could be on different file systems. + // We probably should create one pool per filesystem type, and size each of those + // pools based on that particular filesystem, so if there's a mixed S3+HDFS table + // we do the right thing. + fs_ = fs; + } + + /** + * Call 'load()' in parallel on all of the loaders. If any loaders fail, throws + * an exception. However, any successful loaders are guaranteed to complete + * before any exception is thrown. 
+ */ + void load() throws TableLoadingException { + if (loaders_.isEmpty()) return; + + int failedLoadTasks = 0; + ExecutorService pool = createPool(); + try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) { + List<Future<Void>> futures = new ArrayList<>(loaders_.size()); + for (FileMetadataLoader loader : loaders_) { + futures.add(pool.submit(() -> { loader.load(); return null; })); + } + + // Wait for the loaders to finish. + for (int i = 0; i < futures.size(); i++) { + try { + futures.get(i).get(); + } catch (ExecutionException | InterruptedException e) { + if (++failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) { + LOG.error(logPrefix_ + " encountered an error loading data for path " + + loaders_.get(i).getPartDir(), e); + } + } + } + } finally { + pool.shutdown(); + } + if (failedLoadTasks > 0) { + int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG; + if (errorsNotLogged > 0) { + LOG.error(logPrefix_ + " error loading {} paths. Only the first {} errors " + + "were logged", failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG); + } + throw new TableLoadingException(logPrefix_ + ": failed to load " + failedLoadTasks + + " paths. Check the catalog server log for more details."); + } + } + + /** + * Returns the thread pool to load the file metadata. + * + * We use different thread pool sizes for HDFS and non-HDFS tables since the latter + * supports much higher throughput of RPC calls for listStatus/listFiles. For + * simplicity, the filesystem type is determined based on the table's root path and + * not for each partition individually. Based on our experiments, S3 showed a linear + * speed up (up to ~100x) with increasing number of loading threads whereas the HDFS + * throughput was limited to ~5x in un-secure clusters and up to ~3.7x in secure + * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation + * (HADOOP-14558) on both the server and the client side. 
+ */ + private ExecutorService createPool() { + int numLoaders = loaders_.size(); + Preconditions.checkState(numLoaders > 0); + int poolSize = FileSystemUtil.supportsStorageIds(fs_) ? + MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD; + // Thread pool size need not exceed the number of paths to be loaded. + poolSize = Math.min(numLoaders, poolSize); + + if (poolSize == 1) { + return MoreExecutors.sameThreadExecutor(); + } else { + LOG.info(logPrefix_ + " using a thread pool of size {}", poolSize); + return Executors.newFixedThreadPool(poolSize); + } + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 44b680d..24d9917 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.impala.analysis.TableName; -import org.apache.impala.catalog.events.MetastoreEvents; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Metrics; import org.apache.impala.common.Pair; diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 91deec6..bee5ada 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -19,18 +19,12 @@ package org.apache.impala.catalog.local; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; @@ -40,9 +34,9 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.impala.authorization.AuthorizationPolicy; +import org.apache.impala.catalog.FileMetadataLoader; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; -import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.Pair; @@ -67,7 +61,6 @@ import com.google.errorprone.annotations.Immutable; */ class DirectMetaProvider implements MetaProvider { private static MetaStoreClientPool msClientPool_; - private static Configuration CONF = new Configuration(); DirectMetaProvider() { initMsClientPool(); @@ -292,12 +285,12 @@ class DirectMetaProvider implements MetaProvider { private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName, String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) { Path partDir = new Path(msPartition.getSd().getLocation()); + FileMetadataLoader fml = new FileMetadataLoader(partDir, + /* oldFds= */Collections.emptyList(), + hostIndex); - List<LocatedFileStatus> stats = new ArrayList<>(); try { - FileSystem fs = partDir.getFileSystem(CONF); - RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, /*recursive=*/false); - while (it.hasNext()) stats.add(it.next()); + fml.load(); } catch (FileNotFoundException fnf) { // If the partition directory isn't found, this is treated as having no // files. 
@@ -308,19 +301,7 @@ class DirectMetaProvider implements MetaProvider { partName, fullTableName), ioe); } - HdfsTable.FileMetadataLoadStats loadStats = - new HdfsTable.FileMetadataLoadStats(partDir); - - try { - FileSystem fs = partDir.getFileSystem(CONF); - return ImmutableList.copyOf( - HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats), - hostIndex, loadStats)); - } catch (IOException e) { - throw new LocalCatalogException(String.format( - "Could not convert files to descriptors for partition %s of table %s", - partName, fullTableName), e); - } + return ImmutableList.copyOf(fml.getLoadedFds()); } @Immutable @@ -386,28 +367,4 @@ class DirectMetaProvider implements MetaProvider { return msTable_.getPartitionKeysSize() != 0; } } - - - /** - * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>. - * This is necessary because the existing code to convert file statuses to - * descriptors consumes the remote iterator directly and thus avoids materializing - * all of the LocatedFileStatus objects in memory at the same time. - */ - private static class FakeRemoteIterator<T> implements RemoteIterator<T> { - private final Iterator<T> it_; - - FakeRemoteIterator(Iterable<T> it) { - this.it_ = it.iterator(); - } - @Override - public boolean hasNext() throws IOException { - return it_.hasNext(); - } - - @Override - public T next() throws IOException { - return it_.next(); - } - } } diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index 10f4c69..c326d23 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -539,9 +539,10 @@ public class FileSystemUtil { * the file does not exist and also saves an RPC as the caller need not do a separate * exists check for the path. Returns null if the path does not exist. 
*/ - public static FileStatus[] listStatus(FileSystem fs, Path p) throws IOException { + public static RemoteIterator<FileStatus> listStatus(FileSystem fs, Path p) + throws IOException { try { - return fs.listStatus(p); + return fs.listStatusIterator(p); } catch (FileNotFoundException e) { if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e); return null; diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java index e431baa..80a33fc 100644 --- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java @@ -378,8 +378,9 @@ public class CatalogTest { assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS)); // REFRESH calls listStatus on each of the partitions, but doesn't re-check // the permissions of the partition directories themselves. - assertEquals(table.getPartitionIds().size(), - (long)opsCounts.getLong(LIST_STATUS)); + seenCalls = opsCounts.getLong(LIST_LOCATED_STATUS) + + opsCounts.getLong(LIST_STATUS); + assertEquals(table.getPartitionIds().size(), seenCalls); // None of the underlying files changed so we should not do any ops for the files. 
assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS)); diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java index 8798fa7..5e518fc 100644 --- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java @@ -22,20 +22,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.impala.analysis.BoolLiteral; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.NullLiteral; import org.apache.impala.analysis.NumericLiteral; import org.apache.impala.analysis.StringLiteral; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; -import org.apache.impala.catalog.HdfsTable.FileMetadataLoadStats; import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.util.ListMap; @@ -153,11 +149,11 @@ public class HdfsPartitionTest { public void testCloneWithNewHostIndex() throws Exception { // Fetch some metadata from a directory in HDFS. 
Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas"); - FileSystem fs = p.getFileSystem(new Configuration()); - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(p); ListMap<TNetworkAddress> origIndex = new ListMap<>(); - List<FileDescriptor> fileDescriptors = HdfsTable.createFileDescriptors(fs, iter, - origIndex, new FileMetadataLoadStats(p)); + FileMetadataLoader fml = new FileMetadataLoader(p, Collections.emptyList(), + origIndex); + fml.load(); + List<FileDescriptor> fileDescriptors = fml.getLoadedFds(); assertTrue(!fileDescriptors.isEmpty()); FileDescriptor fd = fileDescriptors.get(0); diff --git a/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java b/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java index ff289c4..216daef 100644 --- a/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/planner/TestCaseLoaderTest.java @@ -48,8 +48,8 @@ public class TestCaseLoaderTest { */ @Test public void testTestCaseImport() throws Exception { - FileStatus[] testCaseFiles = FileSystemUtil.listStatus(FileSystemUtil - .getFileSystemForPath(TESTCASE_DATA_DIR), TESTCASE_DATA_DIR); + FileStatus[] testCaseFiles = FileSystemUtil.getFileSystemForPath(TESTCASE_DATA_DIR) + .listStatus(TESTCASE_DATA_DIR); // Randomly pick testcases and try to replay them. Random rand = new Random(); int maxIterations = 10;