This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 419aa2e30db326f02e9b4ec563ef7864e82df86e
Author: stiga-huang <huangquanl...@gmail.com>
AuthorDate: Mon May 25 18:01:38 2020 +0800

    IMPALA-9778: Refactor partition modifications in DDL/DMLs
    
    After this patch, DDL/DMLs that update partition metadata no longer
    modify partitions in place. Instead, we always create new HdfsPartition
    instances and use them to replace the existing ones. This is enforced
    by making HdfsPartition immutable. There are several benefits to this:
     - HdfsPartition instances can be shared across table versions. In full
       catalog update mode, the catalog update can skip unchanged partitions
       (IMPALA-3234) and send updates at partition granularity.
     - Aborted DDL/DMLs won't leave partition metadata in a bad state (e.g.
       IMPALA-8406) that usually requires an invalidation to recover.
     - Fetch-on-demand coordinators can cache partition metadata using the
       partition id as the key. When the table version updates, only the
       metadata of changed partitions needs to be reloaded (IMPALA-7533).
     - In the work of decoupling partitions from tables (IMPALA-3127), we
       don't need to assign catalog versions to partitions since the
       partition ids already identify them.
    
    However, HdfsPartition is not yet strictly immutable. Although all its
    fields are final, some of them still reference mutable objects. More
    refactoring is needed to achieve full immutability. This patch focuses
    on refactoring the DDL/DML code paths.
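    
    As a small illustration of why final fields alone do not guarantee
    immutability (not code from this patch; the class and field names are
    made up):
    
      import java.util.ArrayList;
      import java.util.List;
    
      public class Holder {
        // The reference is final, but the list it points to is mutable.
        private final List<String> items = new ArrayList<>();
        public void add(String s) { items.add(s); }  // mutates shared state
      }
    
    This is why several HdfsPartition fields are switched to Guava immutable
    collections (ImmutableList/ImmutableMap) in this patch, while e.g.
    inFlightEvents_ remains a mutable object behind a final field.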
    
    Changes:
     - Make all fields of HdfsPartition final. Move the
       HdfsPartition constructor logic and all its update methods into
       HdfsPartition.Builder.
     - Refactor in-place updates on HdfsPartition to create a new instance
       and drop the old one. HdfsPartition.Builder represents the
       in-progress modifications. Once all modifications are done, call its
       build() method to create the new HdfsPartition instance. The old
       instance is only replaced at the end of the modifications (see the
       sketch after this list).
     - Move the "dirty" marker of HdfsPartition into a map in HdfsTable. It
       maps from the old partition id to the in-progress partition builder.
       For "dirty" partitions, we'll reload their HMS metadata and file
       metadata.
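    
    A minimal sketch of the modify-then-replace flow (illustrative only; it
    uses the Builder and HdfsTable methods added in this patch, but the
    variable names are made up and it assumes catalogd code that already
    holds the table lock):
    
      // Start an in-progress modification from the existing instance.
      HdfsPartition.Builder builder = new HdfsPartition.Builder(oldPart)
          .setNumRows(newNumRows)
          .setFileFormatDescriptor(newFormatDescriptor);
      // build() creates the new immutable instance; updatePartition() drops
      // the old instance and registers the new one in its place.
      table.updatePartition(builder);
      // Alternatively, register the builder as a "dirty" in-progress
      // modification; the next incremental reload picks it up via
      // pickInprogressPartitionBuilder() and finalizes it:
      //   table.markDirtyPartition(builder);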
    
    Tests:
     - No new tests are added since the existing tests already provide
       sufficient coverage.
     - Run CORE tests
    
    Change-Id: Ib52e5810d01d5e0c910daacb9c98977426d3914c
    Reviewed-on: http://gerrit.cloudera.org:8080/15985
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |   6 +-
 .../org/apache/impala/catalog/FeCatalogUtils.java  |  28 +-
 .../org/apache/impala/catalog/HdfsPartition.java   | 615 +++++++++++++--------
 .../java/org/apache/impala/catalog/HdfsTable.java  | 384 ++++++++-----
 .../impala/catalog/ParallelFileMetadataLoader.java |  23 +-
 .../apache/impala/catalog/PartitionStatsUtil.java  |   2 +-
 .../main/java/org/apache/impala/catalog/Table.java |  15 +
 .../apache/impala/service/CatalogOpExecutor.java   | 202 +++----
 .../org/apache/impala/util/HdfsCachingUtil.java    |  21 +-
 .../catalog/CatalogObjectToFromThriftTest.java     |  10 +-
 .../org/apache/impala/catalog/CatalogTest.java     |   7 +-
 11 files changed, 810 insertions(+), 503 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 994abff..fafad2c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -3125,8 +3126,11 @@ public class CatalogServiceCatalog extends Catalog {
             "Unable to fetch valid transaction ids while loading file metadata 
for table "
                 + table.getFullName(), ex);
       }
+      List<HdfsPartition.Builder> partBuilders = 
partToPartialInfoMap.keySet().stream()
+          .map(HdfsPartition.Builder::new)
+          .collect(Collectors.toList());
       Map<HdfsPartition, List<FileDescriptor>> fdsByPart = new 
ParallelFileMetadataLoader(
-          table, partToPartialInfoMap.keySet(), reqWriteIdList, validTxnList, 
logPrefix)
+          table, partBuilders, reqWriteIdList, validTxnList, logPrefix)
           .loadAndGet();
       for (HdfsPartition partition : fdsByPart.keySet()) {
         TPartialPartitionInfo partitionInfo = 
partToPartialInfoMap.get(partition);
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java 
b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index f88beb6..ccaa238 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -264,14 +265,8 @@ public abstract class FeCatalogUtils {
    * TODO: this could be a default method in FeFsPartition in Java 8.
    */
   public static String getPartitionName(FeFsPartition partition) {
-    FeFsTable table = partition.getTable();
-    List<String> partitionCols = new ArrayList<>();
-    for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-      partitionCols.add(table.getColumns().get(i).getName());
-    }
-
-    return FileUtils.makePartName(
-        partitionCols, getPartitionValuesAsStrings(partition, true));
+    return getPartitionName(partition.getTable(),
+        getPartitionValuesAsStrings(partition, true));
   }
 
   // TODO: this could be a default method in FeFsPartition in Java 8.
@@ -289,6 +284,23 @@ public abstract class FeCatalogUtils {
     return ret;
   }
 
+  public static String getPartitionName(HdfsPartition.Builder partBuilder) {
+    HdfsTable table = partBuilder.getTable();
+    List<String> partitionValues = new ArrayList<>();
+    for (LiteralExpr partValue : partBuilder.getPartitionValues()) {
+      partitionValues.add(PartitionKeyValue.getPartitionKeyValueString(
+          partValue, table.getNullPartitionKeyValue()));
+    }
+    return getPartitionName(table, partitionValues);
+  }
+
+  public static String getPartitionName(FeFsTable table, List<String> 
partitionValues) {
+    List<String> partitionKeys = table.getClusteringColumns().stream()
+        .map(Column::getName)
+        .collect(Collectors.toList());
+    return FileUtils.makePartName(partitionKeys, partitionValues);
+  }
+
   // TODO: this could be a default method in FeFsPartition in Java 8.
   public static String getConjunctSqlForPartition(FeFsPartition part) {
     List<String> partColSql = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e7a5bb8..1cb5c30 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -80,6 +82,8 @@ import com.google.flatbuffers.FlatBufferBuilder;
  * based on their partition-key values. The comparison orders partitions in 
ascending
  * order with NULLs sorting last. The ordering is useful for displaying 
partitions
  * in SHOW statements.
+ * This class is supposed to be immutable. We should use HdfsPartition.Builder 
to create
+ * new instances instead of updating the fields in-place.
  */
 public class HdfsPartition implements FeFsPartition, PrunablePartition {
   /**
@@ -514,9 +518,9 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     public final boolean sdCompressed;
     public final int sdNumBuckets;
     public final org.apache.hadoop.hive.metastore.api.SerDeInfo sdSerdeInfo;
-    public final List<String> sdBucketCols;
-    public final List<org.apache.hadoop.hive.metastore.api.Order> sdSortCols;
-    public final Map<String, String> sdParameters;
+    public final ImmutableList<String> sdBucketCols;
+    public final ImmutableList<org.apache.hadoop.hive.metastore.api.Order> 
sdSortCols;
+    public final ImmutableMap<String, String> sdParameters;
     public final int msCreateTime;
     public final int msLastAccessTime;
 
@@ -552,6 +556,19 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
         sdParameters = ImmutableMap.of();
       }
     }
+
+    private CachedHmsPartitionDescriptor(CachedHmsPartitionDescriptor other) {
+      sdInputFormat = other.sdInputFormat;
+      sdOutputFormat = other.sdOutputFormat;
+      sdCompressed = other.sdCompressed;
+      sdNumBuckets = other.sdNumBuckets;
+      sdSerdeInfo = new 
org.apache.hadoop.hive.metastore.api.SerDeInfo(other.sdSerdeInfo);
+      sdBucketCols = other.sdBucketCols;
+      sdSortCols = other.sdSortCols;
+      sdParameters = ImmutableMap.copyOf(other.sdParameters);
+      msCreateTime = other.msCreateTime;
+      msLastAccessTime = other.msLastAccessTime;
+    }
   }
 
   private final static Logger LOG = 
LoggerFactory.getLogger(HdfsPartition.class);
@@ -567,14 +584,14 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
         }
       };
 
+  private final static AtomicLong partitionIdCounter_ = new AtomicLong();
   private final HdfsTable table_;
-  private final List<LiteralExpr> partitionKeyValues_;
+  private final ImmutableList<LiteralExpr> partitionKeyValues_;
   // estimated number of rows in partition; -1: unknown
-  private long numRows_ = -1;
-  private static AtomicLong partitionIdCounter_ = new AtomicLong();
+  private final long numRows_;
 
-  // A unique ID for each partition, used to identify a partition in the thrift
-  // representation of a table.
+  // A unique ID across the whole catalog for each partition, used to identify 
a partition
+  // in the thrift representation of a table.
   private final long id_;
 
   /*
@@ -583,7 +600,7 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
    * we. We should therefore treat mixing formats inside one partition as user 
error.
    * It's easy to add per-file metadata to FileDescriptor if this changes.
    */
-  private HdfsStorageDescriptor fileFormatDescriptor_;
+  private final HdfsStorageDescriptor fileFormatDescriptor_;
 
   /**
    * The file descriptors of this partition, encoded as flatbuffers. Storing 
the raw
@@ -598,76 +615,55 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
    *    - 4-byte padding (objects are word-aligned)
    */
   @Nonnull
-  private ImmutableList<byte[]> encodedFileDescriptors_;
-  private HdfsPartitionLocationCompressor.Location location_;
-  private boolean isDirty_;
+  private final ImmutableList<byte[]> encodedFileDescriptors_;
+  private final HdfsPartitionLocationCompressor.Location location_;
   // True if this partition is marked as cached. Does not necessarily mean the 
data is
   // cached.
-  private boolean isMarkedCached_;
+  private final boolean isMarkedCached_;
   private final TAccessLevel accessLevel_;
 
   // (k,v) pairs of parameters for this partition, stored in the HMS.
-  private Map<String, String> hmsParameters_;
+  private final ImmutableMap<String, String> hmsParameters_;
+  private final CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
 
   // Binary representation of the TPartitionStats for this partition. Populated
-  // when the partition is loaded and updated using setPartitionStatsBytes().
-  private byte[] partitionStats_;
+  // when the partition is being built in Builder#setPartitionStatsBytes().
+  private final byte[] partitionStats_;
 
   // True if partitionStats_ has intermediate_col_stats populated.
-  private boolean hasIncrementalStats_ ;
+  private final boolean hasIncrementalStats_ ;
 
   // The last committed write ID which modified this partition.
   // -1 means writeId_ is irrelevant(not supported).
-  private long writeId_ = -1L;
+  private final long writeId_;
 
-  private final InFlightEvents inFlightEvents_ = new InFlightEvents(20);
+  // The in-flight events of this partition tracked in catalogd. It's still 
mutable since
+  // it's not used in coordinators.
+  private final InFlightEvents inFlightEvents_;
 
-  private HdfsPartition(HdfsTable table,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
-      List<LiteralExpr> partitionKeyValues,
+  private HdfsPartition(HdfsTable table, long id, List<LiteralExpr> 
partitionKeyValues,
       HdfsStorageDescriptor fileFormatDescriptor,
-      List<HdfsPartition.FileDescriptor> fileDescriptors, long id,
-      HdfsPartitionLocationCompressor.Location location, TAccessLevel 
accessLevel) {
+      @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
+      HdfsPartitionLocationCompressor.Location location,
+      boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> 
hmsParameters,
+      CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
+      byte[] partitionStats, boolean hasIncrementalStats, long numRows, long 
writeId,
+      InFlightEvents inFlightEvents) {
     table_ = table;
-    if (msPartition == null) {
-      cachedMsPartitionDescriptor_ = null;
-    } else {
-      cachedMsPartitionDescriptor_ = new 
CachedHmsPartitionDescriptor(msPartition);
-    }
-    location_ = location;
+    id_ = id;
     partitionKeyValues_ = ImmutableList.copyOf(partitionKeyValues);
-    setFileDescriptors(fileDescriptors);
     fileFormatDescriptor_ = fileFormatDescriptor;
-    id_ = id;
+    encodedFileDescriptors_ = encodedFileDescriptors;
+    location_ = location;
+    isMarkedCached_ = isMarkedCached;
     accessLevel_ = accessLevel;
-    if (msPartition != null && msPartition.getParameters() != null) {
-      isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
-          msPartition.getParameters()) != null;
-      hmsParameters_ = msPartition.getParameters();
-    } else {
-      hmsParameters_ = new HashMap<>();
-    }
-    addInflightVersionsFromParameters();
-    extractAndCompressPartStats();
-    // Intern parameters after removing the incremental stats
-    hmsParameters_ = CatalogInterners.internParameters(hmsParameters_);
-    if (MetastoreShim.getMajorVersion() > 2 && msPartition != null) {
-      writeId_ = MetastoreShim.getWriteIdFromMSPartition(msPartition);
-    }
-  }
-
-  public HdfsPartition(HdfsTable table,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
-      List<LiteralExpr> partitionKeyValues,
-      HdfsStorageDescriptor fileFormatDescriptor,
-      List<HdfsPartition.FileDescriptor> fileDescriptors,
-      TAccessLevel accessLevel) {
-    this(table, msPartition, partitionKeyValues, fileFormatDescriptor, 
fileDescriptors,
-        partitionIdCounter_.getAndIncrement(),
-        table.getPartitionLocationCompressor().new Location(msPartition != null
-                ? msPartition.getSd().getLocation()
-                : table.getLocation()),
-        accessLevel);
+    hmsParameters_ = ImmutableMap.copyOf(hmsParameters);
+    cachedMsPartitionDescriptor_ = cachedMsPartitionDescriptor;
+    partitionStats_ = partitionStats;
+    hasIncrementalStats_ = hasIncrementalStats;
+    numRows_ = numRows;
+    writeId_ = writeId;
+    inFlightEvents_ = inFlightEvents;
   }
 
   @Override // FeFsPartition
@@ -748,40 +744,16 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     return 
FileSystemUtil.FsType.getFsType(getLocationPath().toUri().getScheme());
   }
 
-  public void setNumRows(long numRows) { numRows_ = numRows; }
   @Override // FeFsPartition
   public long getNumRows() { return numRows_; }
   @Override
   public boolean isMarkedCached() { return isMarkedCached_; }
-  void markCached() { isMarkedCached_ = true; }
-  private CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
-
-  /**
-   * Updates the file format of this partition and sets the corresponding 
input/output
-   * format classes.
-   */
-  public void setFileFormat(HdfsFileFormat fileFormat) {
-    fileFormatDescriptor_ = fileFormatDescriptor_.cloneWithChangedFileFormat(
-        fileFormat);
-    cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat();
-    cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat();
-    cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib(
-        fileFormatDescriptor_.getFileFormat().serializationLib());
-  }
 
   @Override // FeFsPartition
   public HdfsFileFormat getFileFormat() {
     return fileFormatDescriptor_.getFileFormat();
   }
 
-  public void setLocation(String place) {
-    location_ = table_.getPartitionLocationCompressor().new Location(place);
-  }
-
-  public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() {
-    return cachedMsPartitionDescriptor_.sdSerdeInfo;
-  }
-
   @Override // FeFsPartition
   public TPartitionStats getPartitionStats() {
     return PartitionStatsUtil.getPartStatsOrWarn(this);
@@ -792,38 +764,6 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     return partitionStats_;
   }
 
-  public void setPartitionStatsBytes(byte[] partitionStats, boolean 
hasIncrStats) {
-    if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
-    partitionStats_ = partitionStats;
-    hasIncrementalStats_ = hasIncrStats;
-  }
-
-  /**
-   * Helper method that removes the partition stats from hmsParameters_, 
compresses them
-   * and updates partitionsStats_.
-   */
-  private void extractAndCompressPartStats() {
-    try {
-      // Convert the stats stored in the hmsParams map to a deflate-compressed 
in-memory
-      // byte array format. After conversion, delete the entries in the 
hmsParams map
-      // as they are not needed anymore.
-      Reference<Boolean> hasIncrStats = new Reference<Boolean>(false);
-      byte[] partitionStats =
-          PartitionStatsUtil.partStatsBytesFromParameters(hmsParameters_, 
hasIncrStats);
-      setPartitionStatsBytes(partitionStats, hasIncrStats.getRef());
-    } catch (ImpalaException e) {
-      LOG.warn(String.format("Failed to set partition stats for table %s 
partition %s",
-          getTable().getFullName(), getPartitionName()), e);
-    } finally {
-      // Delete the incremental stats entries. Cleared even on error 
conditions so that
-      // we do not persist the corrupt entries in the hmsParameters_ map when 
it is
-      // flushed to the HMS.
-      Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).clear();
-    }
-  }
-
-  public void dropPartitionStats() { setPartitionStatsBytes(null, false); }
-
   @Override // FeFsPartition
   public boolean hasIncrementalStats() { return hasIncrementalStats_; }
 
@@ -841,15 +781,6 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     return hmsParameters_;
   }
 
-  public void putToParameters(String k, String v) {
-    Preconditions.checkArgument(!IS_INCREMENTAL_STATS_KEY.apply(k));
-    hmsParameters_.put(k, v);
-  }
-
-  public void putToParameters(Pair<String, String> kv) {
-    putToParameters(kv.first, kv.second);
-  }
-
   /**
    * Removes a given version from the in-flight events
    * @param isInsertEvent If true, remove eventId from list of eventIds for 
in-flight
@@ -886,33 +817,6 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     }
   }
 
-  /**
-   * Adds the version from the given Partition parameters. No-op if the 
parameters does
-   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. 
This is
-   * done to detect add partition events from this catalog which are generated 
when
-   * partitions are added or recovered.
-   */
-  private void addInflightVersionsFromParameters() {
-    Preconditions.checkNotNull(hmsParameters_);
-    Preconditions.checkState(inFlightEvents_.size(false) == 0);
-    // we should not check for table lock being held here since there are 
certain code
-    // paths which call this method without holding the table lock (eg. 
getOrLoadTable())
-    if 
(!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey()))
 {
-      return;
-    }
-    inFlightEvents_.add(false,
-        Long.parseLong(
-            
hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
-  }
-
-  /**
-   * Marks this partition's metadata as "dirty" indicating that changes have 
been
-   * made and this partition's metadata should not be reused during the next
-   * incremental metadata refresh.
-   */
-  public void markDirty() { isDirty_ = true; }
-  public boolean isDirty() { return isDirty_; }
-
   @Override // FeFsPartition
   public List<LiteralExpr> getPartitionValues() { return partitionKeyValues_; }
   @Override // FeFsPartition
@@ -938,13 +842,6 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
     return fileNames;
   }
 
-  public void setFileDescriptors(List<FileDescriptor> descriptors) {
-    // Store an eagerly transformed-and-copied list so that we drop the memory 
usage
-    // of the flatbuffer wrapper.
-    encodedFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
-        descriptors, FileDescriptor.TO_BYTES));
-  }
-
   @Override // FeFsPartition
   public int getNumFileDescriptors() {
     return encodedFileDescriptors_.size();
@@ -995,12 +892,9 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
 
   public static HdfsPartition prototypePartition(
       HdfsTable table, HdfsStorageDescriptor storageDescriptor) {
-    List<LiteralExpr> emptyExprList = new ArrayList<>();
-    List<FileDescriptor> emptyFileDescriptorList = new ArrayList<>();
-    return new HdfsPartition(table, null, emptyExprList,
-        storageDescriptor, emptyFileDescriptorList,
-        CatalogObjectsConstants.PROTOTYPE_PARTITION_ID, null,
-        TAccessLevel.READ_WRITE);
+    return new Builder(table, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID)
+        .setFileFormatDescriptor(storageDescriptor)
+        .build();
   }
 
   @Override
@@ -1019,87 +913,354 @@ public class HdfsPartition implements FeFsPartition, 
PrunablePartition {
       .toString();
   }
 
-  public static HdfsPartition fromThrift(HdfsTable table,
-      long id, THdfsPartition thriftPartition) {
-    HdfsStorageDescriptor storageDesc = 
HdfsStorageDescriptor.fromThriftPartition(
-        thriftPartition, table.getName());
+  public static class Builder {
+    // For the meaning of these fields, see field comments of HdfsPartition.
+    private HdfsTable table_;
+    private long id_;
+    private List<LiteralExpr> partitionKeyValues_;
+    private HdfsStorageDescriptor fileFormatDescriptor_ = null;
+    private ImmutableList<byte[]> encodedFileDescriptors_;
+    private HdfsPartitionLocationCompressor.Location location_ = null;
+    private boolean isMarkedCached_ = false;
+    private TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
+    private Map<String, String> hmsParameters_;
+    private CachedHmsPartitionDescriptor cachedMsPartitionDescriptor_;
+    private byte[] partitionStats_ = null;
+    private boolean hasIncrementalStats_ = false;
+    private long numRows_ = -1;
+    private long writeId_ = -1L;
+    private InFlightEvents inFlightEvents_ = new InFlightEvents(20);
+
+    @Nullable
+    private HdfsPartition oldInstance_ = null;
+
+    public Builder(HdfsTable table, long id) {
+      Preconditions.checkNotNull(table);
+      table_ = table;
+      id_ = id;
+    }
+
+    public Builder(HdfsTable table) {
+      this(table, partitionIdCounter_.getAndIncrement());
+    }
 
-    List<LiteralExpr> literalExpr = new ArrayList<>();
-    if (id != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
-      List<Column> clusterCols = new ArrayList<>();
-      for (int i = 0; i < table.getNumClusteringCols(); ++i) {
-        clusterCols.add(table.getColumns().get(i));
+    public Builder(HdfsPartition partition) {
+      this(partition.table_);
+      oldInstance_ = partition;
+      partitionKeyValues_ = partition.partitionKeyValues_;
+      fileFormatDescriptor_ = partition.fileFormatDescriptor_;
+      encodedFileDescriptors_ = partition.encodedFileDescriptors_;
+      location_ = partition.location_;
+      isMarkedCached_ = partition.isMarkedCached_;
+      accessLevel_ = partition.accessLevel_;
+      hmsParameters_ = Maps.newHashMap(partition.hmsParameters_);
+      partitionStats_ = partition.partitionStats_;
+      hasIncrementalStats_ = partition.hasIncrementalStats_;
+      numRows_ = partition.numRows_;
+      writeId_ = partition.writeId_;
+      if (partition.cachedMsPartitionDescriptor_ != null) {
+        cachedMsPartitionDescriptor_ = new CachedHmsPartitionDescriptor(
+            partition.cachedMsPartitionDescriptor_);
       }
+      // Take over the in-flight events
+      inFlightEvents_ = partition.inFlightEvents_;
+    }
 
-      List<TExprNode> exprNodes = new ArrayList<>();
-      for (TExpr expr: thriftPartition.getPartitionKeyExprs()) {
-        for (TExprNode node: expr.getNodes()) {
-          exprNodes.add(node);
-        }
+    public HdfsPartition build() {
+      if (partitionKeyValues_ == null) partitionKeyValues_ = 
Collections.emptyList();
+      if (encodedFileDescriptors_ == null) 
setFileDescriptors(Collections.emptyList());
+      if (hmsParameters_ == null) hmsParameters_ = Collections.emptyMap();
+      if (location_ == null) {
+        // Only prototype partitions can have null locations.
+        Preconditions.checkState(id_ == 
CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
       }
-      Preconditions.checkState(clusterCols.size() == exprNodes.size(),
-          String.format("Number of partition columns (%d) does not match 
number " +
-              "of partition key expressions (%d)",
-              clusterCols.size(), exprNodes.size()));
-
-      for (int i = 0; i < exprNodes.size(); ++i) {
-        literalExpr.add(LiteralExpr.fromThrift(
-            exprNodes.get(i), clusterCols.get(i).getType()));
+      return new HdfsPartition(table_, id_, partitionKeyValues_, 
fileFormatDescriptor_,
+          encodedFileDescriptors_, location_, isMarkedCached_, accessLevel_,
+          hmsParameters_, cachedMsPartitionDescriptor_, partitionStats_,
+          hasIncrementalStats_, numRows_, writeId_, inFlightEvents_);
+    }
+
+    public Builder setMsPartition(
+        org.apache.hadoop.hive.metastore.api.Partition msPartition)
+        throws CatalogException {
+      if (msPartition == null) {
+        setLocation(table_.getLocation());
+        cachedMsPartitionDescriptor_ = null;
+        hmsParameters_ = Collections.emptyMap();
+        partitionKeyValues_ = Collections.emptyList();
+        return this;
       }
+      setPartitionKeyValues(
+          FeCatalogUtils.parsePartitionKeyValues(table_, 
msPartition.getValues()));
+      setLocation(msPartition.getSd().getLocation());
+      cachedMsPartitionDescriptor_ = new 
CachedHmsPartitionDescriptor(msPartition);
+      if (msPartition.getParameters() != null) {
+        isMarkedCached_ = HdfsCachingUtil.getCacheDirectiveId(
+            msPartition.getParameters()) != null;
+        numRows_ = FeCatalogUtils.getRowCount(msPartition.getParameters());
+        hmsParameters_ = msPartition.getParameters();
+        extractAndCompressPartStats();
+        // Intern parameters after removing the incremental stats
+        hmsParameters_ = CatalogInterners.internParameters(hmsParameters_);
+      }
+      if (MetastoreShim.getMajorVersion() > 2) {
+        writeId_ = MetastoreShim.getWriteIdFromMSPartition(msPartition);
+      }
+      // If we have taken over the in-flight events from an old partition 
instance, don't
+      // overwrite the in-flight event list.
+      if (oldInstance_ == null) addInflightVersionsFromParameters();
+      return this;
     }
 
-    List<HdfsPartition.FileDescriptor> fileDescriptors = new ArrayList<>();
-    if (thriftPartition.isSetFile_desc()) {
-      for (THdfsFileDesc desc: thriftPartition.getFile_desc()) {
-        fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
+    /**
+     * Helper method that removes the partition stats from hmsParameters_, 
compresses them
+     * and updates partitionsStats_.
+     */
+    private void extractAndCompressPartStats() {
+      try {
+        // Convert the stats stored in the hmsParams map to a 
deflate-compressed in-memory
+        // byte array format. After conversion, delete the entries in the 
hmsParams map
+        // as they are not needed anymore.
+        Reference<Boolean> hasIncrStats = new Reference<Boolean>(false);
+        byte[] partitionStats =
+            PartitionStatsUtil.partStatsBytesFromParameters(hmsParameters_, 
hasIncrStats);
+        setPartitionStatsBytes(partitionStats, hasIncrStats.getRef());
+      } catch (ImpalaException e) {
+        LOG.warn(String.format("Failed to set partition stats for table %s 
partition %s",
+            getTable().getFullName(), getPartitionName()), e);
+      } finally {
+        // Delete the incremental stats entries. Cleared even on error 
conditions so that
+        // we do not persist the corrupt entries in the hmsParameters_ map 
when it is
+        // flushed to the HMS.
+        Maps.filterKeys(hmsParameters_, IS_INCREMENTAL_STATS_KEY).clear();
       }
     }
 
-    TAccessLevel accessLevel = thriftPartition.isSetAccess_level() ?
-        thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
-    HdfsPartitionLocationCompressor.Location location = 
thriftPartition.isSetLocation()
-        ? table.getPartitionLocationCompressor().new Location(
-              thriftPartition.getLocation())
-        : null;
-    HdfsPartition partition = new HdfsPartition(table, null, literalExpr, 
storageDesc,
-        fileDescriptors, id, location, accessLevel);
-    if (thriftPartition.isSetStats()) {
-      partition.setNumRows(thriftPartition.getStats().getNum_rows());
+    /**
+     * Updates the file format of this partition and sets the corresponding 
input/output
+     * format classes.
+     */
+    public Builder setFileFormat(HdfsFileFormat fileFormat) {
+      Preconditions.checkNotNull(fileFormatDescriptor_);
+      Preconditions.checkNotNull(cachedMsPartitionDescriptor_);
+      fileFormatDescriptor_ = fileFormatDescriptor_.cloneWithChangedFileFormat(
+          fileFormat);
+      cachedMsPartitionDescriptor_.sdInputFormat = fileFormat.inputFormat();
+      cachedMsPartitionDescriptor_.sdOutputFormat = fileFormat.outputFormat();
+      cachedMsPartitionDescriptor_.sdSerdeInfo.setSerializationLib(
+          fileFormatDescriptor_.getFileFormat().serializationLib());
+      return this;
+    }
+
+    public void putToParameters(Pair<String, String> kv) {
+      putToParameters(kv.first, kv.second);
     }
-    if (thriftPartition.isSetIs_marked_cached()) {
-      partition.isMarkedCached_ = thriftPartition.isIs_marked_cached();
+    public void putToParameters(String k, String v) {
+      Preconditions.checkArgument(!IS_INCREMENTAL_STATS_KEY.apply(k));
+      Preconditions.checkNotNull(hmsParameters_);
+      hmsParameters_.put(k, v);
     }
+    public Map<String, String> getParameters() { return hmsParameters_; }
 
-    if (thriftPartition.isSetHms_parameters()) {
-      partition.hmsParameters_ = CatalogInterners.internParameters(
-          thriftPartition.getHms_parameters());
-    } else {
-      partition.hmsParameters_ = new HashMap<>();
+    public org.apache.hadoop.hive.metastore.api.SerDeInfo getSerdeInfo() {
+      Preconditions.checkNotNull(cachedMsPartitionDescriptor_);
+      return cachedMsPartitionDescriptor_.sdSerdeInfo;
     }
 
-    partition.hasIncrementalStats_ = thriftPartition.has_incremental_stats;
-    if (thriftPartition.isSetPartition_stats()) {
-      partition.partitionStats_ = thriftPartition.getPartition_stats();
+    public Builder setFileFormatDescriptor(HdfsStorageDescriptor 
fileFormatDescriptor) {
+      fileFormatDescriptor_ = fileFormatDescriptor;
+      return this;
     }
 
-    partition.writeId_ = thriftPartition.isSetWrite_id() ?
-        thriftPartition.getWrite_id() : -1L;
+    public Builder setPartitionKeyValues(List<LiteralExpr> partitionKeyValues) 
{
+      partitionKeyValues_ = partitionKeyValues;
+      return this;
+    }
 
-    return partition;
-  }
+    public Builder setAccessLevel(TAccessLevel accessLevel) {
+      accessLevel_ = accessLevel;
+      return this;
+    }
 
-  /**
-   * Checks that this partition's metadata is well formed. This does not 
necessarily
-   * mean the partition is supported by Impala.
-   * Throws a CatalogException if there are any errors in the partition 
metadata.
-   */
-  public void checkWellFormed() throws CatalogException {
-    try {
-      // Validate all the partition key/values to ensure you can convert them 
toThrift()
-      Expr.treesToThrift(getPartitionValues());
-    } catch (Exception e) {
-      throw new CatalogException("Partition (" + getPartitionName() +
-          ") has invalid partition column values: ", e);
+    public Builder setNumRows(long numRows) {
+      numRows_ = numRows;
+      return this;
+    }
+    public Builder dropPartitionStats() { return setPartitionStatsBytes(null, 
false); }
+    public Builder setPartitionStatsBytes(byte[] partitionStats, boolean 
hasIncrStats) {
+      if (hasIncrStats) Preconditions.checkNotNull(partitionStats);
+      partitionStats_ = partitionStats;
+      hasIncrementalStats_ = hasIncrStats;
+      return this;
+    }
+
+    public Builder setLocation(HdfsPartitionLocationCompressor.Location 
location) {
+      location_ = location;
+      return this;
+    }
+    public Builder setLocation(String place) {
+      location_ = table_.getPartitionLocationCompressor().new Location(place);
+      return this;
+    }
+
+    public String getLocation() {
+      return (location_ != null) ? location_.toString() : null;
+    }
+
+    public List<FileDescriptor> getFileDescriptors() {
+      // Set empty descriptors in case setFileDescriptors hasn't been called.
+      if (encodedFileDescriptors_ == null) setFileDescriptors(new 
ArrayList<>());
+      // Return a lazily transformed list from our internal bytes storage.
+      return Lists.transform(encodedFileDescriptors_, 
FileDescriptor.FROM_BYTES);
+    }
+
+    public Builder setFileDescriptors(List<FileDescriptor> descriptors) {
+      // Store an eagerly transformed-and-copied list so that we drop the 
memory usage
+      // of the flatbuffer wrapper.
+      encodedFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
+          descriptors, FileDescriptor.TO_BYTES));
+      return this;
+    }
+
+    public HdfsFileFormat getFileFormat() {
+      return fileFormatDescriptor_.getFileFormat();
+    }
+
+    public boolean isMarkedCached() { return isMarkedCached_; }
+    public Builder setIsMarkedCached(boolean isCached) {
+      isMarkedCached_ = isCached;
+      return this;
+    }
+
+    public HdfsTable getTable() { return table_; }
+
+    public String getPartitionName() {
+      return FeCatalogUtils.getPartitionName(this);
+    }
+
+    /**
+     * Adds a version number to the in-flight events of this partition
+     * @param isInsertEvent if true, add eventId to list of eventIds for 
in-flight Insert
+     * events if false, add version number to list of versions for in-flight 
DDL events
+     * @param versionNumber when isInsertEvent is true, it's eventId to add
+     *                      when isInsertEvent is false, it's version number 
to add
+     * TODO: merge this with HdfsPartition.addToVersionsForInflightEvents()
+     */
+    public void addToVersionsForInflightEvents(boolean isInsertEvent,
+        long versionNumber) {
+      Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+          "addToVersionsForInflightEvents called without holding the table 
lock on "
+              + "partition " + getPartitionName() + " of table " + 
table_.getFullName());
+      if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
+        LOG.warn("Could not add {} version to the partition {} of table {}. 
This could " +
+                "cause unnecessary refresh of the partition when the event is 
received " +
+                "by the Events processor.",
+            versionNumber, getPartitionName(), getTable().getFullName());
+      }
+    }
+
+    /**
+     * Adds the version from the given Partition parameters. No-op if the 
parameters does
+     * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. 
This is
+     * done to detect add partition events from this catalog which are 
generated when
+     * partitions are added or recovered.
+     */
+    private void addInflightVersionsFromParameters() {
+      Preconditions.checkNotNull(hmsParameters_);
+      Preconditions.checkState(inFlightEvents_.size(false) == 0);
+      // We should not check for table lock being held here since there are 
certain
+      // code paths which call this method without holding the table lock
+      // (e.g. getOrLoadTable())
+      if (!hmsParameters_.containsKey(
+          MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
+        return;
+      }
+      inFlightEvents_.add(false, Long.parseLong(
+          
hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
+    }
+
+    public List<LiteralExpr> getPartitionValues() {
+      return partitionKeyValues_;
+    }
+
+    public HdfsPartition getOldInstance() { return oldInstance_; }
+    public long getOldId() { return 
Preconditions.checkNotNull(oldInstance_).id_; }
+
+    public org.apache.hadoop.hive.metastore.api.Partition toHmsPartition() {
+      // Build a temp HdfsPartition to create the HmsPartition.
+      return build().toHmsPartition();
+    }
+
+    public Builder fromThrift(THdfsPartition thriftPartition) {
+      fileFormatDescriptor_ = HdfsStorageDescriptor.fromThriftPartition(
+          thriftPartition, table_.getName());
+
+      partitionKeyValues_ = new ArrayList<>();
+      if (id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
+        List<Column> clusterCols = table_.getClusteringColumns();
+        List<TExprNode> exprNodes = new ArrayList<>();
+        for (TExpr expr : thriftPartition.getPartitionKeyExprs()) {
+          exprNodes.addAll(expr.getNodes());
+        }
+        Preconditions.checkState(clusterCols.size() == exprNodes.size(),
+            String.format("Number of partition columns (%d) does not match 
number " +
+                    "of partition key expressions (%d)",
+                clusterCols.size(), exprNodes.size()));
+
+        for (int i = 0; i < exprNodes.size(); ++i) {
+          partitionKeyValues_.add(LiteralExpr.fromThrift(
+              exprNodes.get(i), clusterCols.get(i).getType()));
+        }
+      }
+
+      List<FileDescriptor> fileDescriptors = new ArrayList<>();
+      if (thriftPartition.isSetFile_desc()) {
+        for (THdfsFileDesc desc : thriftPartition.getFile_desc()) {
+          fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
+        }
+      }
+      setFileDescriptors(fileDescriptors);
+
+      accessLevel_ = thriftPartition.isSetAccess_level() ?
+          thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
+      location_ = thriftPartition.isSetLocation()
+          ? table_.getPartitionLocationCompressor().new Location(
+          thriftPartition.getLocation())
+          : null;
+      if (thriftPartition.isSetStats()) {
+        numRows_ = thriftPartition.getStats().getNum_rows();
+      }
+      hasIncrementalStats_ = thriftPartition.has_incremental_stats;
+      if (thriftPartition.isSetPartition_stats()) {
+        partitionStats_ = thriftPartition.getPartition_stats();
+      }
+      if (thriftPartition.isSetIs_marked_cached()) {
+        isMarkedCached_ = thriftPartition.isIs_marked_cached();
+      }
+      if (thriftPartition.isSetHms_parameters()) {
+        hmsParameters_ = CatalogInterners.internParameters(
+            thriftPartition.getHms_parameters());
+      } else {
+        hmsParameters_ = new HashMap<>();
+      }
+      writeId_ = thriftPartition.isSetWrite_id() ?
+          thriftPartition.getWrite_id() : -1L;
+      return this;
+    }
+
+    /**
+     * Checks that this partition's metadata is well formed. This does not 
necessarily
+     * mean the partition is supported by Impala.
+     * Throws a CatalogException if there are any errors in the partition 
metadata.
+     */
+    public void checkWellFormed() throws CatalogException {
+      try {
+        // Validate all the partition key/values to ensure you can convert 
them toThrift()
+        Expr.treesToThrift(getPartitionValues());
+      } catch (Exception e) {
+        throw new CatalogException("Partition (" + getPartitionName() +
+            ") has invalid partition column values: ", e);
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 41f2a60..a5b231e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -108,6 +109,7 @@ import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
@@ -259,6 +261,13 @@ public class HdfsTable extends Table implements FeFsTable {
   // null in the case that this table is not transactional.
   protected ValidWriteIdList validWriteIds_ = null;
 
+  // Partitions are marked as "dirty" to indicate that there are in-progress
+  // modifications on their metadata. The corresponding partition builder contains the
+  // new version of the metadata, so it represents the in-progress modifications. The
+  // modifications will be finalized in the coming incremental metadata refresh (see
+  // updatePartitionsFromHms() for more details). This map is only maintained in the
+  // catalogd.
+  private final Map<Long, HdfsPartition.Builder> dirtyPartitions_ = new 
HashMap<>();
+
   // Represents a set of storage-related statistics aggregated at the table or 
partition
   // level.
   public final static class FileMetadataStats {
@@ -440,13 +449,47 @@ public class HdfsTable extends Table implements FeFsTable 
{
   }
 
   /**
-   * Gets the HdfsPartition matching the given partition spec. Returns null if 
no match
-   * was found.
+   * Marks a partition dirty by registering the partition builder for its new 
instance.
+   */
+  public void markDirtyPartition(HdfsPartition.Builder partBuilder) {
+    dirtyPartitions_.put(partBuilder.getOldId(), partBuilder);
+  }
+
+  /**
+   * @return true if there are any in-progress modifications on the metadata of
+   * this partition.
+   */
+  public boolean isDirtyPartition(HdfsPartition partition) {
+    return dirtyPartitions_.containsKey(partition.getId());
+  }
+
+  /**
+   * Pick up the partition builder to continue the in-progress modifications.
+   * The builder is then unregistered, so callers should guarantee that the in-progress
+   * modifications are finalized (by calling Builder.build() and using the new instance
+   * to replace the old one).
+   * @return the builder for the given partition's new instance.
    */
-  public HdfsPartition getPartition(List<PartitionKeyValue> partitionSpec) {
-    return (HdfsPartition)getPartition(this, partitionSpec);
+  public HdfsPartition.Builder pickInprogressPartitionBuilder(HdfsPartition 
partition) {
+    return dirtyPartitions_.remove(partition.getId());
   }
 
+  /**
+   * @return true if any partitions are dirty.
+   */
+  @Override
+  public boolean hasInProgressModification() { return 
!dirtyPartitions_.isEmpty(); }
+
+  /**
+   * Clears all the in-progress modifications by clearing all the partition 
builders.
+   */
+  @Override
+  public void resetInProgressModification() { dirtyPartitions_.clear(); }
+
+  /**
+   * Gets the PrunablePartition matching the given partition spec. Returns 
null if no
+   * match was found.
+   */
   public static PrunablePartition getPartition(FeFsTable table,
       List<PartitionKeyValue> partitionSpec) {
     List<TPartitionKeyValue> partitionKeyValues = new ArrayList<>();
@@ -563,8 +606,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * If there are no partitions in the Hive metadata, a single partition is 
added with no
    * partition keys.
    */
-  private long loadAllPartitions(IMetaStoreClient client,
-      List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
+  private long loadAllPartitions(IMetaStoreClient client, List<Partition> 
msPartitions,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
       CatalogException {
     Preconditions.checkNotNull(msTbl);
@@ -576,30 +618,30 @@ public class HdfsTable extends Table implements FeFsTable 
{
     Path tblLocation = 
FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
     accessLevel_ = getAvailableAccessLevel(getFullName(), tblLocation, 
permCache);
 
+    List<HdfsPartition.Builder> partBuilders = new ArrayList<>();
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || 
msPartitions.isEmpty());
       // This table has no partition key, which means it has no declared 
partitions.
       // We model partitions slightly differently to Hive - every file must 
exist in a
       // partition, so add a single partition with no keys which will get all 
the
       // files in the table's root directory.
-      HdfsPartition part = createPartition(msTbl.getSd(), null, permCache);
-      if (isMarkedCached_) part.markCached();
-      addPartition(part);
+      HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(),
+          /*msPartition=*/null, permCache);
+      partBuilder.setIsMarkedCached(isMarkedCached_);
+      setUnpartitionedTableStats(partBuilder);
+      partBuilders.add(partBuilder);
     } else {
-      for (org.apache.hadoop.hive.metastore.api.Partition msPartition: 
msPartitions) {
-        HdfsPartition partition = createPartition(msPartition.getSd(), 
msPartition,
-            permCache);
-        addPartition(partition);
-        // If the partition is null, its HDFS path does not exist, and it was 
not added
-        // to this table's partition list. Skip the partition.
-        if (partition == null) continue;
+      for (Partition msPartition: msPartitions) {
+        partBuilders.add(createPartitionBuilder(
+            msPartition.getSd(), msPartition, permCache));
       }
     }
     // Load the file metadata from scratch.
     Timer.Context fileMetadataLdContext = getMetrics().getTimer(
         HdfsTable.LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS).time();
-    loadFileMetadataForPartitions(client, partitionMap_.values(), 
/*isRefresh=*/false);
+    loadFileMetadataForPartitions(client, partBuilders, /*isRefresh=*/false);
     fileMetadataLdContext.stop();
+    for (HdfsPartition.Builder p : partBuilders) addPartition(p.build());
     return clock.getTick() - startTime;
   }
 
@@ -620,9 +662,11 @@ public class HdfsTable extends Table implements FeFsTable {
    *
    * @param isRefresh whether this is a refresh operation or an initial load. 
This only
    * affects logging.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadFileMetadataForPartitions(IMetaStoreClient client,
-      Collection<HdfsPartition> parts, boolean isRefresh) throws 
CatalogException {
+  private long loadFileMetadataForPartitions(IMetaStoreClient client,
+      Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
+      throws CatalogException {
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
 
@@ -635,7 +679,7 @@ public class HdfsTable extends Table implements FeFsTable {
     ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) 
: null;
     String logPrefix = String.format(
         "%s file and block metadata for %s paths for table %s",
-        isRefresh ? "Refreshing" : "Loading", parts.size(),
+        isRefresh ? "Refreshing" : "Loading", partBuilders.size(),
         getFullName());
 
     // Actually load the partitions.
@@ -643,7 +687,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // we'll throw an exception here and end up bailing out of whatever 
catalog operation
     // we're in the middle of. This could cause a partial metadata update -- 
eg we may
     // have refreshed the top-level table properties without refreshing the 
files.
-    new ParallelFileMetadataLoader(this, parts, validWriteIds_, validTxnList, 
logPrefix)
+    new ParallelFileMetadataLoader(
+        this, partBuilders, validWriteIds_, validTxnList, logPrefix)
         .loadAndSet();
 
     // TODO(todd): would be good to log a summary of the loading process:
@@ -651,15 +696,18 @@ public class HdfsTable extends Table implements FeFsTable 
{
     // - how many partitions did we read metadata for
     // - etc...
     String partNames = Joiner.on(", ").join(
-        Iterables.limit(Iterables.transform(parts, 
HdfsPartition::getPartitionName), 3));
-    if (parts.size() > 3) {
+        Iterables.limit(
+            Iterables.transform(partBuilders, 
HdfsPartition.Builder::getPartitionName),
+            3));
+    if (partBuilders.size() > 3) {
       partNames += String.format(", and %s others",
-          Iterables.size(parts) - 3);
+          Iterables.size(partBuilders) - 3);
     }
 
     long duration = clock.getTick() - startTime;
     LOG.info("Loaded file and block metadata for {} partitions: {}. Time 
taken: {}",
         getFullName(), partNames, PrintUtils.printTimeNs(duration));
+    return duration;
   }
 
   public FileSystem getFileSystem() throws CatalogException {
@@ -729,49 +777,54 @@ public class HdfsTable extends Table implements FeFsTable 
{
   public List<HdfsPartition> createAndLoadPartitions(IMetaStoreClient client,
       List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
       throws CatalogException {
-    List<HdfsPartition> addedParts = new ArrayList<>();
+    List<HdfsPartition.Builder> addedPartBuilders = new ArrayList<>();
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
     for (org.apache.hadoop.hive.metastore.api.Partition partition: 
msPartitions) {
-      HdfsPartition hdfsPartition = createPartition(partition.getSd(), 
partition,
-          permCache);
-      Preconditions.checkNotNull(hdfsPartition);
-      addedParts.add(hdfsPartition);
+      HdfsPartition.Builder partBuilder = 
createPartitionBuilder(partition.getSd(),
+          partition, permCache);
+      Preconditions.checkNotNull(partBuilder);
+      addedPartBuilders.add(partBuilder);
     }
-    loadFileMetadataForPartitions(client, addedParts, /* isRefresh = */ false);
-    return addedParts;
+    loadFileMetadataForPartitions(client, addedPartBuilders, 
/*isRefresh=*/false);
+    return addedPartBuilders.stream()
+        .map(HdfsPartition.Builder::build)
+        .collect(Collectors.toList());
   }
 
   /**
-   * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS 
partition
-   * object.
+   * Creates a new HdfsPartition.Builder from a specified StorageDescriptor 
and an HMS
+   * partition object.
    */
-  private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
-      org.apache.hadoop.hive.metastore.api.Partition msPartition,
+  private HdfsPartition.Builder createPartitionBuilder(
+      StorageDescriptor storageDescriptor, Partition msPartition,
       FsPermissionCache permCache) throws CatalogException {
-    HdfsStorageDescriptor fileFormatDescriptor =
-        HdfsStorageDescriptor.fromStorageDescriptor(this.name_, 
storageDescriptor);
-    List<LiteralExpr> keyValues;
-    if (msPartition != null) {
-      keyValues = FeCatalogUtils.parsePartitionKeyValues(this, 
msPartition.getValues());
-    } else {
-      keyValues = Collections.emptyList();
-    }
+    return createOrUpdatePartitionBuilder(
+        storageDescriptor, msPartition, permCache, null);
+  }
+
+  private HdfsPartition.Builder createOrUpdatePartitionBuilder(
+      StorageDescriptor storageDescriptor,
+      org.apache.hadoop.hive.metastore.api.Partition msPartition,
+      FsPermissionCache permCache, HdfsPartition.Builder partBuilder)
+      throws CatalogException {
+    if (partBuilder == null) partBuilder = new HdfsPartition.Builder(this);
+    partBuilder
+        .setMsPartition(msPartition)
+        .setFileFormatDescriptor(
+            HdfsStorageDescriptor.fromStorageDescriptor(this.name_, 
storageDescriptor));
     Path partDirPath = new Path(storageDescriptor.getLocation());
     try {
       if (msPartition != null) {
-        HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
+        // Update the parameters based on validations with hdfs.
+        boolean isCached = HdfsCachingUtil.validateCacheParams(
+            partBuilder.getParameters());
+        partBuilder.setIsMarkedCached(isCached);
       }
       TAccessLevel accessLevel = getAvailableAccessLevel(getFullName(), 
partDirPath,
           permCache);
-      HdfsPartition partition =
-          new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
-          new ArrayList<FileDescriptor>(), accessLevel);
-      partition.checkWellFormed();
-      // Set the partition's #rows.
-      if (msPartition != null && msPartition.getParameters() != null) {
-         
partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
-      }
-      if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
+      partBuilder.setAccessLevel(accessLevel);
+      partBuilder.checkWellFormed();
+      if (!TAccessLevelUtil.impliesWriteAccess(accessLevel)) {
           // TODO: READ_ONLY isn't exactly correct because the it's possible 
the
           // partition does not have READ permissions either. When we start 
checking
           // whether we can READ from a table, this should be updated to set 
the
@@ -780,7 +833,7 @@ public class HdfsTable extends Table implements FeFsTable {
           // WRITE_ONLY the table's access level should be NONE.
           accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      return partition;
+      return partBuilder;
     } catch (IOException e) {
       throw new CatalogException("Error initializing partition", e);
     }
@@ -833,6 +886,21 @@ public class HdfsTable extends Table implements FeFsTable {
     }
   }
 
+  public void updatePartitions(List<HdfsPartition.Builder> partBuilders)
+      throws CatalogException {
+    for (HdfsPartition.Builder p : partBuilders) updatePartition(p);
+  }
+
+  public void updatePartition(HdfsPartition.Builder partBuilder) throws 
CatalogException {
+    HdfsPartition oldPartition = partBuilder.getOldInstance();
+    Preconditions.checkNotNull(oldPartition);
+    Preconditions.checkState(partitionMap_.containsKey(oldPartition.getId()));
+    HdfsPartition newPartition = partBuilder.build();
+    // Partition is reloaded and hence cache directives are not dropped.
+    dropPartition(oldPartition, false);
+    addPartition(newPartition);
+  }
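
As an illustration of the new flow (a minimal sketch, not part of the patch; 'table' and 'oldPart' are placeholders), callers now funnel every change through HdfsPartition.Builder and let updatePartition() swap in the rebuilt instance:

    // Sketch only: all edits go through the builder; the old immutable instance is
    // replaced only after build() succeeds.
    HdfsPartition.Builder builder = new HdfsPartition.Builder(oldPart).setNumRows(1000);
    builder.putToParameters(StatsSetupConst.ROW_COUNT, "1000");
    table.updatePartition(builder);  // drops oldPart and adds builder.build()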
+
   /**
    * Drops the partition having the given partition spec from HdfsTable. 
Cleans up its
    * metadata from all the mappings used to speed up partition pruning/lookup.
@@ -862,13 +930,20 @@ public class HdfsTable extends Table implements FeFsTable {
     nameToPartitionMap_.remove(partition.getPartitionName());
     if (removeCacheDirective && partition.isMarkedCached()) {
       try {
-        HdfsCachingUtil.removePartitionCacheDirective(partition);
+        // Partition's parameters map is immutable. Create a temp one for the 
cleanup.
+        HdfsCachingUtil.removePartitionCacheDirective(Maps.newHashMap(
+            partition.getParameters()));
       } catch (ImpalaException e) {
         LOG.error("Unable to remove the cache directive on table " + 
getFullName() +
             ", partition " + partition.getPartitionName() + ": ", e);
       }
     }
-    if (!isStoredInImpaladCatalogCache()) return partition;
+    // dirtyPartitions_ is only maintained in the catalogd. nullPartitionIds_ 
and
+    // partitionValuesMap_ are only maintained in coordinators.
+    if (!isStoredInImpaladCatalogCache()) {
+      dirtyPartitions_.remove(partitionId);
+      return partition;
+    }
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -987,6 +1062,8 @@ public class HdfsTable extends Table implements FeFsTable {
           loadConstraintsInfo(client, msTbl);
         }
         loadValidWriteIdList(client);
+        // Set table-level stats first so partition stats can inherit them.
+        setTableStats(msTbl);
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
@@ -996,6 +1073,8 @@ public class HdfsTable extends Table implements FeFsTable {
           if (msTbl.getPartitionKeysSize() == 0) {
             if (loadParitionFileMetadata) {
               storageMetadataLoadTime_ += 
updateUnpartitionedTableFileMd(client);
+            } else {  // Update the single partition stats in case the table stats changed.
+              updateUnpartitionedTableStats();
             }
           } else {
             storageMetadataLoadTime_ += updatePartitionsFromHms(
@@ -1015,9 +1094,10 @@ public class HdfsTable extends Table implements FeFsTable {
           allPartitionsLdContext.stop();
         }
         if (loadTableSchema) setAvroSchema(client, msTbl);
-        setTableStats(msTbl);
         fileMetadataStats_.unset();
         refreshLastUsedTime();
+        // Make sure all the partition modifications are done.
+        Preconditions.checkState(dirtyPartitions_.isEmpty());
       } catch (TableLoadingException e) {
         throw e;
       } catch (Exception e) {
@@ -1079,7 +1159,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * Returns time spent updating the file metadata in nanoseconds.
    *
    * This is optimized for the case where few files have changed. See
-   * {@link #refreshFileMetadata(Path, List)} above for details.
+   * {@link FileMetadataLoader#load} for details.
    */
   private long updateUnpartitionedTableFileMd(IMetaStoreClient client)
       throws CatalogException {
@@ -1087,26 +1167,36 @@ public class HdfsTable extends Table implements FeFsTable {
     if (LOG.isTraceEnabled()) {
       LOG.trace("update unpartitioned table: " + getFullName());
     }
-    final Clock clock = Clock.defaultClock();
-    long startTime = clock.getTick();
     HdfsPartition oldPartition = 
Iterables.getOnlyElement(partitionMap_.values());
-
-    // Instead of updating the existing partition in place, we create a new one
-    // so that we reflect any changes in the msTbl object and also assign a new
-    // ID. This is one step towards eventually implementing IMPALA-7533.
     resetPartitions();
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     setPrototypePartition(msTbl.getSd());
-    HdfsPartition part = createPartition(msTbl.getSd(), null, new 
FsPermissionCache());
+    HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(),
+        /*msPartition=*/null, new FsPermissionCache());
     // Copy over the FDs from the old partition to the new one, so that
     // 'refreshPartitionFileMetadata' below can compare modification times and
     // reload the locations only for those that changed.
-    part.setFileDescriptors(oldPartition.getFileDescriptors());
-    addPartition(part);
-    if (isMarkedCached_) part.markCached();
-    loadFileMetadataForPartitions(client, ImmutableList.of(part), 
/*isRefresh=*/true);
-    return clock.getTick() - startTime;
+    partBuilder.setFileDescriptors(oldPartition.getFileDescriptors())
+        .setIsMarkedCached(isMarkedCached_);
+    long fileMdLoadTime = loadFileMetadataForPartitions(client,
+        ImmutableList.of(partBuilder), /*isRefresh=*/true);
+    setUnpartitionedTableStats(partBuilder);
+    addPartition(partBuilder.build());
+    return fileMdLoadTime;
+  }
+
+  /**
+   * Updates the single partition stats of an unpartitioned HdfsTable.
+   */
+  private void updateUnpartitionedTableStats() throws CatalogException {
+    // Just update the single partition if its #rows is stale.
+    HdfsPartition oldPartition = 
Iterables.getOnlyElement(partitionMap_.values());
+    if (oldPartition.getNumRows() != getNumRows()) {
+      HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(oldPartition)
+          .setNumRows(getNumRows());
+      updatePartition(partBuilder);
+    }
   }
 
   /**
@@ -1123,6 +1213,7 @@ public class HdfsTable extends Table implements FeFsTable {
   private long updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
       throws Exception {
+    long fileMdLoadTime = 0;
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + 
getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
@@ -1138,9 +1229,9 @@ public class HdfsTable extends Table implements FeFsTable {
     // Names of loaded partitions in this table
     Set<String> partitionNames = new HashSet<>();
     // Partitions for which file metadata must be loaded
-    List<HdfsPartition> partitionsToLoadFiles = Lists.newArrayList();
+    List<HdfsPartition.Builder> partitionsToLoadFiles = new ArrayList<>();
     // Partitions that need to be dropped and recreated from scratch
-    List<HdfsPartition> dirtyPartitions = new ArrayList<>();
+    List<HdfsPartition.Builder> dirtyPartitions = new ArrayList<>();
     // Partitions removed from the Hive Metastore.
     List<HdfsPartition> removedPartitions = new ArrayList<>();
     // Identify dirty partitions that need to be loaded from the Hive 
Metastore and
@@ -1150,30 +1241,27 @@ public class HdfsTable extends Table implements FeFsTable {
       // that were removed from HMS using some external process, e.g. Hive.
       if (!msPartitionNames.contains(partition.getPartitionName())) {
         removedPartitions.add(partition);
-      }
-      if (partition.isDirty()) {
-        // Dirty partitions are updated by removing them from table's partition
-        // list and loading them from the Hive Metastore.
-        dirtyPartitions.add(partition);
-      } else {
-        if (partitionsToUpdate == null && loadPartitionFileMetadata) {
-          partitionsToLoadFiles.add(partition);
-        }
+      } else if (isDirtyPartition(partition)) {
+        // Modifications of dirty partitions have already been started. Pick up the
+        // in-progress partition builders to finalize the modifications.
+        dirtyPartitions.add(pickInprogressPartitionBuilder(partition));
+      } else if (partitionsToUpdate == null && loadPartitionFileMetadata) {
+        partitionsToLoadFiles.add(new HdfsPartition.Builder(partition));
       }
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
       partitionNames.add(partition.getPartitionName());
     }
     dropPartitions(removedPartitions);
-    // dirtyPartitions are reloaded and hence cache directives are not dropped.
-    dropPartitions(dirtyPartitions, false);
-    // Load dirty partitions from Hive Metastore
-    // TODO(todd): the logic around "dirty partitions" is highly suspicious.
-    loadPartitionsFromMetastore(dirtyPartitions, client);
+    // Load dirty partitions from the Hive Metastore. File metadata of dirty partitions
+    // will always be reloaded (ignoring the loadPartitionFileMetadata flag).
+    fileMdLoadTime += loadPartitionsFromMetastore(dirtyPartitions, client);
+    Preconditions.checkState(!hasInProgressModification());
 
     // Identify and load partitions that were added in the Hive Metastore but 
don't
-    // exist in this table.
+    // exist in this table. Their file metadata will be loaded.
     Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, 
partitionNames);
-    loadPartitionsFromMetastore(newPartitionsInHms, client);
+    fileMdLoadTime += loadPartitionsFromMetastore(newPartitionsInHms,
+        /*inprogressPartBuilders=*/null, client);
     // If a list of modified partitions (old and new) is specified, don't 
reload file
     // metadata for the new ones as they have already been detected in HMS and 
have been
     // reloaded by loadPartitionsFromMetastore().
@@ -1184,19 +1272,19 @@ public class HdfsTable extends Table implements FeFsTable {
     // Load file metadata. Until we have a notification mechanism for when a
     // file changes in hdfs, it is sometimes required to reload all the file
     // descriptors and block metadata of a table (e.g. REFRESH statement).
-    long fileLoadMdTime = 0;
     if (loadPartitionFileMetadata) {
-      final Clock clock = Clock.defaultClock();
-      long startTime = clock.getTick();
       if (partitionsToUpdate != null) {
         Preconditions.checkState(partitionsToLoadFiles.isEmpty());
         // Only reload file metadata of partitions specified in 
'partitionsToUpdate'
-        partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate);
+        List<HdfsPartition> parts = getPartitionsForNames(partitionsToUpdate);
+        partitionsToLoadFiles = parts.stream().map(HdfsPartition.Builder::new)
+            .collect(Collectors.toList());
       }
-      loadFileMetadataForPartitions(client, partitionsToLoadFiles, /* 
isRefresh=*/true);
-      fileLoadMdTime = clock.getTick() - startTime;
+      fileMdLoadTime += loadFileMetadataForPartitions(client, 
partitionsToLoadFiles,
+          /* isRefresh=*/true);
+      updatePartitions(partitionsToLoadFiles);
     }
-    return fileLoadMdTime;
+    return fileMdLoadTime;
   }
 
   /**
@@ -1218,18 +1306,11 @@ public class HdfsTable extends Table implements FeFsTable {
     return parts;
   }
 
-  @Override
-  public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    super.setTableStats(msTbl);
+  private void setUnpartitionedTableStats(HdfsPartition.Builder partBuilder) {
+    Preconditions.checkState(numClusteringCols_ == 0);
     // For unpartitioned tables set the numRows in its single partition
     // to the table's numRows.
-    if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
-      // Unpartitioned tables have a default partition.
-      Preconditions.checkState(partitionMap_.size() == 1);
-      for (HdfsPartition p: partitionMap_.values()) {
-        p.setNumRows(getNumRows());
-      }
-    }
+    partBuilder.setNumRows(getNumRows());
   }
 
   /**
@@ -1318,48 +1399,66 @@ public class HdfsTable extends Table implements FeFsTable {
   /**
    * Loads partitions from the Hive Metastore and adds them to the internal 
list of
    * table partitions.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
+  private long loadPartitionsFromMetastore(List<HdfsPartition.Builder> 
partitions,
       IMetaStoreClient client) throws Exception {
     Preconditions.checkNotNull(partitions);
-    if (partitions.isEmpty()) return;
+    if (partitions.isEmpty()) return 0;
     if (LOG.isTraceEnabled()) {
       LOG.trace(String.format("Incrementally updating %d/%d partitions.",
           partitions.size(), partitionMap_.size()));
     }
-    Set<String> partitionNames = new HashSet<>();
-    for (HdfsPartition part: partitions) {
-      partitionNames.add(part.getPartitionName());
+    Map<String, HdfsPartition.Builder> partBuilders = Maps.newHashMap();
+    for (HdfsPartition.Builder part: partitions) {
+      partBuilders.put(part.getPartitionName(), part);
     }
-    loadPartitionsFromMetastore(partitionNames, client);
+    return loadPartitionsFromMetastore(partBuilders.keySet(), partBuilders, 
client);
   }
 
   /**
-   * Loads from the Hive Metastore the partitions that correspond to the 
specified
-   * 'partitionNames' and adds them to the internal list of table partitions.
+   * Loads from the Hive Metastore and file system the partitions that 
correspond to
+   * the specified 'partitionNames' and adds/updates them to the internal list 
of table
+   * partitions.
+   * If 'inprogressPartBuilders' is null, new partitions will be created.
+   * If 'inprogressPartBuilders' is not null, take over the in-progress modifications
+   * and finalize them by updating the existing partitions.
+   * @return time in nanoseconds spent in loading file metadata.
    */
-  private void loadPartitionsFromMetastore(Set<String> partitionNames,
-      IMetaStoreClient client) throws Exception {
+  private long loadPartitionsFromMetastore(Set<String> partitionNames,
+      Map<String, HdfsPartition.Builder> inprogressPartBuilders, 
IMetaStoreClient client)
+      throws Exception {
     Preconditions.checkNotNull(partitionNames);
-    if (partitionNames.isEmpty()) return;
+    if (partitionNames.isEmpty()) return 0;
     // Load partition metadata from Hive Metastore.
-    List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
-        new ArrayList<>();
-    msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
-        Lists.newArrayList(partitionNames), db_.getName(), name_));
+    List<Partition> msPartitions = new ArrayList<>(
+        MetaStoreUtil.fetchPartitionsByName(
+            client, Lists.newArrayList(partitionNames), db_.getName(), name_));
 
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
-    List<HdfsPartition> partitions = new ArrayList<>(msPartitions.size());
+    List<HdfsPartition.Builder> partBuilders = new 
ArrayList<>(msPartitions.size());
     for (org.apache.hadoop.hive.metastore.api.Partition msPartition: 
msPartitions) {
-      HdfsPartition partition = createPartition(msPartition.getSd(), 
msPartition,
-          permCache);
-      // If the partition is null, its HDFS path does not exist, and it was 
not added to
-      // this table's partition list. Skip the partition.
-      if (partition == null) continue;
-      partitions.add(partition);
+      String partName = FeCatalogUtils.getPartitionName(this, 
msPartition.getValues());
+      HdfsPartition.Builder partBuilder = null;
+      if (inprogressPartBuilders != null) {
+        // If we have an in-progress partition modification, update the partition
+        // builder.
+        partBuilder = inprogressPartBuilders.get(partName);
+        Preconditions.checkNotNull(partBuilder);
+      }
+      partBuilder = createOrUpdatePartitionBuilder(
+          msPartition.getSd(), msPartition, permCache, partBuilder);
+      partBuilders.add(partBuilder);
+    }
+    long fileMdLoadTime = loadFileMetadataForPartitions(client, partBuilders,
+        /* isRefresh=*/false);
+    for (HdfsPartition.Builder p : partBuilders) {
+      if (inprogressPartBuilders == null) {
+        addPartition(p.build());
+      } else {
+        updatePartition(p);
+      }
     }
-    loadFileMetadataForPartitions(client, partitions, /* isRefresh=*/false);
-    for (HdfsPartition partition : partitions) addPartition(partition);
+    return fileMdLoadTime;
   }
 
   /**
@@ -1445,13 +1544,14 @@ public class HdfsTable extends Table implements FeFsTable {
     resetPartitions();
     try {
       for (Map.Entry<Long, THdfsPartition> part: 
hdfsTable.getPartitions().entrySet()) {
-        HdfsPartition hdfsPart =
-            HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
-        addPartition(hdfsPart);
+        addPartition(new HdfsPartition.Builder(this, part.getKey())
+            .fromThrift(part.getValue())
+            .build());
       }
-      prototypePartition_ = HdfsPartition.fromThrift(this,
-          CatalogObjectsConstants.PROTOTYPE_PARTITION_ID,
-          hdfsTable.prototype_partition);
+      prototypePartition_ =
+          new HdfsPartition.Builder(this, 
CatalogObjectsConstants.PROTOTYPE_PARTITION_ID)
+              .fromThrift(hdfsTable.prototype_partition)
+              .build();
     } catch (CatalogException e) {
       throw new TableLoadingException(e.getMessage());
     }
@@ -1986,20 +2086,18 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   public void reloadPartition(IMetaStoreClient client, HdfsPartition 
oldPartition,
       Partition hmsPartition) throws CatalogException {
-    // Instead of updating the existing partition in place, we create a new one
-    // so that we reflect any changes in the hmsPartition object and also 
assign a new
-    // ID. This is one step towards eventually implementing IMPALA-7533.
-    HdfsPartition refreshedPartition = createPartition(
+    HdfsPartition.Builder partBuilder = createPartitionBuilder(
         hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
     Preconditions.checkArgument(oldPartition == null
-        || HdfsPartition.KV_COMPARATOR.compare(oldPartition, 
refreshedPartition) == 0);
+        || HdfsPartition.comparePartitionKeyValues(
+            oldPartition.getPartitionValues(), 
partBuilder.getPartitionValues()) == 0);
     if (oldPartition != null) {
-      refreshedPartition.setFileDescriptors(oldPartition.getFileDescriptors());
+      partBuilder.setFileDescriptors(oldPartition.getFileDescriptors());
     }
-    loadFileMetadataForPartitions(client, ImmutableList.of(refreshedPartition),
+    loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
         /*isRefresh=*/true);
     dropPartition(oldPartition, false);
-    addPartition(refreshedPartition);
+    addPartition(partBuilder.build());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index c41b180..5381f2e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -70,10 +70,11 @@ public class ParallelFileMetadataLoader {
 
   private final String logPrefix_;
   private final Map<Path, FileMetadataLoader> loaders_;
-  private final Map<Path, List<HdfsPartition>> partsByPath_;
+  private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
   private final FileSystem fs_;
 
-  public ParallelFileMetadataLoader(HdfsTable table, Collection<HdfsPartition> 
parts,
+  public ParallelFileMetadataLoader(HdfsTable table,
+      Collection<HdfsPartition.Builder> partBuilders,
       ValidWriteIdList writeIdList, ValidTxnList validTxnList, String 
logPrefix)
       throws CatalogException {
     if (writeIdList != null || validTxnList != null) {
@@ -84,14 +85,14 @@ public class ParallelFileMetadataLoader {
     // Group the partitions by their path (multiple partitions may point to 
the same
     // path).
     partsByPath_ = Maps.newHashMap();
-    for (HdfsPartition p : parts) {
+    for (HdfsPartition.Builder p : partBuilders) {
       Path partPath = FileSystemUtil.createFullyQualifiedPath(new 
Path(p.getLocation()));
       partsByPath_.computeIfAbsent(partPath, (path) -> new ArrayList<>())
           .add(p);
     }
     // Create a FileMetadataLoader for each path.
     loaders_ = Maps.newHashMap();
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
       FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
           Utils.shouldRecursivelyListPartitions(table), oldFds, 
table.getHostIndex(),
@@ -100,7 +101,7 @@ public class ParallelFileMetadataLoader {
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
       boolean hasCachedPartition = Iterables.any(e.getValue(),
-          HdfsPartition::isMarkedCached);
+          HdfsPartition.Builder::isMarkedCached);
       loader.setForceRefreshBlockLocations(hasCachedPartition);
       loaders_.put(e.getKey(), loader);
     }
@@ -117,12 +118,12 @@ public class ParallelFileMetadataLoader {
     load();
 
     // Store the loaded FDs into the partitions.
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
       Path p = e.getKey();
       FileMetadataLoader loader = loaders_.get(p);
 
-      for (HdfsPartition part : e.getValue()) {
-        part.setFileDescriptors(loader.getLoadedFds());
+      for (HdfsPartition.Builder partBuilder : e.getValue()) {
+        partBuilder.setFileDescriptors(loader.getLoadedFds());
       }
     }
   }
@@ -135,12 +136,12 @@ public class ParallelFileMetadataLoader {
   Map<HdfsPartition, List<FileDescriptor>> loadAndGet() throws 
TableLoadingException {
     load();
     Map<HdfsPartition, List<FileDescriptor>> result = Maps.newHashMap();
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+    for (Map.Entry<Path, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
       Path p = e.getKey();
       FileMetadataLoader loader = loaders_.get(p);
 
-      for (HdfsPartition part : e.getValue()) {
-        result.put(part, loader.getLoadedFds());
+      for (HdfsPartition.Builder partBuilder : e.getValue()) {
+        result.put(partBuilder.getOldInstance(), loader.getLoadedFds());
       }
     }
     return result;
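
A minimal sketch of how the refactored loader can be driven from table-loading code (only the constructor and loadAndGet() shown above are assumed; 'table' and the helper producing 'builders' are placeholders, and the null write-id/txn lists stand for a non-transactional table):

    // Sketch only: group partition builders by path, load file metadata in parallel
    // (one FileMetadataLoader per distinct path), and read back the results keyed by
    // the builders' old HdfsPartition instances.
    List<HdfsPartition.Builder> builders = getBuildersToRefresh();  // hypothetical helper
    ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
        table, builders, /*writeIdList=*/null, /*validTxnList=*/null,
        "Refreshing file metadata for " + table.getFullName() + ": ");
    Map<HdfsPartition, List<FileDescriptor>> fdsByPartition = loader.loadAndGet();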
diff --git a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
index bc36439..e47591a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PartitionStatsUtil.java
@@ -134,7 +134,7 @@ public class PartitionStatsUtil {
    * partition's stats are removed.
    */
   public static void partStatsToPartition(TPartitionStats partStats,
-      HdfsPartition partition) throws ImpalaException {
+      HdfsPartition.Builder partition) throws ImpalaException {
     if (partStats == null) {
       partition.setPartitionStatsBytes(null, false);
       return;
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2071f75..c1bb27f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -798,4 +798,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   public ValidWriteIdList getValidWriteIds() {
     return null;
   }
+
+  /**
+   * When altering a table we modify the cached metadata and apply the new 
metadata to
+   * HMS. Some DDL/DMLs require reloading the HMS and file metadata to update 
the cached
+   * metadata (See CatalogOpExecutor#alterTable(TAlterTableParams, 
TDdlExecResponse) for
+   * more details). Before reloading the metadata, these modifications are not 
finalized.
+   * @return true if there are any in-progress modifications that need to be finalized
+   * in an incremental table reload.
+   */
+  public boolean hasInProgressModification() { return false; }
+
+  /**
+   * Clears the in-progress modifications in case of failures.
+   */
+  public void resetInProgressModification() { }
 }
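
For orientation, a hedged sketch of how HdfsTable can back these hooks with the dirty-partition map this patch introduces (the field name matches dirtyPartitions_ used above, but the bodies below are reconstructions, not the literal patch contents):

    // Sketch only, inside HdfsTable: maps the id of the old HdfsPartition instance to
    // the in-progress builder that will eventually replace it (catalogd only).
    private final Map<Long, HdfsPartition.Builder> dirtyPartitions_ = new HashMap<>();

    public void markDirtyPartition(HdfsPartition.Builder partBuilder) {
      dirtyPartitions_.put(partBuilder.getOldInstance().getId(), partBuilder);
    }

    public boolean isDirtyPartition(HdfsPartition partition) {
      return dirtyPartitions_.containsKey(partition.getId());
    }

    public HdfsPartition.Builder pickInprogressPartitionBuilder(HdfsPartition partition) {
      return dirtyPartitions_.remove(partition.getId());
    }

    @Override
    public boolean hasInProgressModification() { return !dirtyPartitions_.isEmpty(); }

    @Override
    public void resetInProgressModification() { dirtyPartitions_.clear(); }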
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b2d6014..854d1d9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -846,6 +846,8 @@ public class CatalogOpExecutor {
               "Unknown ALTER TABLE operation type: " + params.getAlter_type());
       }
 
+      // Make sure we don't forget to finalize the modification.
+      if (tbl.hasInProgressModification()) Preconditions.checkState(reloadMetadata);
       if (reloadMetadata) {
         loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
             reloadTableSchema, null, "ALTER TABLE " + 
params.getAlter_type().name());
@@ -854,9 +856,13 @@ public class CatalogOpExecutor {
         catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
         addTableToCatalogUpdate(tbl, response.result);
       }
+      // Make sure all the modifications are done.
+      Preconditions.checkState(!tbl.hasInProgressModification());
     } finally {
       context.stop();
       UnlockWriteLockIfErronouslyLocked();
+      // Clear in-progress modifications in case of exceptions.
+      tbl.resetInProgressModification();
       tbl.getLock().unlock();
     }
   }
@@ -1126,7 +1132,7 @@ public class CatalogOpExecutor {
 
     // Update partition-level row counts and incremental column stats for
     // partitioned Hdfs tables.
-    List<HdfsPartition> modifiedParts = null;
+    List<HdfsPartition.Builder> modifiedParts = null;
     if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
       Preconditions.checkState(table instanceof HdfsTable);
       modifiedParts = updatePartitionStats(params, (HdfsTable) table);
@@ -1159,10 +1165,10 @@ public class CatalogOpExecutor {
    * Row counts for missing or new partitions as a result of concurrent table 
alterations
    * are set to 0.
    */
-  private List<HdfsPartition> 
updatePartitionStats(TAlterTableUpdateStatsParams params,
-      HdfsTable table) throws ImpalaException {
+  private List<HdfsPartition.Builder> updatePartitionStats(
+      TAlterTableUpdateStatsParams params, HdfsTable table) throws 
ImpalaException {
     Preconditions.checkState(params.isSetPartition_stats());
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     // TODO(todd) only load the partitions that were modified in 'params'.
     Collection<? extends FeFsPartition> parts =
         FeCatalogUtils.loadAllPartitions(table);
@@ -1199,12 +1205,13 @@ public class CatalogOpExecutor {
         LOG.trace(String.format("Updating stats for partition %s: numRows=%d",
             partition.getValuesAsString(), numRows));
       }
-      PartitionStatsUtil.partStatsToPartition(partitionStats, partition);
-      partition.putToParameters(StatsSetupConst.ROW_COUNT, 
String.valueOf(numRows));
+      HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+      PartitionStatsUtil.partStatsToPartition(partitionStats, partBuilder);
+      partBuilder.putToParameters(StatsSetupConst.ROW_COUNT, 
String.valueOf(numRows));
       // HMS requires this param for stats changes to take effect.
-      
partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
-      partition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-      modifiedParts.add(partition);
+      
partBuilder.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
+      
partBuilder.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+      modifiedParts.add(partBuilder);
     }
     return modifiedParts;
   }
@@ -1520,18 +1527,17 @@ public class CatalogOpExecutor {
           return;
         }
 
-        for(HdfsPartition partition : partitions) {
+        for (HdfsPartition partition : partitions) {
           if (partition.getPartitionStatsCompressed() != null) {
-            partition.dropPartitionStats();
-            try {
-              applyAlterPartition(table, partition);
-            } finally {
-              partition.markDirty();
-            }
+            HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+            partBuilder.dropPartitionStats();
+            applyAlterPartition(table, partBuilder);
+            hdfsTbl.updatePartition(partBuilder);
           }
         }
       }
-      loadTableMetadata(table, newCatalogVersion, false, true, null, "DROP 
STATS");
+      loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
+          /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP 
STATS");
       addTableToCatalogUpdate(table, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
@@ -1602,24 +1608,27 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(hdfsTable);
 
     // List of partitions that were modified as part of this operation.
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     Collection<? extends FeFsPartition> parts =
         FeCatalogUtils.loadAllPartitions(hdfsTable);
     for (FeFsPartition fePart: parts) {
       // TODO(todd): avoid downcast
       HdfsPartition part = (HdfsPartition) fePart;
-      boolean isModified = false;
+      HdfsPartition.Builder partBuilder = null;
       if (part.getPartitionStatsCompressed() != null) {
-        part.dropPartitionStats();
-        isModified = true;
+        partBuilder = new HdfsPartition.Builder(part)
+            .dropPartitionStats();
       }
 
       // Remove the ROW_COUNT parameter if it has been set.
-      if (part.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
-        isModified = true;
+      if (part.getParameters().containsKey(StatsSetupConst.ROW_COUNT)) {
+        if (partBuilder == null) {
+          partBuilder = new HdfsPartition.Builder(part);
+        }
+        partBuilder.getParameters().remove(StatsSetupConst.ROW_COUNT);
       }
 
-      if (isModified) modifiedParts.add(part);
+      if (partBuilder != null) modifiedParts.add(partBuilder);
     }
 
     bulkAlterPartitions(table, modifiedParts, null);
@@ -1944,8 +1953,12 @@ public class CatalogOpExecutor {
           FeCatalogUtils.loadAllPartitions(hdfsTable);
       for (FeFsPartition part: parts) {
         if (part.isMarkedCached()) {
+          HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(
+              (HdfsPartition) part);
           try {
-            HdfsCachingUtil.removePartitionCacheDirective(part);
+            HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
+            // We are dropping the table, so there is no need to update the existing
+            // partition; ignore the partBuilder here.
           } catch (Exception e) {
             LOG.error("Unable to uncache partition: " + 
part.getPartitionName(), e);
           }
@@ -3153,10 +3166,11 @@ public class CatalogOpExecutor {
       Preconditions.checkArgument(tbl instanceof HdfsTable);
       List<HdfsPartition> partitions =
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
-        partition.setFileFormat(HdfsFileFormat.fromThrift(fileFormat));
-        modifiedParts.add(partition);
+        modifiedParts.add(
+            new HdfsPartition.Builder(partition)
+                .setFileFormat(HdfsFileFormat.fromThrift(fileFormat)));
       }
       bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3190,10 +3204,11 @@ public class CatalogOpExecutor {
     } else {
       List<HdfsPartition> partitions =
           ((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
-        HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, 
partition.getSerdeInfo());
-        modifiedParts.add(partition);
+        HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+        HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, 
partBuilder.getSerdeInfo());
+        modifiedParts.add(partBuilder);
       }
       bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3231,11 +3246,12 @@ public class CatalogOpExecutor {
       TableName tableName = tbl.getTableName();
       HdfsPartition partition = catalog_.getHdfsPartition(
           tableName.getDb(), tableName.getTbl(), partitionSpec);
-      partition.setLocation(location);
+      HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
+      partBuilder.setLocation(location);
       try {
-        applyAlterPartition(tbl, partition);
+        applyAlterPartition(tbl, partBuilder);
       } finally {
-        partition.markDirty();
+        ((HdfsTable) tbl).markDirtyPartition(partBuilder);
       }
     }
     return reloadFileMetadata;
@@ -3256,26 +3272,27 @@ public class CatalogOpExecutor {
       List<HdfsPartition> partitions =
           ((HdfsTable) 
tbl).getPartitionsFromPartitionSet(params.getPartition_set());
 
-      List<HdfsPartition> modifiedParts = Lists.newArrayList();
+      List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
       for(HdfsPartition partition: partitions) {
+        HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
         switch (params.getTarget()) {
           case TBL_PROPERTY:
-            partition.getParameters().putAll(properties);
+            partBuilder.getParameters().putAll(properties);
             break;
           case SERDE_PROPERTY:
-            partition.getSerdeInfo().getParameters().putAll(properties);
+            partBuilder.getSerdeInfo().getParameters().putAll(properties);
             break;
           default:
             throw new UnsupportedOperationException(
                 "Unknown target TTablePropertyType: " + params.getTarget());
         }
-        modifiedParts.add(partition);
+        modifiedParts.add(partBuilder);
       }
       try {
         bulkAlterPartitions(tbl, modifiedParts, null);
       } finally {
-        for (HdfsPartition modifiedPart : modifiedParts) {
-          modifiedPart.markDirty();
+        for (HdfsPartition.Builder modifiedPart : modifiedParts) {
+          ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
         }
       }
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3373,17 +3390,18 @@ public class CatalogOpExecutor {
           // needs to be updated
           if (!partition.isMarkedCached() ||
               HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
+            HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
             try {
               // If the partition was already cached, update the directive 
otherwise
               // issue new cache directive
               if (!partition.isMarkedCached()) {
                 cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
-                    partition, cacheOp.getCache_pool_name(), 
cacheReplication));
+                    partBuilder, cacheOp.getCache_pool_name(), 
cacheReplication));
               } else {
                 Long directiveId = HdfsCachingUtil.getCacheDirectiveId(
                     partition.getParameters());
                 
cacheDirIds.add(HdfsCachingUtil.modifyCacheDirective(directiveId,
-                    partition, cacheOp.getCache_pool_name(), 
cacheReplication));
+                    partBuilder, cacheOp.getCache_pool_name(), 
cacheReplication));
               }
             } catch (ImpalaRuntimeException e) {
               if (partition.isMarkedCached()) {
@@ -3397,9 +3415,9 @@ public class CatalogOpExecutor {
 
             // Update the partition metadata.
             try {
-              applyAlterPartition(tbl, partition);
+              applyAlterPartition(tbl, partBuilder);
             } finally {
-              partition.markDirty();
+              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
             }
           }
         }
@@ -3425,11 +3443,12 @@ public class CatalogOpExecutor {
           // TODO(todd): avoid downcast
           HdfsPartition partition = (HdfsPartition) fePartition;
           if (partition.isMarkedCached()) {
-            HdfsCachingUtil.removePartitionCacheDirective(partition);
+            HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+            HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
             try {
-              applyAlterPartition(tbl, partition);
+              applyAlterPartition(tbl, partBuilder);
             } finally {
-              partition.markDirty();
+              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
             }
           }
         }
@@ -3461,23 +3480,25 @@ public class CatalogOpExecutor {
     Preconditions.checkArgument(tbl instanceof HdfsTable);
     List<HdfsPartition> partitions =
         ((HdfsTable) 
tbl).getPartitionsFromPartitionSet(params.getPartition_set());
-    List<HdfsPartition> modifiedParts = Lists.newArrayList();
+    List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
     if (cacheOp.isSet_cached()) {
       for (HdfsPartition partition : partitions) {
         // The directive is null if the partition is not cached
         Long directiveId =
             HdfsCachingUtil.getCacheDirectiveId(partition.getParameters());
+        HdfsPartition.Builder partBuilder = null;
         short replication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
         List<Long> cacheDirs = Lists.newArrayList();
         if (directiveId == null) {
+          partBuilder = new HdfsPartition.Builder(partition);
           cacheDirs.add(HdfsCachingUtil.submitCachePartitionDirective(
-              partition, cacheOp.getCache_pool_name(), replication));
+              partBuilder, cacheOp.getCache_pool_name(), replication));
         } else {
           if (HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
+            partBuilder = new HdfsPartition.Builder(partition);
             HdfsCachingUtil.validateCachePool(cacheOp, directiveId, tableName, 
partition);
             cacheDirs.add(HdfsCachingUtil.modifyCacheDirective(
-                directiveId, partition, cacheOp.getCache_pool_name(),
-                replication));
+                directiveId, partBuilder, cacheOp.getCache_pool_name(), 
replication));
           }
         }
 
@@ -3487,23 +3508,22 @@ public class CatalogOpExecutor {
           catalog_.watchCacheDirs(cacheDirs, tableName.toThrift(),
               "ALTER PARTITION SET CACHED");
         }
-        if (!partition.isMarkedCached()) {
-          modifiedParts.add(partition);
-        }
+        if (partBuilder != null) modifiedParts.add(partBuilder);
       }
     } else {
       for (HdfsPartition partition : partitions) {
         if (partition.isMarkedCached()) {
-          HdfsCachingUtil.removePartitionCacheDirective(partition);
-          modifiedParts.add(partition);
+          HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+          HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
+          modifiedParts.add(partBuilder);
         }
       }
     }
     try {
       bulkAlterPartitions(tbl, modifiedParts, null);
     } finally {
-      for (HdfsPartition modifiedPart : modifiedParts) {
-        modifiedPart.markDirty();
+      for (HdfsPartition.Builder modifiedPart : modifiedParts) {
+        ((HdfsTable) tbl).markDirtyPartition(modifiedPart);
       }
     }
     numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -3668,7 +3688,7 @@ public class CatalogOpExecutor {
    * processor to skip the event generated on the partition.
    */
   private void addToInflightVersionsOfPartition(
-      Map<String, String> partitionParams, HdfsPartition hdfsPartition) {
+      Map<String, String> partitionParams, HdfsPartition.Builder partBuilder) {
     if (!catalog_.isEventProcessingActive()) return;
     Preconditions.checkState(partitionParams != null);
     String version = partitionParams
@@ -3680,7 +3700,7 @@ public class CatalogOpExecutor {
     // catalog service identifiers
     if (catalog_.getCatalogServiceId().equals(serviceId)) {
       Preconditions.checkNotNull(version);
-      hdfsPartition.addToVersionsForInflightEvents(false, 
Long.parseLong(version));
+      partBuilder.addToVersionsForInflightEvents(false, 
Long.parseLong(version));
     }
   }
 
@@ -3795,14 +3815,14 @@ public class CatalogOpExecutor {
     }
   }
 
-  private void applyAlterPartition(Table tbl, HdfsPartition partition)
+  private void applyAlterPartition(Table tbl, HdfsPartition.Builder 
partBuilder)
       throws ImpalaException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      Partition hmsPartition = partition.toHmsPartition();
+      Partition hmsPartition = partBuilder.toHmsPartition();
       addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), hmsPartition);
       applyAlterHmsPartitions(tbl.getMetaStoreTable().deepCopy(), msClient,
           tbl.getTableName(), Arrays.asList(hmsPartition));
-      addToInflightVersionsOfPartition(hmsPartition.getParameters(), 
partition);
+      addToInflightVersionsOfPartition(hmsPartition.getParameters(), 
partBuilder);
     }
   }
 
@@ -3939,51 +3959,48 @@ public class CatalogOpExecutor {
    * reduces the time spent in a single update and helps avoid metastore client
    * timeouts.
    */
-  private void bulkAlterPartitions(Table tbl, List<HdfsPartition> 
modifiedParts,
+  private void bulkAlterPartitions(Table tbl, List<HdfsPartition.Builder> 
modifiedParts,
       TblTransaction tblTxn) throws ImpalaException {
-    List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
-        Lists.newArrayList();
-    for (HdfsPartition p: modifiedParts) {
-      org.apache.hadoop.hive.metastore.api.Partition msPart = 
p.toHmsPartition();
+    // Map from msPartitions to the partition builders. Use IdentityHashMap 
since
+    // modifications will change hash codes of msPartitions.
+    Map<Partition, HdfsPartition.Builder> msPartitionToBuilders =
+        Maps.newIdentityHashMap();
+    for (HdfsPartition.Builder p: modifiedParts) {
+      Partition msPart = p.toHmsPartition();
       if (msPart != null) {
         addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), msPart);
-        hmsPartitions.add(msPart);
+        msPartitionToBuilders.put(msPart, p);
       }
     }
-    if (hmsPartitions.isEmpty()) return;
+    if (msPartitionToBuilders.isEmpty()) return;
 
     String dbName = tbl.getDb().getName();
     String tableName = tbl.getName();
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (List<Partition> hmsPartitionsSubList :
-        Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
+      for (List<Partition> msPartitionsSubList : Iterables.partition(
+          msPartitionToBuilders.keySet(), MAX_PARTITION_UPDATES_PER_RPC)) {
         try {
           // Alter partitions in bulk.
           if (tblTxn != null) {
             
MetastoreShim.alterPartitionsWithTransaction(msClient.getHiveClient(), dbName,
-                tableName, hmsPartitionsSubList, tblTxn);
-          }
-          else {
+                tableName, msPartitionsSubList, tblTxn);
+          } else {
             MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, 
tableName,
-                hmsPartitionsSubList);
+                msPartitionsSubList);
           }
           // Mark the corresponding HdfsPartition objects as dirty
-          for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
-              hmsPartitionsSubList) {
-            try {
-              HdfsPartition hdfsPartition = catalog_.getHdfsPartition(dbName, 
tableName,
-                  msPartition);
-              hdfsPartition.markDirty();
-              // if event processing is turned on add the version number from 
partition
-              // paramters to the HdfsPartition's list of in-flight events
-              addToInflightVersionsOfPartition(msPartition.getParameters(),
-                  hdfsPartition);
-            } catch (PartitionNotFoundException e) {
-              LOG.error(String.format("Partition of table %s could not be 
found: %s",
-                  tableName, e.getMessage()));
-              continue;
-            }
+          for (Partition msPartition: msPartitionsSubList) {
+            HdfsPartition.Builder partBuilder = 
msPartitionToBuilders.get(msPartition);
+            Preconditions.checkNotNull(partBuilder);
+            // TODO(IMPALA-9779): Should we always mark this as dirty? It will trigger
+            //  a file metadata reload for this partition. Consider removing this and
+            //  marking the "dirty" flag in callers. Callers that don't need to reload
+            //  file metadata can build and replace the partition directly.
+            ((HdfsTable) tbl).markDirtyPartition(partBuilder);
+            // If event processing is turned on add the version number from 
partition
+            // parameters to the HdfsPartition's list of in-flight events.
+            addToInflightVersionsOfPartition(msPartition.getParameters(), 
partBuilder);
           }
         } catch (TException e) {
           throw new ImpalaRuntimeException(
@@ -4352,7 +4369,7 @@ public class CatalogOpExecutor {
                   for (org.apache.hadoop.hive.metastore.api.Partition part:
                       cachedHmsParts) {
                     try {
-                      HdfsCachingUtil.removePartitionCacheDirective(part);
+                      
HdfsCachingUtil.removePartitionCacheDirective(part.getParameters());
                     } catch (ImpalaException e1) {
                       String msg = String.format(
                           "Partition %s.%s(%s): State: Leaked caching 
directive. " +
@@ -4383,7 +4400,6 @@ public class CatalogOpExecutor {
         // For non-partitioned table, only single part exists
         FeFsPartition singlePart = 
Iterables.getOnlyElement((List<FeFsPartition>) parts);
         affectedExistingPartitions.add(singlePart);
-
       }
       unsetTableColStats(table.getMetaStoreTable(), tblTxn);
       // Submit the watch request for the given cache directives.
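
The IdentityHashMap in bulkAlterPartitions() above matters because the HMS Partition keys are mutated after insertion; a value-based map could then fail to find them. A small sketch (assuming the Thrift-generated Partition hashes over its field values):

    // Sketch only: after mutating a key, a HashMap lookup may miss, while an
    // IdentityHashMap still finds the entry because it compares by reference.
    Partition msPart = partBuilder.toHmsPartition();
    Map<Partition, HdfsPartition.Builder> byValue = new HashMap<>();
    Map<Partition, HdfsPartition.Builder> byIdentity = Maps.newIdentityHashMap();
    byValue.put(msPart, partBuilder);
    byIdentity.put(msPart, partBuilder);
    msPart.putToParameters("transient_lastDdlTime", "0");  // changes hashCode()
    byValue.get(msPart);     // may return null now
    byIdentity.get(msPart);  // still returns partBuilder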
diff --git a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
index b3f8bd7..1a0c1d9 100644
--- a/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/HdfsCachingUtil.java
@@ -100,7 +100,7 @@ public class HdfsCachingUtil {
    * Returns the ID of the submitted cache directive and throws if there is an 
error
    * submitting the directive.
    */
-  public static long submitCachePartitionDirective(HdfsPartition part,
+  public static long submitCachePartitionDirective(HdfsPartition.Builder part,
       String poolName, short replication) throws ImpalaRuntimeException {
     long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()),
         poolName, replication);
@@ -154,7 +154,7 @@ public class HdfsCachingUtil {
    * data. Also updates the partition's metadata to remove the cache directive 
ID.
    * No-op if the table is not cached.
    */
-  public static void removePartitionCacheDirective(FeFsPartition part)
+  public static void removePartitionCacheDirective(HdfsPartition.Builder part)
       throws ImpalaException {
     Preconditions.checkNotNull(part);
     Map<String, String> parameters = part.getParameters();
@@ -174,17 +174,16 @@ public class HdfsCachingUtil {
   }
 
   /**
-   * Convenience method for working directly on a metastore partition. See
-   * removePartitionCacheDirective(HdfsPartition) for more details.
+   * Convenience method for working directly on a metastore partition params 
map. See
+   * removePartitionCacheDirective(HdfsPartition.Builder) for more details.
    */
   public static void removePartitionCacheDirective(
-      org.apache.hadoop.hive.metastore.api.Partition part) throws 
ImpalaException {
-    Preconditions.checkNotNull(part);
-    Long id = getCacheDirectiveId(part.getParameters());
+      Map<String, String> partitionParams) throws ImpalaException {
+    Long id = getCacheDirectiveId(partitionParams);
     if (id == null) return;
     HdfsCachingUtil.removeDirective(id);
-    part.getParameters().remove(CACHE_DIR_ID_PROP_NAME);
-    part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME);
+    partitionParams.remove(CACHE_DIR_ID_PROP_NAME);
+    partitionParams.remove(CACHE_DIR_REPLICATION_PROP_NAME);
   }
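
With HdfsPartition's parameter map now immutable, callers that only need the HDFS-side cleanup pass a mutable copy of the parameters, mirroring HdfsTable.dropPartition() earlier in this patch. A minimal sketch:

    // Sketch only: remove the cache directive of an existing partition without
    // building a new HdfsPartition instance (e.g. because it is about to be dropped).
    Map<String, String> params = Maps.newHashMap(partition.getParameters());
    HdfsCachingUtil.removePartitionCacheDirective(params);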
 
   /**
@@ -348,8 +347,8 @@ public class HdfsCachingUtil {
    * Update cache directive for a partition and update the metastore 
parameters.
    * Returns the cache directive ID
    */
-  public static long modifyCacheDirective(Long id, HdfsPartition part, String 
poolName,
-      short replication) throws ImpalaRuntimeException {
+  public static long modifyCacheDirective(Long id, HdfsPartition.Builder part,
+      String poolName, short replication) throws ImpalaRuntimeException {
     Preconditions.checkNotNull(id);
     HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()),
         poolName, replication);
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 838983d..4138c2f 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -236,14 +236,14 @@ public class CatalogObjectToFromThriftTest {
     Assert.assertNotNull(part);
     // Create a dummy partition with an invalid decimal type.
     try {
-      new HdfsPartition(hdfsTable, part.toHmsPartition(),
-          Lists.newArrayList(
+      new HdfsPartition.Builder(hdfsTable)
+          .setMsPartition(part.toHmsPartition())
+          .setPartitionKeyValues(Lists.newArrayList(
               LiteralExpr.createFromUnescapedStr(
                   "11.1", ScalarType.createDecimalType(1, 0)),
               LiteralExpr.createFromUnescapedStr(
-                  "11.1", ScalarType.createDecimalType(1, 0))),
-          null, new ArrayList<>(),
-          TAccessLevel.READ_WRITE);
+                  "11.1", ScalarType.createDecimalType(1, 0))))
+          .setAccessLevel(TAccessLevel.READ_WRITE);
       fail("Expected metadata to be malformed.");
     } catch (SqlCastException e) {
       Assert.assertTrue(e.getMessage().contains(
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index a73e9a1..9a2ca3b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -421,9 +421,10 @@ public class CatalogTest {
     // reload path. Since we can't modify HDFS itself via these tests, we
     // do the next best thing: modify the metadata to revise history as
     // though the partition used above were actually empty.
-    HdfsPartition hdfsPartition = table
-        .getPartitionFromThriftPartitionSpec(partitionSpec);
-    hdfsPartition.setFileDescriptors(new ArrayList<>());
+    HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(
+        table.getPartitionFromThriftPartitionSpec(partitionSpec));
+    partBuilder.setFileDescriptors(new ArrayList<>());
+    table.updatePartition(partBuilder);
     stats.reset();
     catalog_.reloadPartition(table, partitionSpec, new Reference<>(false), 
"test");
 
