This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 0603700  HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
0603700 is described below

commit 0603700395827acdd819460fe110e35fe7c59f4a
Author: Ashutosh Bapat <aba...@cloudera.com>
AuthorDate: Fri Aug 2 10:14:54 2019 +0530

    HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)

    Signed-off-by: Mahesh Kumar Behera <mah...@apache.org>
---
 .../parse/BaseReplicationScenariosAcidTables.java  |  9 ++-
 .../parse/TestReplicationScenariosAcidTables.java  |  6 +-
 .../TestReplicationScenariosAcrossInstances.java   | 24 ++++--
 .../TestReplicationScenariosExternalTables.java    | 42 ++++++----
 .../parse/TestTableLevelReplicationScenarios.java  | 12 ++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    | 18 +++++
 .../ddl/table/creation/CreateTableOperation.java   |  9 +++
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  5 +-
 .../hadoop/hive/ql/stats/StatsUpdaterThread.java   | 14 +++-
 .../hive/ql/stats/TestStatsUpdaterThread.java      | 75 +++++++++++++++++-
 .../apache/hadoop/hive/common/repl/ReplConst.java  |  7 ++
 .../hive/metastore/PartitionManagementTask.java    | 15 +++-
 .../hive/metastore/TestPartitionManagement.java    | 89 ++++++++++++++++++++++
 13 files changed, 290 insertions(+), 35 deletions(-)
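For orientation before the diff: both HMS background tasks touched here (StatsUpdaterThread and PartitionManagementTask) end up applying the same test. A table whose repl.last.id property (ReplConst.REPL_TARGET_TABLE_PROPERTY, introduced below) holds a non-empty value is being replicated into and must be left alone. The following is a distilled, self-contained sketch of that predicate, not code from the commit; the class name is a hypothetical stand-in, while the property semantics come from the diff:

    import java.util.Map;
    import org.apache.hadoop.hive.common.repl.ReplConst;

    final class ReplTargetCheck {
      // A non-empty "repl.last.id" marks the table as the target of an
      // ongoing replication, so HMS background tasks skip it.
      static boolean isBeingReplicatedInto(Map<String, String> tableParams) {
        if (tableParams == null) {
          return false;
        }
        String lastReplId = tableParams.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
        return lastReplId != null && !lastReplId.trim().isEmpty();
      }
    }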
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index e543695..5e869d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -192,7 +192,8 @@ public class BaseReplicationScenariosAcidTables {
         .run("show tables")
         .verifyResults(tableNames)
         .run("repl status " + replicatedDbName)
-        .verifyResult(lastReplId);
+        .verifyResult(lastReplId)
+        .verifyReplTargetProperty(replicatedDbName);
     verifyNonAcidTableLoad(replicatedDbName);
     if (includeAcid) {
       verifyAcidTableLoad(replicatedDbName);
@@ -295,7 +296,8 @@ public class BaseReplicationScenariosAcidTables {
         .run("show tables")
         .verifyResults(tableNames)
         .run("repl status " + dbName)
-        .verifyResult(lastReplId);
+        .verifyResult(lastReplId)
+        .verifyReplTargetProperty(replicatedDbName);
     verifyIncNonAcidLoad(dbName);
     verifyIncAcidLoad(dbName);
   }
@@ -308,7 +310,8 @@ public class BaseReplicationScenariosAcidTables {
         .run("show tables")
         .verifyResults(tableNames)
         .run("repl status " + dbName)
-        .verifyResult(lastReplId);
+        .verifyResult(lastReplId)
+        .verifyReplTargetProperty(replicatedDbName);
     verifyInc2NonAcidLoad(dbName);
     verifyInc2AcidLoad(dbName);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 96b074d..e23fdd8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -512,7 +512,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
         .run("repl status " + replicatedDbName)
         .verifyResult("null")
         .run("show tables like t2")
-        .verifyResults(new String[] { });
+        .verifyResults(new String[] { })
+        .verifyReplTargetProperty(replicatedDbName);

     // Retry with different dump should fail.
     replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -546,7 +547,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
         .run("select id from t1")
         .verifyResults(Arrays.asList("1"))
         .run("select name from t2 order by name")
-        .verifyResults(Arrays.asList("bob", "carl"));
+        .verifyResults(Arrays.asList("bob", "carl"))
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index af5746f..46a6627 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -579,7 +579,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbTwo)
         .run("show tables")
-        .verifyResults(new String[] { "t1" });
+        .verifyResults(new String[] { "t1" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);

     /*
       Start of cleanup
@@ -646,7 +649,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbOne)
         .run("show tables")
-        .verifyResults(new String[] { "t1" });
+        .verifyResults(new String[] { "t1" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);

     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -660,7 +666,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbOne)
         .run("show tables")
-        .verifyResults(new String[] { "t1", "t2" });
+        .verifyResults(new String[] { "t1", "t2" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);

     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -706,7 +715,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("show tables")
         .verifyResults(new String[] { "table1", "table2" })
         .run("select * from table1")
-        .verifyResults(new String[]{ "1" });
+        .verifyResults(new String[]{ "1" })
+        .verifyReplTargetProperty(replicatedDbName);

     //////////// First Incremental ////////////
     WarehouseInstance.Tuple incrementalOneTuple = primary
@@ -736,7 +746,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("select * from table3")
         .verifyResults(new String[] { "10" })
         .run("show functions like '" + replicatedDbName + "%'")
-        .verifyResult(replicatedDbName + ".testFunctionOne");
+        .verifyResult(replicatedDbName + ".testFunctionOne")
+        .verifyReplTargetProperty(replicatedDbName);

     //////////// Second Incremental ////////////
     WarehouseInstance.Tuple secondIncremental = primary
@@ -774,7 +785,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("select * from table3")
         .verifyResults(Collections.emptyList())
         .run("show functions like '" + replicatedDbName + "%'")
-        .verifyResult(null);
+        .verifyResult(null)
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index fbdbb01..e1802ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -99,7 +99,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't1'")
         .verifyFailure(new String[] { "t1" })
         .run("show tables like 't2'")
-        .verifyFailure(new String[] { "t2" });
+        .verifyFailure(new String[] { "t2" })
+        .verifyReplTargetProperty(replicatedDbName);

     tuple = primary.run("use " + primaryDbName)
         .run("create external table t3 (id int)")
@@ -114,7 +115,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
-        .verifyFailure(new String[] { "t3" });
+        .verifyFailure(new String[] { "t3" })
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
@@ -300,7 +302,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't2'")
         .verifyResults(new String[] { "t2" })
         .run("select place from t2")
-        .verifyResults(new String[] { "bangalore" });
+        .verifyResults(new String[] { "bangalore" })
+        .verifyReplTargetProperty(replicatedDbName);

     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
@@ -325,7 +328,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("select place from t2 where country='india'")
         .verifyResults(new String[] { "bangalore", "pune", "mumbai" })
         .run("select place from t2 where country='australia'")
-        .verifyResults(new String[] { "sydney" });
+        .verifyResults(new String[] { "sydney" })
+        .verifyReplTargetProperty(replicatedDbName);

     Path customPartitionLocation =
         new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france");
@@ -345,7 +349,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
-        .verifyResults(new String[] { "paris" });
+        .verifyResults(new String[] { "paris" })
+        .verifyReplTargetProperty(replicatedDbName);

     // change the location of the partition via alter command
     String tmpLocation = "/tmp/" + System.nanoTime();
@@ -358,7 +363,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
-        .verifyResults(new String[] {});
+        .verifyResults(new String[] {})
+        .verifyReplTargetProperty(replicatedDbName);

     // Changing location of one of the partitions shouldn't result in changing location of other
     // partitions as well as that of the table.
@@ -418,7 +424,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show partitions t1")
         .verifyResults(new String[] { "country=india", "country=us" })
         .run("select place from t1 order by place")
-        .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+        .verifyResults(new String[] { "bangalore", "mumbai", "pune" })
+        .verifyReplTargetProperty(replicatedDbName);

     // Delete one of the files and update another one.
     fs.delete(new Path(partitionDir, "file.txt"), true);
@@ -438,7 +445,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show partitions t1")
         .verifyResults(new String[] { "country=india", "country=us" })
         .run("select place from t1 order by place")
-        .verifyResults(new String[] { "chennai" });
+        .verifyResults(new String[] { "chennai" })
+        .verifyReplTargetProperty(replicatedDbName);

     Hive hive = Hive.get(replica.getConf());
     Set<Partition> partitions =
@@ -453,7 +461,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("select * From t1")
-        .verifyResults(new String[] {});
+        .verifyResults(new String[] {})
+        .verifyReplTargetProperty(replicatedDbName);

     for (String path : paths) {
       assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
@@ -489,7 +498,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't1'")
         .verifyFailure(new String[] {"t1" })
         .run("show tables like 't2'")
-        .verifyFailure(new String[] {"t2" });
+        .verifyFailure(new String[] {"t2" })
+        .verifyReplTargetProperty(replicatedDbName);

     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -532,7 +542,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't3'")
         .verifyResult("t3")
         .run("show tables like 't4'")
-        .verifyResult("t4");
+        .verifyResult("t4")
+        .verifyReplTargetProperty(replicatedDbName);

     // Ckpt should be set on bootstrapped tables.
     replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation);
@@ -551,7 +562,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("select id from t3 order by id")
         .verifyResults(Arrays.asList("10", "20"))
         .run("select id from t4 order by id")
-        .verifyResults(Arrays.asList("10", "20"));
+        .verifyResults(Arrays.asList("10", "20"))
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
@@ -580,7 +592,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables")
         .verifyResult("t3")
         .run("select id from t3")
-        .verifyResult("1");
+        .verifyResult("1")
+        .verifyReplTargetProperty(replicatedDbName);

     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
         "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -648,7 +661,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("select id from t4")
         .verifyResults(Arrays.asList("10", "20"))
         .run("select id from t5")
-        .verifyResult("10");
+        .verifyResult("10")
+        .verifyReplTargetProperty(replicatedDbName);

     // Once the REPL LOAD is successful, this config should be unset; otherwise the subsequent
     // REPL LOAD will also drop those tables, which will cause data loss.
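The comment closing the hunk above is worth underlining with the dump pattern these tests use: the external-table bootstrap flag accompanies exactly one incremental dump and is dropped again afterwards. A hypothetical fragment mirroring the tests' own API (primary.dump and replica.load as used throughout this patch); variable names here are illustrative:

    List<String> dumpWithClause = Arrays.asList(
        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
        "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, lastReplDumpId, dumpWithClause);
    replica.load(replicatedDbName, tuple.dumpLocation);
    // On the next cycle the bootstrap flag must be back to 'false' (or absent);
    // otherwise the subsequent REPL LOAD would drop the bootstrapped tables.
    dumpWithClause = Arrays.asList(
        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'");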
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 270e61a..78f505b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -165,7 +165,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables")
-        .verifyResults(expectedTables);
+        .verifyResults(expectedTables)
+        .verifyReplTargetProperty(replicatedDbName);

     if (records == null) {
       records = new String[] {"1"};
@@ -459,7 +460,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables")
-        .verifyResults(replicatedTables);
+        .verifyResults(replicatedTables)
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
@@ -497,7 +499,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables")
-        .verifyResults(incrementalReplicatedTables);
+        .verifyResults(incrementalReplicatedTables)
+        .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
@@ -648,7 +651,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
        .run("use " + replicatedDbName)
        .run("show tables")
-       .verifyResults(incrementalReplicatedTables);
+       .verifyResults(incrementalReplicatedTables)
+       .verifyReplTargetProperty(replicatedDbName);
   }

   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 6326bc3..5fbe48d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -447,6 +448,23 @@ public class WarehouseInstance implements Closeable {
     }
   }

+  // Make sure that every table in the target database is marked as a target of replication.
+  // Stats updater task and partition management task skip processing tables being replicated into.
+  private void verifyReplTargetProperty(Map<String, String> props) {
+    assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY));
+  }
+
+  public WarehouseInstance verifyReplTargetProperty(String dbName, List<String> tblNames) throws Exception {
+    for (String tblName : tblNames) {
+      verifyReplTargetProperty(getTable(dbName, tblName).getParameters());
+    }
+    return this;
+  }
+
+  public WarehouseInstance verifyReplTargetProperty(String dbName) throws Exception {
+    return verifyReplTargetProperty(dbName, getAllTables(dbName));
+  }
+
   public Database getDatabase(String dbName) throws Exception {
     try {
       return client.getDatabase(dbName);
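Usage sketch for the new fluent assertion (both overloads are defined in the hunk above; the chain below is illustrative rather than lifted from a specific test):

    replica.load(replicatedDbName, tuple.dumpLocation)
        .run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(new String[] { "t1" })
        .verifyReplTargetProperty(replicatedDbName);   // checks every table in the db
    // Or restrict the check to specific tables:
    replica.verifyReplTargetProperty(replicatedDbName, Arrays.asList("t1"));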
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
index bac0b4c..b6b7d1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.ddl.table.creation;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -85,6 +86,14 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
     if (desc.getReplaceMode()) {
       createTableReplaceMode(tbl, replDataLocationChanged);
     } else {
+      // Some HMS background tasks skip processing tables being replicated into. Set the
+      // replication property while creating the table so that they can identify such tables right
+      // from the beginning. Set it to 0, which is less than any eventId ever created. This will
+      // soon be overwritten by an actual value.
+      if (desc.getReplicationSpec().isInReplicationScope() &&
+          !tbl.getParameters().containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)) {
+        tbl.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "0");
+      }
       createTableNonReplaceMode(tbl);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 48213d1..ad3e55a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;

 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;

@@ -51,11 +52,11 @@ public class ReplicationSpec {
   private boolean isMigratingToExternalTable = false;
   private boolean needDupCopyCheck = false;

-  // Key definitions related to replication
+  // Key definitions related to replication.
   public enum KEY {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
-    CURR_STATE_ID("repl.last.id"),
+    CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
     IS_REPLACE("repl.is.replace"),
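A small illustrative consequence of the CURR_STATE_ID change above (assuming KEY.toString() returns the key name, as the rest of ReplicationSpec relies on): the repl-spec key and the shared metastore constant are now literally the same string, so producers and consumers of the property cannot drift apart.

    Assert.assertEquals("repl.last.id", ReplConst.REPL_TARGET_TABLE_PROPERTY);
    Assert.assertEquals(ReplConst.REPL_TARGET_TABLE_PROPERTY,
        ReplicationSpec.KEY.CURR_STATE_ID.toString());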
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 8acb1c5..444c7ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -175,7 +176,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
   }

   @VisibleForTesting
-  boolean runOneIteration() {
+  public boolean runOneIteration() {
     List<TableName> fullTableNames;
     try {
       fullTableNames = getTablesToCheck();
@@ -220,6 +221,17 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
     String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
     if ("true".equalsIgnoreCase(skipParam)) return null;

+    // If the table is being replicated into,
+    // 1. the stats are also replicated from the source, so we don't need those to be calculated
+    //    on the target again
+    // 2. updating stats requires a writeId to be created. Hence writeIds on source and target
+    //    can get out of sync when stats are updated. That can cause consistency issues.
+    String replTrgtParam = table.getParameters().get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+    if (replTrgtParam != null && !replTrgtParam.isEmpty()) {
+      LOG.debug("Skipping table {} since it is being replicated into", table);
+      return null;
+    }
+
     // Note: ideally we should take a lock here to pretend to be a real reader.
     // For now, this check is going to have race potential; it may run a spurious analyze.
     String writeIdString = null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index a2f8bab..80251df 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -30,6 +30,7 @@ import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -39,13 +40,13 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -540,6 +541,73 @@ public class TestStatsUpdaterThread {
     msClient.close();
   }

+  // A table which is the target of replication should not be queued for stats update, and hence
+  // its stats state should not change.
+  @Test(timeout=40000)
+  public void testNoStatsUpdateForSimpleReplTable() throws Exception {
+    testNoStatsUpdateForReplTable("simple", "");
+  }
+
+  // A table which is the target of replication should not be queued for stats update, and hence
+  // its stats state should not change.
+  @Test(timeout=40000)
+  public void testNoStatsUpdateForTxnReplTable() throws Exception {
+    testNoStatsUpdateForReplTable("txn",
+        "TBLPROPERTIES (\"transactional\"=\"true\",\"transactional_properties\"=\"insert_only\")");
+  }
+
+  private void testNoStatsUpdateForReplTable(String tblNamePrefix, String txnProperty) throws Exception {
+    String tblWOStats = tblNamePrefix + "_repl_trgt_nostats";
+    String tblWithStats = tblNamePrefix + "_repl_trgt_stats";
+    String ptnTblWOStats = tblNamePrefix + "_ptn_repl_trgt_nostats";
+    String ptnTblWithStats = tblNamePrefix + "_ptn_repl_trgt_stats";
+
+    StatsUpdaterThread su = createUpdater();
+    su.startWorkers();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+
+    executeQuery("create table " + tblWOStats + "(i int, s string) " + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(tblWOStats);
+    executeQuery("insert into " + tblWOStats + "(i, s) values (1, 'test')");
+    verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+
+    executeQuery("create table " + ptnTblWOStats + "(s string) partitioned by (i int) " + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(ptnTblWOStats);
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (1, 'test')");
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (2, 'test2')");
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (3, 'test3')");
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+
+    executeQuery("create table " + tblWithStats + "(i int, s string)" + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(tblWithStats);
+    executeQuery("insert into " + tblWithStats + "(i, s) values (1, 'test')");
+    executeQuery("analyze table " + tblWithStats + " compute statistics for columns");
+    verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+
+    executeQuery("create table " + ptnTblWithStats + "(s string) partitioned by (i int) " + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(ptnTblWithStats);
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (1, 'test')");
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (2, 'test2')");
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (3, 'test3')");
+    executeQuery("analyze table " + ptnTblWithStats + " compute statistics for columns");
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+    assertFalse(su.runOneIteration());
+    Assert.assertEquals(0, su.getQueueLength());
+    verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+    verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+    msClient.close();
+  }
+
   private void verifyPartStatsUpToDate(int partCount, int skip,
       IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception {
     for (int i = skip; i < partCount; ++i) {
@@ -566,6 +634,11 @@ public class TestStatsUpdaterThread {
     msClient.alter_table(table.getDbName(), table.getTableName(), table);
   }

+  private void setTableReplTargetProperty(String tblName) throws Exception {
+    executeQuery("alter table " + tblName +
+        " set tblproperties ('" + ReplConst.REPL_TARGET_TABLE_PROPERTY + "' = '1')");
+  }
+
   private void setPartitionSkipProperty(
       IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception {
     Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 7c29969..f075e2a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -37,4 +37,11 @@ public class ReplConst {
    * database is created as part of repl load and survives the incremental cycles.
    */
   public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
+
+  /**
+   * A table which is the target of replication will have this property set. The property serves
+   * two purposes: 1. it identifies the tables being replicated into and 2. it records the event
+   * id of the last event affecting this table.
+   */
+  public static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id";
 }
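A sketch of reading the property's two roles on a replica table (the value described in the comment is made up; the null/empty handling matches the checks added elsewhere in this patch):

    Map<String, String> params = table.getParameters();
    String lastReplId = params == null ? null : params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
    if (lastReplId != null && !lastReplId.trim().isEmpty()) {
      // Role 1: a non-empty value flags the table as a replication target, so the
      // HMS background tasks (stats updater, partition management) leave it alone.
      // Role 2: the value itself is the last replicated event id, "0" right after
      // the table is created during REPL LOAD and a real event id afterwards.
    }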
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index da0259c..e4488f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -30,6 +31,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -80,6 +82,16 @@ public class PartitionManagementTask implements MetastoreTaskThread {
     return conf;
   }

+  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
+    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
+        params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
+  }
+
+  private static boolean tblBeingReplicatedInto(Map<String, String> params) {
+    return params != null && params.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY) &&
+        !params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY).trim().isEmpty();
+  }
+
   @Override
   public void run() {
     if (lock.tryLock()) {
@@ -116,8 +128,7 @@ public class PartitionManagementTask implements MetastoreTaskThread {
         for (TableMeta tableMeta : foundTableMetas) {
           Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
-          if (table.getParameters() != null && table.getParameters().containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
-              table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true")) {
+          if (partitionDiscoveryEnabled(table.getParameters()) &&
+              !tblBeingReplicatedInto(table.getParameters())) {
             candidateTables.add(table);
           }
         }
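A hypothetical follow-up, not part of this patch: once a replica is promoted and is no longer a replication target, clearing the marker would re-enable both background tasks, since they only test for a non-empty value. ALTER TABLE ... UNSET TBLPROPERTIES is standard Hive DDL; executeQuery below stands in for whatever query runner is at hand:

    // Illustrative only: remove the repl-target marker after promotion so the
    // stats updater and partition management task resume processing the table.
    executeQuery("alter table " + tblName
        + " unset tblproperties ('" + ReplConst.REPL_TARGET_TABLE_PROPERTY + "')");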
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index 9562b4f..1961a70 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -565,6 +566,94 @@ public class TestPartitionManagement {
     assertEquals(4, partitions.size());
   }

+  @Test
+  public void testNoPartitionDiscoveryForReplTable() throws Exception {
+    String dbName = "db_repl1";
+    String tableName = "tbl_repl1";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+        Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+        Lists.newArrayList("CA", "1986-04-28"),
+        Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // The table property is set to true, but the table is marked as a replication target, so the
+    // new partitions should not be created.
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // Change the table type to external and delete a partition directory; being a replication
+    // target, the table should still be skipped by partition discovery.
+    table.getParameters().put("EXTERNAL", "true");
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
+    client.alter_table(dbName, tableName, table);
+    // Delete the location of one of the partitions. The partition discovery task should not drop
+    // that partition.
+    boolean deleted = fs.delete((new Path(URI.create(partitions.get(0).getSd().getLocation()))).getParent(),
+        true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
+  @Test
+  public void testNoPartitionRetentionForReplTarget() throws TException, InterruptedException {
+    String dbName = "db_repl2";
+    String tableName = "tbl_repl2";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+        Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+        Lists.newArrayList("CA", "1986-04-28"),
+        Lists.newArrayList("MN", "2018-11-31"));
+    // Check for the existence of partitions 10 seconds after the partition retention period has
+    // elapsed. That gives the partition retention task enough time to work.
+    long partitionRetentionPeriodMs = 20000;
+    long waitingPeriodForTest = partitionRetentionPeriodMs + 10 * 1000;
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY,
+        partitionRetentionPeriodMs + "ms");
+    table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+    client.alter_table(dbName, tableName, table);
+
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // After 30s, all partitions should remain intact for a table which is the target of
+    // replication.
+    Thread.sleep(waitingPeriodForTest);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
   private void runPartitionManagementTask(Configuration conf) {
     PartitionManagementTask task = new PartitionManagementTask();
     task.setConf(conf);