Repository: hive
Updated Branches:
  refs/heads/master 54bba9cbf -> 64bea0354


http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
new file mode 100644
index 0000000..2837ff4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+
+/**
+ * PartitionIterable - effectively a lazy Iterable<Partition>.
+ * Sometimes we need to iterate through a list of partitions, but the list
+ * can be too big to fetch as a single object, so the goal of
+ * PartitionIterable is to act as an Iterable<Partition> while lazily
+ * fetching each relevant partition, batch after batch, as independent
+ * metadata calls.
+ * Any use of PartitionIterable is likely to result in a large number of
+ * calls, so use it sparingly, and only when the memory cost of fetching
+ * all the partitions in one shot is prohibitive.
+ * It is still fairly costly in that it retains the list of partition
+ * names, but that should be far less expensive than the full partition
+ * objects.
+ * Note that remove() is an illegal call on this iterable and will result
+ * in an IllegalStateException.
+public class PartitionIterable implements Iterable<Partition> {
+
+  @Override
+  public Iterator<Partition> iterator() {
+    return new Iterator<Partition>() {
+
+      private boolean initialized = false;
+      private Iterator<Partition> ptnsIterator = null;
+
+      private Iterator<String> partitionNamesIter = null;
+      private Iterator<Partition> batchIter = null;
+
+      private void initialize() {
+        if (!initialized) {
+          if (currType == Type.LIST_PROVIDED) {
+            ptnsIterator = ptnsProvided.iterator();
+          } else {
+            partitionNamesIter = partitionNames.iterator();
+          }
+          initialized = true;
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED) {
+          return ptnsIterator.hasNext();
+        } else {
+          return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext();
+        }
+      }
+
+      @Override
+      public Partition next() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED) {
+          return ptnsIterator.next();
+        }
+
+        if ((batchIter == null) || !batchIter.hasNext()) {
+          getNextBatch();
+        }
+
+        return batchIter.next();
+      }
+
+      private void getNextBatch() {
+        int batch_counter = 0;
+        List<String> nameBatch = new ArrayList<String>();
+        while (batch_counter < batch_size && partitionNamesIter.hasNext()) {
+          nameBatch.add(partitionNamesIter.next());
+          batch_counter++;
+        }
+        try {
+          batchIter =
+            msc.getPartitionsByNames(table.getCatName(), table.getDbName(), table.getTableName(), nameBatch).iterator();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalStateException(
+          "PartitionIterable is a read-only iterable and remove() is unsupported");
+      }
+    };
+  }
+
+  enum Type {
+    LIST_PROVIDED,  // Where a List<Partition> is already provided
+    LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed.
+  }
+
+  private final Type currType;
+
+  // used for LIST_PROVIDED cases
+  private Collection<Partition> ptnsProvided = null;
+
+  // used for LAZY_FETCH_PARTITIONS cases
+  private IMetaStoreClient msc = null; // Assumes one instance of this + single-threaded compilation for each query.
+  private Table table = null;
+  private List<String> partitionNames = null;
+  private int batch_size;
+
+  /**
+   * Dummy constructor, which simply acts as an iterator on an already-present
+   * list of partitions, allowing easy drop-in replacement for other methods
+   * that already have a List<Partition>.
+   */
+  public PartitionIterable(Collection<Partition> ptnsProvided) {
+    this.currType = Type.LIST_PROVIDED;
+    this.ptnsProvided = ptnsProvided;
+  }
+
+  /**
+   * Primary constructor that fetches all partition names of a given table up
+   * front, given a metastore client and a table object, and then fetches the
+   * partition objects lazily in batches of batch_size.
+   */
+  public PartitionIterable(IMetaStoreClient msc, Table table, int batch_size) throws MetastoreException {
+    this.currType = Type.LAZY_FETCH_PARTITIONS;
+    this.msc = msc;
+    this.table = table;
+    this.batch_size = batch_size;
+    partitionNames = getPartitionNames(msc, table.getCatName(), table.getDbName(), table.getTableName(), (short) -1);
+  }
+
+  public List<String> getPartitionNames(IMetaStoreClient msc, String catName, String dbName, String tblName, short max)
+    throws MetastoreException {
+    try {
+      return msc.listPartitionNames(catName, dbName, tblName, max);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+}
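
For reviewers, a minimal usage sketch of the new iterable; the client setup and the catalog/db/table names below are illustrative, not part of this patch:

    // Sketch: walk every partition of a table in batches of 100 without
    // materializing the full List<Partition> in memory. "conf" is assumed
    // to be a valid metastore Configuration; names are made up.
    IMetaStoreClient msc = new HiveMetaStoreClient(conf);
    Table table = msc.getTable("hive", "default", "web_logs");
    for (Partition p : new PartitionIterable(msc, table, 100)) {
      // each batch of up to 100 Partition objects is fetched lazily via
      // getPartitionsByNames() as the iterator advances
      System.out.println(p.getValues());
    }

Note that the iterable still fetches all partition names up front in its constructor; only the full Partition objects are batched.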

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
new file mode 100644
index 0000000..901bf80
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.TimeValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Partition management task is primarily responsible for partition retention and discovery based on table properties.
+ *
+ * Partition Retention - If the "partition.retention.period" table property is set with a retention interval, then when
+ * this metastore task runs periodically, it will drop partitions whose age (creation time) is greater than the
+ * retention period. Dropping partitions after the retention period will also delete the data in those partitions.
+ *
+ * Partition Discovery - If the "discover.partitions" table property is set, this metastore task monitors the table
+ * location for newly added partition directories and creates partition objects if they do not exist. Also, if a
+ * partition object exists but the corresponding directory does not exist under the table location, the partition
+ * object will be dropped.
+ */
+public class PartitionManagementTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
+  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
+  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
+  private static final Lock lock = new ReentrantLock();
+  // these are just for testing
+  private static int completedAttempts;
+  private static int skippedAttempts;
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    // we modify conf in setupConf(), so we make a copy
+    conf = new Configuration(configuration);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void run() {
+    if (lock.tryLock()) {
+      skippedAttempts = 0;
+      String qualifiedTableName = null;
+      IMetaStoreClient msc = null;
+      try {
+        msc = new HiveMetaStoreClient(conf);
+        List<Table> candidateTables = new ArrayList<>();
+        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
+        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
+        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
+        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
+        Set<String> tableTypesSet = new HashSet<>();
+        List<String> tableTypesList;
+        // if tableTypes is empty, then a list with a single empty string has to be specified to scan no tables.
+        // specifying empty here is equivalent to disabling partition discovery altogether, as it scans no tables.
+        if (tableTypes.isEmpty()) {
+          tableTypesList = Lists.newArrayList("");
+        } else {
+          for (String type : tableTypes.split(",")) {
+            try {
+              tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
+            } catch (IllegalArgumentException e) {
+              // ignore
+              LOG.warn("Unknown table type: {}", type);
+            }
+          }
+          tableTypesList = Lists.newArrayList(tableTypesSet);
+        }
+        List<TableMeta> foundTableMetas = msc.getTableMeta(catalogName, dbPattern, tablePattern, tableTypesList);
+        LOG.info("Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName,
+          dbPattern, tablePattern, foundTableMetas.size());
+
+        for (TableMeta tableMeta : foundTableMetas) {
+          Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
+          if (table.getParameters() != null && table.getParameters().containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
+            table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true")) {
+            candidateTables.add(table);
+          }
+        }
+        if (candidateTables.isEmpty()) {
+          return;
+        }
+
+        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
+        // will not be safe unless a synchronized MSC is used. Using a synchronized MSC in a multi-threaded
+        // context also defeats the purpose of thread-pooled msck repair.
+        int threadPoolSize = MetastoreConf.getIntVar(conf,
+          MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
+        final ExecutorService executorService = Executors
+          .newFixedThreadPool(Math.min(candidateTables.size(), threadPoolSize),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
+        CountDownLatch countDownLatch = new CountDownLatch(candidateTables.size());
+        LOG.info("Found {} candidate tables for partition discovery", candidateTables.size());
+        setupMsckConf();
+        for (Table table : candidateTables) {
+          qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
+          long retentionSeconds = getRetentionPeriodInSeconds(table);
+          LOG.info("Running partition discovery for table {} retentionPeriod: 
{}s", qualifiedTableName,
+            retentionSeconds);
+          // this always runs in 'sync' mode where partitions can be added and 
dropped
+          MsckInfo msckInfo = new MsckInfo(table.getCatName(), 
table.getDbName(), table.getTableName(),
+            null, null, true, true, true, retentionSeconds);
+          executorService.submit(new MsckThread(msckInfo, conf, 
qualifiedTableName, countDownLatch));
+        }
+        countDownLatch.await();
+        executorService.shutdownNow();
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: 
" + qualifiedTableName, e);
+      } finally {
+        if (msc != null) {
+          msc.close();
+        }
+        lock.unlock();
+      }
+      completedAttempts++;
+    } else {
+      skippedAttempts++;
+      LOG.info("Lock is held by some other partition discovery task. Skipping 
this attempt..#{}", skippedAttempts);
+    }
+  }
+
+  static long getRetentionPeriodInSeconds(final Table table) {
+    String retentionPeriod;
+    long retentionSeconds = -1;
+    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
+      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      if (retentionPeriod.isEmpty()) {
+        LOG.warn("'{}' table property is defined but empty. Skipping retention 
period..",
+          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      } else {
+        try {
+          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
+          timeValidator.validate(retentionPeriod);
+          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention 
period..", retentionPeriod);
+          // will return -1
+        }
+      }
+    }
+    return retentionSeconds;
+  }
+
+  private void setupMsckConf() {
+    // if an invalid partition directory appears, we just skip and move on. We don't want partition management to
+    // throw when an invalid path is encountered, as these are background threads. We just want to skip and move on.
+    // Users will have to fix the invalid paths via external means.
+    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
+    // since msck runs in a thread pool and each thread creates its own metastore client, we don't want an explosion
+    // of connections to the metastore for embedded mode. Also we don't need too many db connections anyway.
+    conf.setInt(MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), 2);
+  }
+  }
+
+  private static class MsckThread implements Runnable {
+    private MsckInfo msckInfo;
+    private Configuration conf;
+    private String qualifiedTableName;
+    private CountDownLatch countDownLatch;
+
+    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
+      this.msckInfo = msckInfo;
+      this.conf = conf;
+      this.qualifiedTableName = qualifiedTableName;
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Msck msck = new Msck(true, true);
+        msck.init(conf);
+        msck.repair(msckInfo);
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: 
" + qualifiedTableName, e);
+      } finally {
+        // there is no recovery from exceptions, so we always count down and retry in the next attempt
+        countDownLatch.countDown();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static int getSkippedAttempts() {
+    return skippedAttempts;
+  }
+
+  @VisibleForTesting
+  public static int getCompletedAttempts() {
+    return completedAttempts;
+  }
+}
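
To make the discovery/retention behavior concrete, here is a hedged sketch of opting a table into this task via its two table properties. The client and table names are illustrative, and the "7d" retention string assumes the usual MetastoreConf time-unit suffixes (the tests below use values like "20000ms"):

    // Sketch: enable background partition discovery and a 7-day retention
    // window on an existing table. The property keys are the constants
    // defined in PartitionManagementTask above.
    Table table = msc.getTable("default", "web_logs");
    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
    table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "7d");
    msc.alter_table("default", "web_logs", table);

Once set, the next periodic run of the task should pick the table up through getTableMeta() and submit an MsckInfo repair for it in 'sync' mode.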

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index f3b3866..363db35 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -58,10 +59,14 @@ import org.apache.commons.collections.ListUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -71,6 +76,8 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
@@ -590,7 +597,7 @@ public class MetaStoreServerUtils {
   /** Duplicates AcidUtils; used in a couple places in metastore. */
   public static boolean isTransactionalTable(Map<String, String> params) {
     String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
-    return (transactionalProp != null && "true".equalsIgnoreCase(transactionalProp));
+    return "true".equalsIgnoreCase(transactionalProp);
   }
 
   /**
@@ -1294,4 +1301,162 @@ public class MetaStoreServerUtils {
       return hashCode;
     }
   }
+
+  // Some util methods from Hive.java, this is copied so as to avoid circular dependency with hive ql
+  public static Path getPath(Table table) {
+    String location = table.getSd().getLocation();
+    if (location == null) {
+      return null;
+    }
+    return new Path(location);
+  }
+
+  public static List<Partition> getAllPartitionsOf(IMetaStoreClient msc, Table table) throws MetastoreException {
+    try {
+      return msc.listPartitions(table.getCatName(), table.getDbName(), table.getTableName(), (short)-1);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+
+  public static boolean isPartitioned(Table table) {
+    if (getPartCols(table) == null) {
+      return false;
+    }
+    return (getPartCols(table).size() != 0);
+  }
+
+  public static List<FieldSchema> getPartCols(Table table) {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null) {
+      partKeys = new ArrayList<>();
+      table.setPartitionKeys(partKeys);
+    }
+    return partKeys;
+  }
+
+  public static List<String> getPartColNames(Table table) {
+    List<String> partColNames = new ArrayList<>();
+    for (FieldSchema key : getPartCols(table)) {
+      partColNames.add(key.getName());
+    }
+    return partColNames;
+  }
+
+  public static Path getDataLocation(Table table, Partition partition) {
+    if (isPartitioned(table)) {
+      if (partition.getSd() == null) {
+        return null;
+      } else {
+        return new Path(partition.getSd().getLocation());
+      }
+    } else {
+      if (table.getSd() == null) {
+        return null;
+      } else {
+        return getPath(table);
+      }
+    }
+  }
+
+  public static String getPartitionName(Table table, Partition partition) {
+    try {
+      return Warehouse.makePartName(getPartCols(table), partition.getValues());
+    } catch (MetaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Map<String, String> getPartitionSpec(Table table, Partition partition) {
+    return Warehouse.makeSpecFromValues(getPartCols(table), partition.getValues());
+  }
+
+  public static Partition getPartition(IMetaStoreClient msc, Table tbl, Map<String, String> partSpec) throws MetastoreException {
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : getPartCols(tbl)) {
+      String val = partSpec.get(field.getName());
+      pvals.add(val);
+    }
+    Partition tpart = null;
+    try {
+      tpart = msc.getPartition(tbl.getCatName(), tbl.getDbName(), 
tbl.getTableName(), pvals);
+    } catch (NoSuchObjectException nsoe) {
+      // this means no partition exists for the given partition
+      // key value pairs - thrift cannot handle null return values, hence
+      // getPartition() throws NoSuchObjectException to indicate null partition
+    } catch (Exception e) {
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new MetastoreException(e);
+    }
+
+    return tpart;
+  }
+
+
+  /**
+   * Get the partition name from the path.
+   *
+   * @param tablePath
+   *          Path of the table.
+   * @param partitionPath
+   *          Path of the partition.
+   * @param partCols
+   *          Set of partition columns from table definition
+   * @return Partition name, for example partitiondate=2008-01-01
+   */
+  public static String getPartitionName(Path tablePath, Path partitionPath, Set<String> partCols) {
+    String result = null;
+    Path currPath = partitionPath;
+    LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols);
+
+    while (currPath != null && !tablePath.equals(currPath)) {
+      // format: partition=p_val
+      // Add only when table partition colName matches
+      String[] parts = currPath.getName().split("=");
+      if (parts.length > 0) {
+        if (parts.length != 2) {
+          LOG.warn(currPath.getName() + " is not a valid partition name");
+          return result;
+        }
+
+        String partitionName = parts[0];
+        if (partCols.contains(partitionName)) {
+          if (result == null) {
+            result = currPath.getName();
+          } else {
+            result = currPath.getName() + Path.SEPARATOR + result;
+          }
+        }
+      }
+      currPath = currPath.getParent();
+      LOG.debug("currPath=" + currPath);
+    }
+    return result;
+  }
+
+  public static Partition createMetaPartitionObject(Table tbl, Map<String, String> partSpec, Path location)
+    throws MetastoreException {
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : getPartCols(tbl)) {
+      String val = partSpec.get(field.getName());
+      if (val == null || val.isEmpty()) {
+        throw new MetastoreException("partition spec is invalid; field "
+          + field.getName() + " does not exist or is empty");
+      }
+      pvals.add(val);
+    }
+
+    Partition tpart = new Partition();
+    tpart.setCatName(tbl.getCatName());
+    tpart.setDbName(tbl.getDbName());
+    tpart.setTableName(tbl.getTableName());
+    tpart.setValues(pvals);
+
+    if (!MetaStoreUtils.isView(tbl)) {
+      tpart.setSd(tbl.getSd().deepCopy());
+      tpart.getSd().setLocation((location != null) ? location.toString() : null);
+    }
+    return tpart;
+  }
 }
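
A small sketch of how the copied getPartitionName(Path, Path, Set) helper reassembles a partition name by walking parent directories up to the table root; the paths and column names here are made up (java.util.HashSet/Arrays assumed imported):

    // Sketch: given a table root and a nested partition directory, the
    // helper walks upwards collecting "col=val" components in order.
    Path tablePath = new Path("/warehouse/web_logs");
    Path partPath = new Path("/warehouse/web_logs/state=CA/dt=2018-12-01");
    Set<String> partCols = new HashSet<>(Arrays.asList("state", "dt"));
    String name = MetaStoreServerUtils.getPartitionName(tablePath, partPath, partCols);
    // name is now "state=CA/dt=2018-12-01"; a directory component that is
    // not of the form col=val stops the walk and the partial result (or
    // null) is returned.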

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
new file mode 100644
index 0000000..22513b9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryUtilities {
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Interface used to create an ExponentialBackOffRetry policy.
+   */
+  public static interface ExponentialBackOffRetry<T> {
+    /**
+     * This method should be implemented by users of this ExponentialBackOffRetry policy.
+     * It represents the actual work which needs to be done for a given batch size.
+     * @param batchSize The batch size for the work which needs to be executed
+     * @return the result of the work
+     * @throws Exception if the work fails for the given batch size
+     */
+    public T execute(int batchSize) throws Exception;
+  }
+
+  /**
+   * This class is a base implementation of a simple exponential back-off retry policy. The batch
+   * size and decaying factor are provided with the constructor. It reduces the batch size by
+   * dividing it by the decaying factor every time there is an exception in the execute method.
+   */
+  public static abstract class ExponentiallyDecayingBatchWork<T>
+    implements ExponentialBackOffRetry<T> {
+    private int batchSize;
+    private final int decayingFactor;
+    private int maxRetries;
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentiallyDecayingBatchWork.class);
+
+    public ExponentiallyDecayingBatchWork(int batchSize, int reducingFactor, int maxRetries) {
+      if (batchSize <= 0) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid batch size %d provided. Batch size must be greater than 0", batchSize));
+      }
+      }
+      this.batchSize = batchSize;
+      if (reducingFactor <= 1) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid decaying factor %d provided. Decaying factor must be greater than 1",
+          reducingFactor));
+      }
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid number of maximum retries %d provided. It must be a non-negative integer value",
+          maxRetries));
+      }
+      // if maxRetries is 0, the code retries until the batch size decays to zero
+      this.maxRetries = maxRetries;
+      this.decayingFactor = reducingFactor;
+    }
+
+    public T run() throws Exception {
+      int attempt = 0;
+      while (true) {
+        int size = getNextBatchSize();
+        if (size == 0) {
+          throw new RetryException("Batch size reduced to zero");
+        }
+        try {
+          return execute(size);
+        } catch (Exception ex) {
+          LOG.warn(String.format("Exception thrown while processing using a batch size %d", size), ex);
+          // only count failed attempts; counting in a finally block would also
+          // throw away a successful result on the final attempt
+          attempt++;
+          if (attempt == maxRetries) {
+            throw new RetryException(String.format("Maximum number of retry attempts %d exhausted", maxRetries));
+          }
+        }
+      }
+    }
+
+    private int getNextBatchSize() {
+      int ret = batchSize;
+      batchSize /= decayingFactor;
+      return ret;
+    }
+  }
+}
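
A short usage sketch for the retry helper: an anonymous subclass supplies execute(), and run() halves the batch on every failure (1000 -> 500 -> 250 ...) until it succeeds, the retry budget is used up, or the batch size decays to zero. The addPartitionsBatch() helper below is hypothetical:

    // Sketch: start with batches of 1000, halve on failure, give up after 5 attempts.
    ExponentiallyDecayingBatchWork<Void> work =
      new ExponentiallyDecayingBatchWork<Void>(1000, 2, 5) {
        @Override
        public Void execute(int batchSize) throws Exception {
          addPartitionsBatch(batchSize); // hypothetical unit of work
          return null;
        }
      };
    work.run(); // throws RetryException once retries or batch size are exhausted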

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
index f750ca2..377a550 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
@@ -482,7 +482,9 @@ public abstract class NonCatCallsWithCatalog {
           .build(conf);
       table.unsetCatName();
       client.createTable(table);
-      expected.add(new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name()));
+      TableMeta tableMeta = new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name());
+      tableMeta.setCatName(expectedCatalog());
+      expected.add(tableMeta);
     }
 
     List<String> types = Collections.singletonList(TableType.MANAGED_TABLE.name());

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java
index fc996c8..b3690ec 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.metastore;
 
-import org.apache.hadoop.hive.metastore.api.MetaException;
-
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
 /**
  * This tests calls with an older client, to make sure that if the client supplies no catalog
  * information the server still does the right thing.  It assumes the default catalog

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
new file mode 100644
index 0000000..059c166
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.client.builder.CatalogBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(MetastoreUnitTest.class)
+public class TestPartitionManagement {
+  private IMetaStoreClient client;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = MetastoreConf.newMetastoreConf();
+    conf.setClass(MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname(),
+      MsckPartitionExpressionProxy.class, PartitionExpressionProxy.class);
+    MetaStoreTestUtils.setConfForStandloneMode(conf);
+    conf.setBoolean(MetastoreConf.ConfVars.MULTITHREADED.getVarname(), false);
+    MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.prepDb(conf);
+    client = new HiveMetaStoreClient(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (client != null) {
+      // Drop any left over catalogs
+      List<String> catalogs = client.getCatalogs();
+      for (String catName : catalogs) {
+        if (!catName.equalsIgnoreCase(DEFAULT_CATALOG_NAME)) {
+          // First drop any databases in catalog
+          List<String> databases = client.getAllDatabases(catName);
+          for (String db : databases) {
+            client.dropDatabase(catName, db, true, false, true);
+          }
+          client.dropCatalog(catName);
+        } else {
+          List<String> databases = client.getAllDatabases(catName);
+          for (String db : databases) {
+            if (!db.equalsIgnoreCase(Warehouse.DEFAULT_DATABASE_NAME)) {
+              client.dropDatabase(catName, db, true, false, true);
+            }
+          }
+        }
+      }
+    }
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      client = null;
+    }
+  }
+
+  private Map<String, Column> buildAllColumns() {
+    Map<String, Column> colMap = new HashMap<>(6);
+    Column[] cols = {new Column("b", "binary"), new Column("bo", "boolean"),
+      new Column("d", "date"), new Column("do", "double"), new Column("l", 
"bigint"),
+      new Column("s", "string")};
+    for (Column c : cols) {
+      colMap.put(c.colName, c);
+    }
+    return colMap;
+  }
+
+  private List<String> createMetadata(String catName, String dbName, String tableName,
+    List<String> partKeys, List<String> partKeyTypes, List<List<String>> 
partVals,
+    Map<String, Column> colMap, boolean isOrc)
+    throws TException {
+    if (!DEFAULT_CATALOG_NAME.equals(catName)) {
+      Catalog cat = new CatalogBuilder()
+        .setName(catName)
+        .setLocation(MetaStoreTestUtils.getTestWarehouseDir(catName))
+        .build();
+      client.createCatalog(cat);
+    }
+
+    Database db;
+    if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+      DatabaseBuilder dbBuilder = new DatabaseBuilder()
+        .setName(dbName);
+      dbBuilder.setCatalogName(catName);
+      db = dbBuilder.create(client, conf);
+    } else {
+      db = client.getDatabase(DEFAULT_CATALOG_NAME, DEFAULT_DATABASE_NAME);
+    }
+
+    TableBuilder tb = new TableBuilder()
+      .inDb(db)
+      .setTableName(tableName);
+
+    if (isOrc) {
+      tb.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")
+        .setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+    }
+
+    for (Column col : colMap.values()) {
+      tb.addCol(col.colName, col.colType);
+    }
+
+    if (partKeys != null) {
+      if (partKeyTypes == null) {
+        throw new IllegalArgumentException("partKeyTypes cannot be null when 
partKeys is non-null");
+      }
+      if (partKeys.size() != partKeyTypes.size()) {
+        throw new IllegalArgumentException("partKeys and partKeyTypes size 
should be same");
+      }
+      if (partVals.isEmpty()) {
+        throw new IllegalArgumentException("partVals cannot be empty for 
patitioned table");
+      }
+      for (int i = 0; i < partKeys.size(); i++) {
+        tb.addPartCol(partKeys.get(i), partKeyTypes.get(i));
+      }
+    }
+    Table table = tb.create(client, conf);
+
+    if (partKeys != null) {
+      for (List<String> partVal : partVals) {
+        new PartitionBuilder()
+          .inTable(table)
+          .setValues(partVal)
+          .addToTable(client, conf);
+      }
+    }
+
+    List<String> partNames = new ArrayList<>();
+    if (partKeys != null) {
+      for (int i = 0; i < partKeys.size(); i++) {
+        String partKey = partKeys.get(i);
+        for (String partVal : partVals.get(i)) {
+          String partName = partKey + "=" + partVal;
+          partNames.add(partName);
+        }
+      }
+    }
+    client.flushCache();
+    return partNames;
+  }
+
+  @Test
+  public void testPartitionDiscoveryDisabledByDefault() throws TException, IOException {
+    String dbName = "db1";
+    String tableName = "tbl1";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    fs.mkdirs(new Path(tablePath, "state=WA/dt=2018-12-01"));
+    fs.mkdirs(new Path(tablePath, "state=UT/dt=2018-12-02"));
+    assertEquals(5, fs.listStatus(tablePath).length);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // partition discovery is not enabled via table property, so nothing should change on this table
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // table property is set to false, so no change expected
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "false");
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoveryEnabledBothTableTypes() throws TException, IOException {
+    String dbName = "db2";
+    String tableName = "tbl2";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // table property is set to true, we expect 5 partitions
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+
+    // change table type to external, delete a partition directory and make sure partition discovery works
+    table.getParameters().put("EXTERNAL", "true");
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
+    client.alter_table(dbName, tableName, table);
+    boolean deleted = fs.delete(newPart1.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+
+    // remove external tables from partition discovery and expect no changes even after partition is deleted
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.MANAGED_TABLE.name());
+    deleted = fs.delete(newPart2.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(3, fs.listStatus(tablePath).length);
+    // this doesn't remove the partition because the table is still external and we have removed the external
+    // table type from partition discovery
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+
+    // no table types specified, msck will not select any tables
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), "");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+
+    // only EXTERNAL table type, msck should drop a partition now
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.EXTERNAL_TABLE.name());
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoveryNonDefaultCatalog() throws TException, IOException {
+    String catName = "cat3";
+    String dbName = "db3";
+    String tableName = "tbl3";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(catName, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(catName, dbName, tableName);
+    List<Partition> partitions = client.listPartitions(catName, dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(catName, dbName, tableName, table);
+    // default catalog in conf is 'hive' but we are using 'cat3' as catName for this test, so msck should not fix
+    // anything for this one
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(catName, dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // using the correct catalog name, we expect msck to fix partitions
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME.getVarname(), catName);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(catName, dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoveryDBPattern() throws TException, IOException {
+    String dbName = "db4";
+    String tableName = "tbl4";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(dbName, tableName, table);
+    // no match for this db pattern, so we will see only 3 partitions
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN.getVarname(), "*dbfoo*");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // matching db pattern, we will see all 5 partitions now
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN.getVarname(), "*db4*");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoveryTablePattern() throws TException, IOException {
+    String dbName = "db5";
+    String tableName = "tbl5";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(dbName, tableName, table);
+    // no match for this table pattern, so we will see only 3 partitions
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN.getVarname(), "*tblfoo*");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // matching table pattern, we will see all 5 partitions now
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN.getVarname(), "tbl5*");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoveryTransactionalTable()
+    throws TException, IOException, InterruptedException, ExecutionException {
+    String dbName = "db6";
+    String tableName = "tbl6";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, true);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+      TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
+    client.alter_table(dbName, tableName, table);
+
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+
+    // only one partition discovery task is running, there will be no skipped attempts
+    assertEquals(0, PartitionManagementTask.getSkippedAttempts());
+
+    // delete a partition from fs, and submit 3 tasks at the same time, each of them trying to acquire an X lock on
+    // the same table; only one of them will run, the other attempts will be skipped
+    boolean deleted = fs.delete(newPart1.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+
+    // 3 tasks are submitted at the same time, only one will eventually lock the table and only one gets to run at
+    // a time. This simulates skipping a partition discovery task attempt when the previous attempt is still
+    // incomplete.
+    PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask();
+    partitionDiscoveryTask1.setConf(conf);
+    PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask();
+    partitionDiscoveryTask2.setConf(conf);
+    PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask();
+    partitionDiscoveryTask3.setConf(conf);
+    List<PartitionManagementTask> tasks = Lists
+      .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3);
+    ExecutorService executorService = Executors.newFixedThreadPool(3);
+    int successBefore = PartitionManagementTask.getCompletedAttempts();
+    int skippedBefore = PartitionManagementTask.getSkippedAttempts();
+    List<Future<?>> futures = new ArrayList<>();
+    for (PartitionManagementTask task : tasks) {
+      futures.add(executorService.submit(task));
+    }
+    for (Future<?> future : futures) {
+      future.get();
+    }
+    int successAfter = PartitionManagementTask.getCompletedAttempts();
+    int skippedAfter = PartitionManagementTask.getSkippedAttempts();
+    assertEquals(1, successAfter - successBefore);
+    assertEquals(2, skippedAfter - skippedBefore);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+  }
+
+  @Test
+  public void testPartitionRetention() throws TException, IOException, InterruptedException {
+    String dbName = "db7";
+    String tableName = "tbl7";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "20000ms");
+    client.alter_table(dbName, tableName, table);
+
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+
+    // after 30s, all partitions should be past the 20s retention period and dropped
+    Thread.sleep(30 * 1000);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(0, partitions.size());
+  }
+
+  @Test
+  public void testPartitionDiscoverySkipInvalidPath() throws TException, IOException, InterruptedException {
+    String dbName = "db8";
+    String tableName = "tbl8";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    // an empty retention period means retention is disabled
+    table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "");
+    client.alter_table(dbName, tableName, table);
+
+    // there is one partition with an invalid path, which will be skipped
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+  }
+
+  private void runPartitionManagementTask(Configuration conf) {
+    PartitionManagementTask task = new PartitionManagementTask();
+    task.setConf(conf);
+    task.run();
+  }
+
+  private static class Column {
+    private String colName;
+    private String colType;
+
+    public Column(final String colName, final String colType) {
+      this.colName = colName;
+      this.colType = colType;
+    }
+  }
+}
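
The retention test above hinges on the metastore dropping partitions once their age exceeds the table's partition retention period ("20000ms" here, checked after a 30s sleep). Below is a minimal sketch of that expiry check — not Hive's actual implementation, and assuming only the plain "<n>ms" suffix the test uses (RetentionSketch and its methods are hypothetical names):

    import java.util.concurrent.TimeUnit;

    public class RetentionSketch {
      // Parse a "<number>ms" style retention value into milliseconds.
      // Assumes the "ms" suffix used by the test; real Hive accepts more units.
      static long parseRetentionMs(String value) {
        if (value == null || value.isEmpty()) {
          return 0L; // empty property means retention is disabled
        }
        return Long.parseLong(value.replace("ms", "").trim());
      }

      // A partition is a drop candidate once its age exceeds the retention period.
      static boolean isExpired(long partitionCreateTimeMs, long retentionMs, long nowMs) {
        return retentionMs > 0 && (nowMs - partitionCreateTimeMs) > retentionMs;
      }

      public static void main(String[] args) {
        long retention = parseRetentionMs("20000ms");
        long created = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(30);
        // Mirrors the test: after waiting 30s, a 20s retention period has lapsed.
        System.out.println(isExpired(created, retention, System.currentTimeMillis())); // true
      }
    }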

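Similarly, the transactional-table test races three PartitionManagementTask instances and asserts exactly one completed and two skipped attempts. Here is a minimal, self-contained sketch of that completed-vs-skipped accounting, assuming a tryLock-style guard (SkippedAttemptSketch, TABLE_LOCK, etc. are hypothetical names, not Hive code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    public class SkippedAttemptSketch {
      static final ReentrantLock TABLE_LOCK = new ReentrantLock();
      static final AtomicInteger COMPLETED = new AtomicInteger();
      static final AtomicInteger SKIPPED = new AtomicInteger();

      static void runDiscovery() {
        if (!TABLE_LOCK.tryLock()) {
          SKIPPED.incrementAndGet(); // a previous attempt still holds the lock
          return;
        }
        try {
          Thread.sleep(100); // stand-in for the actual partition discovery work
          COMPLETED.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          TABLE_LOCK.unlock();
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
          futures.add(pool.submit(SkippedAttemptSketch::runDiscovery));
        }
        for (Future<?> f : futures) {
          f.get();
        }
        pool.shutdown();
        // When the three attempts overlap, expect 1 completed and 2 skipped.
        System.out.println(COMPLETED.get() + " completed, " + SKIPPED.get() + " skipped");
      }
    }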
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java
index 59daa52..7720aa2 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -123,6 +124,7 @@ public class TestGetTableMeta extends MetaStoreClientTest {
   private Table createTable(String dbName, String tableName, TableType type)
           throws Exception {
     TableBuilder builder = new TableBuilder()
+            .setCatName("hive")
             .setDbName(dbName)
             .setTableName(tableName)
             .addCol("id", "int")
@@ -153,6 +155,7 @@ public class TestGetTableMeta extends MetaStoreClientTest {
     client.createTable(table);
     TableMeta tableMeta = new TableMeta(dbName, tableName, type.name());
     tableMeta.setComments(comment);
+    tableMeta.setCatName("hive");
     return tableMeta;
   }
 
@@ -160,7 +163,9 @@ public class TestGetTableMeta extends MetaStoreClientTest {
           throws Exception {
     Table table  = createTable(dbName, tableName, type);
     client.createTable(table);
-    return new TableMeta(dbName, tableName, type.name());
+    TableMeta tableMeta = new TableMeta(dbName, tableName, type.name());
+    tableMeta.setCatName("hive");
+    return tableMeta;
   }
 
  private void assertTableMetas(int[] expected, List<TableMeta> actualTableMetas) {
@@ -301,7 +306,9 @@ public class TestGetTableMeta extends MetaStoreClientTest {
           .addCol("id", "int")
           .addCol("name", "string")
           .build(metaStore.getConf()));
-      expected.add(new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name()));
+      TableMeta tableMeta = new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name());
+      tableMeta.setCatName(catName);
+      expected.add(tableMeta);
     }
 
    List<String> types = Collections.singletonList(TableType.MANAGED_TABLE.name());

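The TestGetTableMeta changes above all apply one pattern: expected TableMeta objects must now carry a catalog name ("hive" being the default catalog), otherwise comparisons against the catalog-qualified metadata returned by the server fail. A minimal sketch of the helper shape the diff converges on (expectedMeta is a hypothetical name):

    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.metastore.api.TableMeta;

    public class TableMetaCatalogExample {
      // Build an expected TableMeta the way the updated tests do: the catalog
      // name must be set explicitly, since TableMeta's constructor does not take it.
      static TableMeta expectedMeta(String dbName, String tableName, TableType type) {
        TableMeta tableMeta = new TableMeta(dbName, tableName, type.name());
        tableMeta.setCatName("hive"); // default catalog; without this, equality
                                      // checks against server results fail
        return tableMeta;
      }
    }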