Repository: hive Updated Branches: refs/heads/master 54bba9cbf -> 64bea0354
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java new file mode 100644 index 0000000..2837ff4 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+
+/**
+ * PartitionIterable - effectively a lazy {@code Iterable<Partition>}.
+ * Sometimes, we have a need for iterating through a list of partitions,
+ * but the list of partitions can be too big to fetch as a single object.
+ * Thus, the goal of PartitionIterable is to act as an {@code Iterable<Partition>}
+ * while lazily fetching the relevant partitions in batches (of the size given
+ * to the constructor), one metadata call per batch.
+ * It is very likely that any calls to PartitionIterable are going to result
+ * in a large number of calls, so use sparingly only when the memory cost
+ * of fetching all the partitions in one shot is too prohibitive.
+ * This is still pretty costly in that it would retain a list of partition
+ * names, but that should be far less expensive than the entire partition
+ * objects.
+ * Note that remove() is an illegal call on this, and will result in an
+ * IllegalStateException.
+ */
+public class PartitionIterable implements Iterable<Partition> {
+
+  @Override
+  public Iterator<Partition> iterator() {
+    return new Iterator<Partition>() {
+
+      private boolean initialized = false;
+      private Iterator<Partition> ptnsIterator = null;
+
+      private Iterator<String> partitionNamesIter = null;
+      private Iterator<Partition> batchIter = null;
+
+      private void initialize() {
+        if (!initialized) {
+          if (currType == Type.LIST_PROVIDED) {
+            ptnsIterator = ptnsProvided.iterator();
+          } else {
+            partitionNamesIter = partitionNames.iterator();
+          }
+          initialized = true;
+        }
+      }
+
+      /**
+       * In lazy mode this may call the metastore: it keeps fetching batches until one holds a partition
+       * or the names run out. "Names remain" alone is not enough, because a batch may come back with
+       * fewer partitions than names requested, even none (presumably when partitions were dropped after
+       * their names were listed).
+       *
+       * @throws RuntimeException wrapping any failure of the metastore call
+       */
+      @Override
+      public boolean hasNext() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED) {
+          return ptnsIterator.hasNext();
+        }
+        // terminates: every getNextBatch() consumes at least one name (batch_size >= 1 is enforced)
+        while ((batchIter == null || !batchIter.hasNext()) && partitionNamesIter.hasNext()) {
+          getNextBatch();
+        }
+        return batchIter != null && batchIter.hasNext();
+      }
+
+      /**
+       * @throws NoSuchElementException when there are no more partitions; no metastore call is made then
+       */
+      @Override
+      public Partition next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException("No more partitions to iterate over");
+        }
+        return (currType == Type.LIST_PROVIDED) ? ptnsIterator.next() : batchIter.next();
+      }
+
+      /**
+       * Fetches the partitions for the next (up to) batch_size names and makes them the current batch.
+       */
+      private void getNextBatch() {
+        int batch_counter = 0;
+        List<String> nameBatch = new ArrayList<String>();
+        while (batch_counter < batch_size && partitionNamesIter.hasNext()) {
+          nameBatch.add(partitionNamesIter.next());
+          batch_counter++;
+        }
+        List<Partition> fetched;
+        try {
+          fetched = msc.getPartitionsByNames(table.getCatName(), table.getDbName(), table.getTableName(), nameBatch);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        // a null reply is treated like an empty batch; hasNext() then moves on to the remaining names
+        batchIter = (fetched == null) ? Collections.<Partition>emptyIterator() : fetched.iterator();
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalStateException(
+            "PartitionIterable is a read-only iterable and remove() is unsupported");
+      }
+    };
+  }
+
+  enum Type {
+    LIST_PROVIDED,  // Where a List<Partition> is already provided
+    LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed.
+  }
+
+  private final Type currType;
+
+  // used for LIST_PROVIDED cases
+  private Collection<Partition> ptnsProvided = null;
+
+  // used for LAZY_FETCH_PARTITIONS cases
+  private IMetaStoreClient msc = null; // Assumes one instance of this + single-threaded compilation for each query.
+  private Table table = null;
+  private List<String> partitionNames = null;
+  private int batch_size;
+
+  /**
+   * Dummy constructor, which simply acts as an iterator on an already-present
+   * collection of partitions; allows for easy drop-in replacement for other methods
+   * that already have a {@code List<Partition>}.
+   *
+   * @param ptnsProvided the partitions to iterate over
+   */
+  public PartitionIterable(Collection<Partition> ptnsProvided) {
+    this.currType = Type.LIST_PROVIDED;
+    this.ptnsProvided = ptnsProvided;
+  }
+
+  /**
+   * Primary constructor: lists all partition names of the given table up front (one metastore call)
+   * and then fetches the partitions themselves lazily, batch_size names per call, while iterating.
+   *
+   * @param msc client used for all metastore calls
+   * @param table table whose partitions are iterated
+   * @param batch_size number of partition names fetched per call; must be at least 1
+   * @throws MetastoreException if batch_size is less than 1 or listing the partition names fails
+   */
+  public PartitionIterable(IMetaStoreClient msc, Table table, int batch_size) throws MetastoreException {
+    if (batch_size < 1) {
+      throw new MetastoreException(
+          "Invalid batch size for partition iterable. Please use a batch size greater than 0");
+    }
+    this.currType = Type.LAZY_FETCH_PARTITIONS;
+    this.msc = msc;
+    this.table = table;
+    this.batch_size = batch_size;
+    partitionNames = getPartitionNames(msc, table.getCatName(), table.getDbName(), table.getTableName(), (short) -1);
+  }
+
+  /**
+   * Lists up to {@code max} partition names of a table (the constructor passes -1, presumably "no limit").
+   *
+   * @throws MetastoreException wrapping any exception thrown by the client
+   */
+  public List<String> getPartitionNames(IMetaStoreClient msc, String catName, String dbName, String tblName, short max)
+      throws MetastoreException {
+    try {
+      return msc.listPartitionNames(catName, dbName, tblName, max);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java new file mode 100644 index 0000000..901bf80 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.TimeValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Partition management task is primarily responsible for partition retention and discovery based on table properties.
+ * Only tables whose "discover.partitions" table property is "true" are processed, so retention is applied to those
+ * tables only.
+ *
+ * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
+ * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
+ * Dropping partitions after retention period will also delete the data in that partition.
+ *
+ * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors table location
+ * for newly added partition directories and creates partition objects for them if they do not exist. Also, if a
+ * partition object exists but its corresponding directory does not exist under the table location, the partition
+ * object will be dropped.
+ *
+ */
+public class PartitionManagementTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
+  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
+  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
+  // Prevents overlapping runs: a run that cannot acquire it is skipped, not queued.
+  private static final Lock lock = new ReentrantLock();
+  // these are just for testing
+  private static int completedAttempts;
+  private static int skippedAttempts;
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    // we modify conf in setupMsckConf(), so we make a copy
+    conf = new Configuration(configuration);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Runs one round of partition discovery/retention. When another run holds the lock, this attempt is skipped
+   * and counted as such. Otherwise the candidate tables are collected and an msck repair is submitted for each
+   * of them to a bounded thread pool, waiting until all of them have finished. Failures are logged, not thrown.
+   */
+  @Override
+  public void run() {
+    if (lock.tryLock()) {
+      skippedAttempts = 0;
+      String qualifiedTableName = null;
+      IMetaStoreClient msc = null;
+      ExecutorService executorService = null;
+      try {
+        msc = new HiveMetaStoreClient(conf);
+        List<Table> candidateTables = new ArrayList<>();
+        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
+        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
+        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
+        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
+        List<String> tableTypesList = parseTableTypes(tableTypes);
+        List<TableMeta> foundTableMetas = msc.getTableMeta(catalogName, dbPattern, tablePattern, tableTypesList);
+        LOG.info("Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName,
+            dbPattern, tablePattern, foundTableMetas.size());
+
+        for (TableMeta tableMeta : foundTableMetas) {
+          Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
+          // constant-first comparison: a missing (or null) property value simply does not match
+          if (table.getParameters() != null &&
+              "true".equalsIgnoreCase(table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY))) {
+            candidateTables.add(table);
+          }
+        }
+        if (candidateTables.isEmpty()) {
+          return;
+        }
+
+        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
+        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
+        // defeats the purpose of thread pooled msck repair.
+        // newFixedThreadPool rejects a non-positive size, so a misconfigured pool size falls back to one thread.
+        int threadPoolSize = Math.max(1, MetastoreConf.getIntVar(conf,
+            MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE));
+        executorService = Executors
+            .newFixedThreadPool(Math.min(candidateTables.size(), threadPoolSize),
+                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
+        CountDownLatch countDownLatch = new CountDownLatch(candidateTables.size());
+        LOG.info("Found {} candidate tables for partition discovery", candidateTables.size());
+        setupMsckConf();
+        for (Table table : candidateTables) {
+          qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
+          long retentionSeconds = getRetentionPeriodInSeconds(table);
+          LOG.info("Running partition discovery for table {} retentionPeriod: {}s", qualifiedTableName,
+              retentionSeconds);
+          // this always runs in 'sync' mode where partitions can be added and dropped
+          MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(),
+              null, null, true, true, true, retentionSeconds);
+          executorService.submit(new MsckThread(msckInfo, conf, qualifiedTableName, countDownLatch));
+        }
+        countDownLatch.await();
+      } catch (InterruptedException e) {
+        // restore the interrupt status so whoever drives this task can observe it
+        Thread.currentThread().interrupt();
+        LOG.warn("Partition discovery task interrupted while waiting for msck threads to finish", e);
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
+      } finally {
+        // shut the pool down on every exit path so its worker threads are not leaked on failures
+        if (executorService != null) {
+          executorService.shutdownNow();
+        }
+        if (msc != null) {
+          msc.close();
+        }
+        lock.unlock();
+      }
+      completedAttempts++;
+    } else {
+      skippedAttempts++;
+      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
+    }
+  }
+
+  /**
+   * Converts the comma-separated table types setting into the list passed to getTableMeta. Unknown types are
+   * logged and ignored.
+   *
+   * @param tableTypes the configured value, e.g. "MANAGED_TABLE,EXTERNAL_TABLE"
+   * @return the distinct, validated table type names
+   */
+  private static List<String> parseTableTypes(String tableTypes) {
+    // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
+    // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
+    if (tableTypes.isEmpty()) {
+      return Lists.newArrayList("");
+    }
+    Set<String> tableTypesSet = new HashSet<>();
+    for (String type : tableTypes.split(",")) {
+      try {
+        // Locale.ROOT: default-locale upper-casing (e.g. Turkish dotted I) would reject valid type names
+        tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase(Locale.ROOT)).name());
+      } catch (IllegalArgumentException e) {
+        // ignore
+        LOG.warn("Unknown table type: {}", type);
+      }
+    }
+    return Lists.newArrayList(tableTypesSet);
+  }
+
+  /**
+   * Reads the table's "partition.retention.period" property and converts it to seconds.
+   *
+   * @param table the table whose parameters are inspected
+   * @return the retention period in seconds, or -1 when the property is absent, empty or not a valid time value
+   */
+  static long getRetentionPeriodInSeconds(final Table table) {
+    String retentionPeriod;
+    long retentionSeconds = -1;
+    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
+      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      if (retentionPeriod.isEmpty()) {
+        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
+            PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      } else {
+        try {
+          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
+          timeValidator.validate(retentionPeriod);
+          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
+          // will return -1
+        }
+      }
+    }
+    return retentionSeconds;
+  }
+
+  private void setupMsckConf() {
+    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
+    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
+    // have to fix the invalid paths via external means.
+    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
+    // since msck runs in thread pool and each of them create their own metastore client, we don't want explosion of
+    // connections to metastore for embedded mode. Also we don't need too many db connections anyway.
+    conf.setInt(MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), 2);
+  }
+
+  /**
+   * Runs msck repair for a single table and always counts the latch down, whether or not the repair succeeded.
+   */
+  private static class MsckThread implements Runnable {
+    private final MsckInfo msckInfo;
+    private final Configuration conf;
+    private final String qualifiedTableName;
+    private final CountDownLatch countDownLatch;
+
+    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
+      this.msckInfo = msckInfo;
+      this.conf = conf;
+      this.qualifiedTableName = qualifiedTableName;
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Msck msck = new Msck(true, true);
+        msck.init(conf);
+        msck.repair(msckInfo);
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
+      } finally {
+        // there is no recovery from exception, so we always count down and retry in next attempt
+        countDownLatch.countDown();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static int getSkippedAttempts() {
+    return skippedAttempts;
+  }
+
+  @VisibleForTesting
+  public static int getCompletedAttempts() {
+    return completedAttempts;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index f3b3866..363db35 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import
java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -58,10 +59,14 @@ import org.apache.commons.collections.ListUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.ColumnType; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -71,6 +76,8 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.MetastoreException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec; @@ -590,7 +597,7 @@ public class MetaStoreServerUtils { /** Duplicates AcidUtils; used in a couple places in metastore. 
*/
   public static boolean isTransactionalTable(Map<String, String> params) {
     String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
-    return (transactionalProp != null && "true".equalsIgnoreCase(transactionalProp));
+    return "true".equalsIgnoreCase(transactionalProp);
   }
 
   /**
@@ -1294,4 +1301,224 @@ public class MetaStoreServerUtils {
       return hashCode;
     }
   }
+
+  // Some util methods from Hive.java, this is copied so as to avoid circular dependency with hive ql
+
+  /**
+   * Returns the table's storage location as a {@link Path}.
+   *
+   * @param table the table
+   * @return the location, or null when the table has no storage descriptor or the descriptor has no location
+   */
+  public static Path getPath(Table table) {
+    if (table.getSd() == null || table.getSd().getLocation() == null) {
+      return null;
+    }
+    return new Path(table.getSd().getLocation());
+  }
+
+  /**
+   * Lists the table's partitions with a single {@code listPartitions} call, passing -1 as the maximum
+   * (presumably "no limit", as the method name implies).
+   *
+   * @param msc client used for the lookup
+   * @param table table whose partitions are listed
+   * @return the partitions returned by the client
+   * @throws MetastoreException wrapping any exception thrown by the client
+   */
+  public static List<Partition> getAllPartitionsOf(IMetaStoreClient msc, Table table) throws MetastoreException {
+    try {
+      return msc.listPartitions(table.getCatName(), table.getDbName(), table.getTableName(), (short) -1);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+
+  /**
+   * @param table the table
+   * @return true if the table declares at least one partition column
+   */
+  public static boolean isPartitioned(Table table) {
+    // getPartCols never returns null: it installs an empty list on the table when no keys are set
+    return !getPartCols(table).isEmpty();
+  }
+
+  /**
+   * Returns the table's partition keys. When none are set, an empty list is installed on the table
+   * (so {@code table} is mutated in that case) and returned.
+   *
+   * @param table the table
+   * @return the partition keys, never null
+   */
+  public static List<FieldSchema> getPartCols(Table table) {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null) {
+      partKeys = new ArrayList<>();
+      table.setPartitionKeys(partKeys);
+    }
+    return partKeys;
+  }
+
+  /**
+   * @param table the table
+   * @return the names of the table's partition columns, in the order getPartCols returns them
+   */
+  public static List<String> getPartColNames(Table table) {
+    List<String> partColNames = new ArrayList<>();
+    for (FieldSchema key : getPartCols(table)) {
+      partColNames.add(key.getName());
+    }
+    return partColNames;
+  }
+
+  /**
+   * Returns the data location: the partition's location for a partitioned table, otherwise the table's.
+   *
+   * @param table the table
+   * @param partition the partition; only consulted when the table is partitioned
+   * @return the location, or null when the relevant storage descriptor or its location is missing
+   */
+  public static Path getDataLocation(Table table, Partition partition) {
+    if (!isPartitioned(table)) {
+      // getPath returns null when the table has no storage descriptor or no location
+      return getPath(table);
+    }
+    // new Path(null) throws, so a partition without a location yields null just like a missing descriptor
+    if (partition.getSd() == null || partition.getSd().getLocation() == null) {
+      return null;
+    }
+    return new Path(partition.getSd().getLocation());
+  }
+
+  /**
+   * Builds the partition name from the table's partition columns and the partition's values,
+   * for example partitiondate=2008-01-01.
+   *
+   * @throws RuntimeException wrapping the MetaException thrown by Warehouse.makePartName
+   */
+  public static String getPartitionName(Table table, Partition partition) {
+    try {
+      return Warehouse.makePartName(getPartCols(table), partition.getValues());
+    } catch (MetaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pairs the table's partition columns with the partition's values via Warehouse.makeSpecFromValues.
+   */
+  public static Map<String, String> getPartitionSpec(Table table, Partition partition) {
+    return Warehouse.makeSpecFromValues(getPartCols(table), partition.getValues());
+  }
+
+  /**
+   * Looks up the partition of {@code tbl} identified by {@code partSpec}.
+   *
+   * @param msc client used for the lookup
+   * @param tbl the table
+   * @param partSpec partition column name to value; a column absent from the map contributes a null value
+   * @return the partition, or null when the client throws NoSuchObjectException
+   * @throws MetastoreException wrapping any other exception thrown by the client
+   */
+  public static Partition getPartition(IMetaStoreClient msc, Table tbl, Map<String, String> partSpec) throws MetastoreException {
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : getPartCols(tbl)) {
+      String val = partSpec.get(field.getName());
+      pvals.add(val);
+    }
+    Partition tpart = null;
+    try {
+      tpart = msc.getPartition(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), pvals);
+    } catch (NoSuchObjectException nsoe) {
+      // this means no partition exists for the given partition
+      // key value pairs - thrift cannot handle null return values, hence
+      // getPartition() throws NoSuchObjectException to indicate null partition
+    } catch (Exception e) {
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new MetastoreException(e);
+    }
+
+    return tpart;
+  }
+
+
+  /**
+   * Get the partition name from the path.
+   *
+   * Walks up from {@code partitionPath} until {@code tablePath} (or the root) is reached. Directory names of
+   * the form col=value whose col is in {@code partCols} are prepended to the result. The walk stops early,
+   * returning what was collected so far, at the first directory name that split("=") turns into one or more than two parts.
+   *
+   * @param tablePath
+   *          Path of the table.
+   * @param partitionPath
+   *          Path of the partition.
+   * @param partCols
+   *          Set of partition columns from table definition
+   * @return Partition name, for example partitiondate=2008-01-01; null if no matching directory was found
+   */
+  public static String getPartitionName(Path tablePath, Path partitionPath, Set<String> partCols) {
+    String result = null;
+    Path currPath = partitionPath;
+    LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols);
+
+    while (currPath != null && !tablePath.equals(currPath)) {
+      // format: partition=p_val
+      // Add only when table partition colName matches
+      String[] parts = currPath.getName().split("=");
+      if (parts.length > 0) {
+        if (parts.length != 2) {
+          LOG.warn(currPath.getName() + " is not a valid partition name");
+          return result;
+        }
+
+        String partitionName = parts[0];
+        if (partCols.contains(partitionName)) {
+          if (result == null) {
+            result = currPath.getName();
+          } else {
+            result = currPath.getName() + Path.SEPARATOR + result;
+          }
+        }
+      }
+      currPath = currPath.getParent();
+      LOG.debug("currPath=" + currPath);
+    }
+    return result;
+  }
+
+  /**
+   * Builds (without persisting) a partition object for {@code tbl} from a partition spec.
+   *
+   * @param tbl the table
+   * @param partSpec partition column name to value; every partition column needs a non-empty value
+   * @param location partition location; may be null
+   * @return the partition; unless the table is a view it carries a deep copy of the table's storage
+   *         descriptor with its location set to {@code location}
+   * @throws MetastoreException if a partition column is missing from, or empty in, {@code partSpec}
+   */
+  public static Partition createMetaPartitionObject(Table tbl, Map<String, String> partSpec, Path location)
+      throws MetastoreException {
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : getPartCols(tbl)) {
+      String val = partSpec.get(field.getName());
+      if (val == null || val.isEmpty()) {
+        throw new MetastoreException("partition spec is invalid; field "
+            + field.getName() + " does not exist or is empty");
+      }
+      pvals.add(val);
+    }
+
+    Partition tpart = new Partition();
+    tpart.setCatName(tbl.getCatName());
+    tpart.setDbName(tbl.getDbName());
+    tpart.setTableName(tbl.getTableName());
+    tpart.setValues(pvals);
+
+    if (!MetaStoreUtils.isView(tbl)) {
+      tpart.setSd(tbl.getSd().deepCopy());
+      tpart.getSd().setLocation((location != null) ? location.toString() : null);
+    }
+    return tpart;
+  }
 }
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
new file mode 100644
index 0000000..22513b9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryUtilities.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for retrying batched work with a shrinking batch size.
+ */
+public class RetryUtilities {
+  /**
+   * Thrown by {@link ExponentiallyDecayingBatchWork#run()} when it gives up, either because the batch size
+   * decayed to zero or because the maximum number of attempts failed.
+   */
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Interface used to create a ExponentialBackOffRetry policy
+   */
+  public static interface ExponentialBackOffRetry<T> {
+    /**
+     * The unit of work to retry, implemented by users of the policy. It performs the actual work for
+     * the given batch size; throwing any exception marks the attempt as failed.
+     *
+     * @param batchSize The batch size for the work which needs to be executed
+     * @return the result of the work
+     * @throws Exception if the attempt failed
+     */
+    public T execute(int batchSize) throws Exception;
+  }
+
+  /**
+   * This class is a base implementation of a simple exponential back retry policy. The batch size
+   * and decaying factor are provided with the constructor. It reduces the batch size by dividing
+   * it by the decaying factor every time there is an exception in the execute method.
+   */
+  public static abstract class ExponentiallyDecayingBatchWork<T>
+      implements ExponentialBackOffRetry<T> {
+    private int batchSize;
+    private final int decayingFactor;
+    private final int maxRetries;
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentiallyDecayingBatchWork.class);
+
+    /**
+     * @param batchSize initial batch size; must be greater than 0
+     * @param reducingFactor divisor applied to the batch size after every failed attempt; must be greater than 1
+     * @param maxRetries maximum number of attempts (the first one included); 0 means retry until the batch
+     *          size decays to zero
+     * @throws IllegalArgumentException if any argument is out of range
+     */
+    public ExponentiallyDecayingBatchWork(int batchSize, int reducingFactor, int maxRetries) {
+      if (batchSize <= 0) {
+        throw new IllegalArgumentException(String.format(
+            "Invalid batch size %d provided. Batch size must be greater than 0", batchSize));
+      }
+      this.batchSize = batchSize;
+      if (reducingFactor <= 1) {
+        // report the offending decaying factor, not the batch size
+        throw new IllegalArgumentException(String.format(
+            "Invalid decaying factor %d provided. Decaying factor must be greater than 1",
+            reducingFactor));
+      }
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException(String.format(
+            "Invalid number of maximum retries %d provided. It must be a non-negative integer value",
+            maxRetries));
+      }
+      //if maxRetries is 0 code retries until batch decays to zero
+      this.maxRetries = maxRetries;
+      this.decayingFactor = reducingFactor;
+    }
+
+    /**
+     * Calls {@link #execute(int)} until it succeeds, shrinking the batch size after every failure.
+     *
+     * @return the result of the first successful {@code execute} call
+     * @throws RetryException if the batch size decays to zero, or if {@code maxRetries} (when positive)
+     *           attempts have all failed
+     */
+    public T run() throws Exception {
+      int attempt = 0;
+      while (true) {
+        int size = getNextBatchSize();
+        if (size == 0) {
+          throw new RetryException("Batch size reduced to zero");
+        }
+        try {
+          return execute(size);
+        } catch (Exception ex) {
+          LOG.warn(String.format("Exception thrown while processing using a batch size %d", size),
+              ex);
+        }
+        // Only failed attempts get here. This check must not live in a finally block: there it would also
+        // run after a successful execute() and replace the result with a RetryException on the last attempt.
+        attempt++;
+        if (attempt == maxRetries) {
+          throw new RetryException(String.format("Maximum number of retry attempts %d exhausted", maxRetries));
+        }
+      }
+    }
+
+    // Returns the current batch size and divides it by the decaying factor for the next attempt.
+    private int getNextBatchSize() {
+      int ret = batchSize;
+      batchSize /= decayingFactor;
+      return ret;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
index f750ca2..377a550 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/NonCatCallsWithCatalog.java
@@ -482,7 +482,9 @@ public abstract class NonCatCallsWithCatalog {
           .build(conf);
       table.unsetCatName();
       client.createTable(table);
-      expected.add(new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name()));
+
TableMeta tableMeta = new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name()); + tableMeta.setCatName(expectedCatalog()); + expected.add(tableMeta); } List<String> types = Collections.singletonList(TableType.MANAGED_TABLE.name()); http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java index fc996c8..b3690ec 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestCatalogOldClient.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hive.metastore; -import org.apache.hadoop.hive.metastore.api.MetaException; - import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import org.apache.hadoop.hive.metastore.api.MetaException; + /** * This tests calls with an older client, to make sure that if the client supplies no catalog * information the server still does the right thing. 
I assumes the default catalog http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java new file mode 100644 index 0000000..059c166 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.client.builder.CatalogBuilder; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(MetastoreUnitTest.class) +public class TestPartitionManagement { + private IMetaStoreClient client; + private Configuration conf; + + @Before + public void setUp() throws Exception { + 
conf = MetastoreConf.newMetastoreConf(); + conf.setClass(MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname(), + MsckPartitionExpressionProxy.class, PartitionExpressionProxy.class); + MetaStoreTestUtils.setConfForStandloneMode(conf); + conf.setBoolean(MetastoreConf.ConfVars.MULTITHREADED.getVarname(), false); + MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.prepDb(conf); + client = new HiveMetaStoreClient(conf); + } + + @After + public void tearDown() throws Exception { + if (client != null) { + // Drop any left over catalogs + List<String> catalogs = client.getCatalogs(); + for (String catName : catalogs) { + if (!catName.equalsIgnoreCase(DEFAULT_CATALOG_NAME)) { + // First drop any databases in catalog + List<String> databases = client.getAllDatabases(catName); + for (String db : databases) { + client.dropDatabase(catName, db, true, false, true); + } + client.dropCatalog(catName); + } else { + List<String> databases = client.getAllDatabases(catName); + for (String db : databases) { + if (!db.equalsIgnoreCase(Warehouse.DEFAULT_DATABASE_NAME)) { + client.dropDatabase(catName, db, true, false, true); + } + } + } + } + } + try { + if (client != null) { + client.close(); + } + } finally { + client = null; + } + } + + private Map<String, Column> buildAllColumns() { + Map<String, Column> colMap = new HashMap<>(6); + Column[] cols = {new Column("b", "binary"), new Column("bo", "boolean"), + new Column("d", "date"), new Column("do", "double"), new Column("l", "bigint"), + new Column("s", "string")}; + for (Column c : cols) { + colMap.put(c.colName, c); + } + return colMap; + } + + private List<String> createMetadata(String catName, String dbName, String tableName, + List<String> partKeys, List<String> partKeyTypes, List<List<String>> partVals, + Map<String, Column> colMap, boolean isOrc) + throws TException { + if (!DEFAULT_CATALOG_NAME.equals(catName)) { + Catalog cat = new 
CatalogBuilder() + .setName(catName) + .setLocation(MetaStoreTestUtils.getTestWarehouseDir(catName)) + .build(); + client.createCatalog(cat); + } + + Database db; + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + DatabaseBuilder dbBuilder = new DatabaseBuilder() + .setName(dbName); + dbBuilder.setCatalogName(catName); + db = dbBuilder.create(client, conf); + } else { + db = client.getDatabase(DEFAULT_CATALOG_NAME, DEFAULT_DATABASE_NAME); + } + + TableBuilder tb = new TableBuilder() + .inDb(db) + .setTableName(tableName); + + if (isOrc) { + tb.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat") + .setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); + } + + for (Column col : colMap.values()) { + tb.addCol(col.colName, col.colType); + } + + if (partKeys != null) { + if (partKeyTypes == null) { + throw new IllegalArgumentException("partKeyTypes cannot be null when partKeys is non-null"); + } + if (partKeys.size() != partKeyTypes.size()) { + throw new IllegalArgumentException("partKeys and partKeyTypes size should be same"); + } + if (partVals.isEmpty()) { + throw new IllegalArgumentException("partVals cannot be empty for patitioned table"); + } + for (int i = 0; i < partKeys.size(); i++) { + tb.addPartCol(partKeys.get(i), partKeyTypes.get(i)); + } + } + Table table = tb.create(client, conf); + + if (partKeys != null) { + for (List<String> partVal : partVals) { + new PartitionBuilder() + .inTable(table) + .setValues(partVal) + .addToTable(client, conf); + } + } + + List<String> partNames = new ArrayList<>(); + if (partKeys != null) { + for (int i = 0; i < partKeys.size(); i++) { + String partKey = partKeys.get(i); + for (String partVal : partVals.get(i)) { + String partName = partKey + "=" + partVal; + partNames.add(partName); + } + } + } + client.flushCache(); + return partNames; + } + + @Test + public void testPartitionDiscoveryDisabledByDefault() throws TException, IOException { + String dbName = "db1"; + String tableName = "tbl1"; + 
Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + fs.mkdirs(new Path(tablePath, "state=WA/dt=2018-12-01")); + fs.mkdirs(new Path(tablePath, "state=UT/dt=2018-12-02")); + assertEquals(5, fs.listStatus(tablePath).length); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // partition discovery is not enabled via table property, so nothing should change on this table + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // table property is set to false, so no change expected + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "false"); + client.alter_table(dbName, tableName, table); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + } + + @Test + public void testPartitionDiscoveryEnabledBothTableTypes() throws TException, IOException { + String dbName = "db2"; + String tableName = "tbl2"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + 
List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // table property is set to true, we expect 5 partitions + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + client.alter_table(dbName, tableName, table); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + + // change table type to external, delete a partition directory and make sure partition discovery works + table.getParameters().put("EXTERNAL", "true"); + table.setTableType(TableType.EXTERNAL_TABLE.name()); + client.alter_table(dbName, tableName, table); + boolean deleted = fs.delete(newPart1.getParent(), true); + assertTrue(deleted); + assertEquals(4, fs.listStatus(tablePath).length); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); + + // remove external tables from 
partition discovery and expect no changes even after partition is deleted + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.MANAGED_TABLE.name()); + deleted = fs.delete(newPart2.getParent(), true); + assertTrue(deleted); + assertEquals(3, fs.listStatus(tablePath).length); + // this doesn't remove partition because table is still external and we have remove external table type from + // partition discovery + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); + + // no table types specified, msck will not select any tables + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), ""); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); + + // only EXTERNAL table type, msck should drop a partition now + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.EXTERNAL_TABLE.name()); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + } + + @Test + public void testPartitionDiscoveryNonDefaultCatalog() throws TException, IOException { + String catName = "cat3"; + String dbName = "db3"; + String tableName = "tbl3"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(catName, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(catName, dbName, tableName); + List<Partition> partitions = client.listPartitions(catName, 
dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + client.alter_table(catName, dbName, tableName, table); + // default catalog in conf is 'hive' but we are using 'cat3' as catName for this test, so msck should not fix + // anything for this one + runPartitionManagementTask(conf); + partitions = client.listPartitions(catName, dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // using the correct catalog name, we expect msck to fix partitions + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME.getVarname(), catName); + runPartitionManagementTask(conf); + partitions = client.listPartitions(catName, dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + } + + @Test + public void testPartitionDiscoveryDBPattern() throws TException, IOException { + String dbName = "db4"; + String tableName = "tbl4"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, 
tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + client.alter_table(dbName, tableName, table); + // no match for this db pattern, so we will see only 3 partitions + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN.getVarname(), "*dbfoo*"); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // matching db pattern, we will see all 5 partitions now + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN.getVarname(), "*db4*"); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + } + + @Test + public void testPartitionDiscoveryTablePattern() throws TException, IOException { + String dbName = "db5"; + String tableName = "tbl5"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, tableName, 
(short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + client.alter_table(dbName, tableName, table); + // no match for this table pattern, so we will see only 3 partitions + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN.getVarname(), "*tblfoo*"); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + + // matching table pattern, we will see all 5 partitions now + conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN.getVarname(), "tbl5*"); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + } + + @Test + public void testPartitionDiscoveryTransactionalTable() + throws TException, IOException, InterruptedException, ExecutionException { + String dbName = "db6"; + String tableName = "tbl6"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, true); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = 
client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + table.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true"); + table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, + TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY); + client.alter_table(dbName, tableName, table); + + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + + // only one partition discovery task is running, there will be no skipped attempts + assertEquals(0, PartitionManagementTask.getSkippedAttempts()); + + // delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the + // same table, only one of them will run other attempts will be skipped + boolean deleted = fs.delete(newPart1.getParent(), true); + assertTrue(deleted); + assertEquals(4, fs.listStatus(tablePath).length); + + // 3 tasks are submitted at the same time, only one will eventually lock the table and only one get to run at a time + // This is to simulate, skipping partition discovery task attempt when previous attempt is still incomplete + PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask(); + partitionDiscoveryTask1.setConf(conf); + PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask(); + partitionDiscoveryTask2.setConf(conf); + 
PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask(); + partitionDiscoveryTask3.setConf(conf); + List<PartitionManagementTask> tasks = Lists + .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3); + ExecutorService executorService = Executors.newFixedThreadPool(3); + int successBefore = PartitionManagementTask.getCompletedAttempts(); + int skippedBefore = PartitionManagementTask.getSkippedAttempts(); + List<Future<?>> futures = new ArrayList<>(); + for (PartitionManagementTask task : tasks) { + futures.add(executorService.submit(task)); + } + for (Future<?> future : futures) { + future.get(); + } + int successAfter = PartitionManagementTask.getCompletedAttempts(); + int skippedAfter = PartitionManagementTask.getSkippedAttempts(); + assertEquals(1, successAfter - successBefore); + assertEquals(2, skippedAfter - skippedBefore); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); + } + + @Test + public void testPartitionRetention() throws TException, IOException, InterruptedException { + String dbName = "db7"; + String tableName = "tbl7"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = 
FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02"); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "20000ms"); + client.alter_table(dbName, tableName, table); + + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(5, partitions.size()); + + // after 30s all partitions should have been gone + Thread.sleep(30 * 1000); + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(0, partitions.size()); + } + + @Test + public void testPartitionDiscoverySkipInvalidPath() throws TException, IOException, InterruptedException { + String dbName = "db8"; + String tableName = "tbl8"; + Map<String, Column> colMap = buildAllColumns(); + List<String> partKeys = Lists.newArrayList("state", "dt"); + List<String> partKeyTypes = Lists.newArrayList("string", "date"); + List<List<String>> partVals = Lists.newArrayList( + Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"), + Lists.newArrayList("CA", "1986-04-28"), + Lists.newArrayList("MN", "2018-11-31")); + createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false); + Table table = client.getTable(dbName, tableName); + List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(3, partitions.size()); + String tableLocation = table.getSd().getLocation(); + URI location = URI.create(tableLocation); + Path tablePath = new Path(location); + FileSystem fs = FileSystem.get(location, conf); + Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01"); + Path newPart2 = new 
Path(tablePath, "state=UT/dt="); + fs.mkdirs(newPart1); + fs.mkdirs(newPart2); + assertEquals(5, fs.listStatus(tablePath).length); + table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true"); + // empty retention period basically means disabled + table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, ""); + client.alter_table(dbName, tableName, table); + + // there is one partition with invalid path which will get skipped + runPartitionManagementTask(conf); + partitions = client.listPartitions(dbName, tableName, (short) -1); + assertEquals(4, partitions.size()); + } + + private void runPartitionManagementTask(Configuration conf) { + PartitionManagementTask task = new PartitionManagementTask(); + task.setConf(conf); + task.run(); + } + + private static class Column { + private String colName; + private String colType; + + public Column(final String colName, final String colType) { + this.colName = colName; + this.colType = colType; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java index 59daa52..7720aa2 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; import org.apache.hadoop.hive.metastore.TableType; import 
org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; import org.apache.hadoop.hive.metastore.api.CreationMetadata; import org.apache.hadoop.hive.metastore.api.Catalog; import org.apache.hadoop.hive.metastore.api.Database; @@ -123,6 +124,7 @@ public class TestGetTableMeta extends MetaStoreClientTest { private Table createTable(String dbName, String tableName, TableType type) throws Exception { TableBuilder builder = new TableBuilder() + .setCatName("hive") .setDbName(dbName) .setTableName(tableName) .addCol("id", "int") @@ -153,6 +155,7 @@ public class TestGetTableMeta extends MetaStoreClientTest { client.createTable(table); TableMeta tableMeta = new TableMeta(dbName, tableName, type.name()); tableMeta.setComments(comment); + tableMeta.setCatName("hive"); return tableMeta; } @@ -160,7 +163,9 @@ public class TestGetTableMeta extends MetaStoreClientTest { throws Exception { Table table = createTable(dbName, tableName, type); client.createTable(table); - return new TableMeta(dbName, tableName, type.name()); + TableMeta tableMeta = new TableMeta(dbName, tableName, type.name()); + tableMeta.setCatName("hive"); + return tableMeta; } private void assertTableMetas(int[] expected, List<TableMeta> actualTableMetas) { @@ -301,7 +306,9 @@ public class TestGetTableMeta extends MetaStoreClientTest { .addCol("id", "int") .addCol("name", "string") .build(metaStore.getConf())); - expected.add(new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name())); + TableMeta tableMeta = new TableMeta(dbName, tableNames[i], TableType.MANAGED_TABLE.name()); + tableMeta.setCatName(catName); + expected.add(tableMeta); } List<String> types = Collections.singletonList(TableType.MANAGED_TABLE.name());