This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0603700  HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
0603700 is described below

commit 0603700395827acdd819460fe110e35fe7c59f4a
Author: Ashutosh Bapat <aba...@cloudera.com>
AuthorDate: Fri Aug 2 10:14:54 2019 +0530

    HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
    
    Signed-off-by: Mahesh Kumar Behera <mah...@apache.org>
---
 .../parse/BaseReplicationScenariosAcidTables.java  |  9 ++-
 .../parse/TestReplicationScenariosAcidTables.java  |  6 +-
 .../TestReplicationScenariosAcrossInstances.java   | 24 ++++--
 .../TestReplicationScenariosExternalTables.java    | 42 ++++++----
 .../parse/TestTableLevelReplicationScenarios.java  | 12 ++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    | 18 +++++
 .../ddl/table/creation/CreateTableOperation.java   |  9 +++
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  5 +-
 .../hadoop/hive/ql/stats/StatsUpdaterThread.java   | 14 +++-
 .../hive/ql/stats/TestStatsUpdaterThread.java      | 75 +++++++++++++++++-
 .../apache/hadoop/hive/common/repl/ReplConst.java  |  7 ++
 .../hive/metastore/PartitionManagementTask.java    | 15 +++-
 .../hive/metastore/TestPartitionManagement.java    | 89 ++++++++++++++++++++++
 13 files changed, 290 insertions(+), 35 deletions(-)
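
In outline: REPL LOAD now stamps every table it creates with the repl.last.id table property (initially "0"), and the HMS background tasks (StatsUpdaterThread, PartitionManagementTask) treat a non-empty value as "this table is a replication target; skip it". A condensed, illustrative sketch of that contract follows; the real checks are in the diff below.

    import java.util.Map;

    // Illustrative only: mirrors the gating logic this patch adds.
    final class ReplTargetGate {
      // Same constant the patch defines as ReplConst.REPL_TARGET_TABLE_PROPERTY.
      static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id";

      // A table is "being replicated into" when the property is present and non-empty.
      static boolean isReplTarget(Map<String, String> params) {
        if (params == null) {
          return false;
        }
        String value = params.get(REPL_TARGET_TABLE_PROPERTY);
        return value != null && !value.trim().isEmpty();
      }
    }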

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index e543695..5e869d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -192,7 +192,8 @@ public class BaseReplicationScenariosAcidTables {
             .run("show tables")
             .verifyResults(tableNames)
             .run("repl status " + replicatedDbName)
-            .verifyResult(lastReplId);
+            .verifyResult(lastReplId)
+            .verifyReplTargetProperty(replicatedDbName);
     verifyNonAcidTableLoad(replicatedDbName);
     if (includeAcid) {
       verifyAcidTableLoad(replicatedDbName);
@@ -295,7 +296,8 @@ public class BaseReplicationScenariosAcidTables {
             .run("show tables")
             .verifyResults(tableNames)
             .run("repl status " + dbName)
-            .verifyResult(lastReplId);
+            .verifyResult(lastReplId)
+            .verifyReplTargetProperty(replicatedDbName);
     verifyIncNonAcidLoad(dbName);
     verifyIncAcidLoad(dbName);
   }
@@ -308,7 +310,8 @@ public class BaseReplicationScenariosAcidTables {
             .run("show tables")
             .verifyResults(tableNames)
             .run("repl status " + dbName)
-            .verifyResult(lastReplId);
+            .verifyResult(lastReplId)
+            .verifyReplTargetProperty(replicatedDbName);
     verifyInc2NonAcidLoad(dbName);
     verifyInc2AcidLoad(dbName);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 96b074d..e23fdd8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -512,7 +512,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
             .run("repl status " + replicatedDbName)
             .verifyResult("null")
             .run("show tables like t2")
-            .verifyResults(new String[] { });
+            .verifyResults(new String[] { })
+            .verifyReplTargetProperty(replicatedDbName);
 
     // Retry with different dump should fail.
     replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -546,7 +547,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
             .run("select id from t1")
             .verifyResults(Arrays.asList("1"))
             .run("select name from t2 order by name")
-            .verifyResults(Arrays.asList("bob", "carl"));
+            .verifyResults(Arrays.asList("bob", "carl"))
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index af5746f..46a6627 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -579,7 +579,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbTwo)
         .run("show tables")
-        .verifyResults(new String[] { "t1" });
+        .verifyResults(new String[] { "t1" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);
 
     /*
        Start of cleanup
@@ -646,7 +649,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbOne)
         .run("show tables")
-        .verifyResults(new String[] { "t1" });
+        .verifyResults(new String[] { "t1" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);

     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -660,7 +666,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "t1" })
         .run("use " + dbOne)
         .run("show tables")
-        .verifyResults(new String[] { "t1", "t2" });
+        .verifyResults(new String[] { "t1", "t2" })
+        .verifyReplTargetProperty(primaryDbName)
+        .verifyReplTargetProperty(dbOne)
+        .verifyReplTargetProperty(dbTwo);

     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -706,7 +715,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("show tables")
             .verifyResults(new String[] { "table1", "table2" })
             .run("select * from table1")
-            .verifyResults(new String[]{ "1" });
+            .verifyResults(new String[]{ "1" })
+            .verifyReplTargetProperty(replicatedDbName);
 
     ////////////  First Incremental ////////////
     WarehouseInstance.Tuple incrementalOneTuple = primary
@@ -736,7 +746,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("select * from table3")
             .verifyResults(new String[] { "10" })
             .run("show functions like '" + replicatedDbName + "%'")
-            .verifyResult(replicatedDbName + ".testFunctionOne");
+            .verifyResult(replicatedDbName + ".testFunctionOne")
+            .verifyReplTargetProperty(replicatedDbName);
 
     ////////////  Second Incremental ////////////
     WarehouseInstance.Tuple secondIncremental = primary
@@ -774,7 +785,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("select * from table3")
             .verifyResults(Collections.emptyList())
             .run("show functions like '" + replicatedDbName + "%'")
-            .verifyResult(null);
+            .verifyResult(null)
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index fbdbb01..e1802ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -99,7 +99,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't1'")
         .verifyFailure(new String[] { "t1" })
         .run("show tables like 't2'")
-        .verifyFailure(new String[] { "t2" });
+        .verifyFailure(new String[] { "t2" })
+        .verifyReplTargetProperty(replicatedDbName);
 
     tuple = primary.run("use " + primaryDbName)
         .run("create external table t3 (id int)")
@@ -114,7 +115,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
-        .verifyFailure(new String[] { "t3" });
+        .verifyFailure(new String[] { "t3" })
+        .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
@@ -300,7 +302,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show tables like 't2'")
         .verifyResults(new String[] { "t2" })
         .run("select place from t2")
-        .verifyResults(new String[] { "bangalore" });
+        .verifyResults(new String[] { "bangalore" })
+        .verifyReplTargetProperty(replicatedDbName);
 
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
 
@@ -325,7 +328,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("select place from t2 where country='india'")
         .verifyResults(new String[] { "bangalore", "pune", "mumbai" })
         .run("select place from t2 where country='australia'")
-        .verifyResults(new String[] { "sydney" });
+        .verifyResults(new String[] { "sydney" })
+        .verifyReplTargetProperty(replicatedDbName);
 
     Path customPartitionLocation =
         new Path("/" + testName.getMethodName() + 
"/partition_data/t2/country=france");
@@ -345,7 +349,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
-        .verifyResults(new String[] { "paris" });
+        .verifyResults(new String[] { "paris" })
+        .verifyReplTargetProperty(replicatedDbName);
 
     // change the location of the partition via alter command
     String tmpLocation = "/tmp/" + System.nanoTime();
@@ -358,7 +363,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
-        .verifyResults(new String[] {});
+        .verifyResults(new String[] {})
+        .verifyReplTargetProperty(replicatedDbName);
 
     // Changing location of one of the partitions shouldn't result in changing location of other
     // partitions as well as that of the table.
@@ -418,7 +424,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("show partitions t1")
         .verifyResults(new String[] { "country=india", "country=us" })
         .run("select place from t1 order by place")
-        .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+        .verifyResults(new String[] { "bangalore", "mumbai", "pune" })
+        .verifyReplTargetProperty(replicatedDbName);
 
     // Delete one of the file and update another one.
     fs.delete(new Path(partitionDir, "file.txt"), true);
@@ -438,7 +445,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("show partitions t1")
             .verifyResults(new String[] { "country=india", "country=us" })
             .run("select place from t1 order by place")
-            .verifyResults(new String[] { "chennai" });
+            .verifyResults(new String[] { "chennai" })
+            .verifyReplTargetProperty(replicatedDbName);
 
     Hive hive = Hive.get(replica.getConf());
     Set<Partition> partitions =
@@ -453,7 +461,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("select * From t1")
-        .verifyResults(new String[] {});
+        .verifyResults(new String[] {})
+        .verifyReplTargetProperty(replicatedDbName);
 
     for (String path : paths) {
       assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
@@ -489,7 +498,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("show tables like 't1'")
             .verifyFailure(new String[] {"t1" })
             .run("show tables like 't2'")
-            .verifyFailure(new String[] {"t2" });
+            .verifyFailure(new String[] {"t2" })
+            .verifyReplTargetProperty(replicatedDbName);
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
                                    "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -532,7 +542,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("show tables like 't3'")
             .verifyResult("t3")
             .run("show tables like 't4'")
-            .verifyResult("t4");
+            .verifyResult("t4")
+            .verifyReplTargetProperty(replicatedDbName);
 
     // Ckpt should be set on bootstrapped tables.
     replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation);
@@ -551,7 +562,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("select id from t3 order by id")
             .verifyResults(Arrays.asList("10", "20"))
             .run("select id from t4 order by id")
-            .verifyResults(Arrays.asList("10", "20"));
+            .verifyResults(Arrays.asList("10", "20"))
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
@@ -580,7 +592,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("show tables")
             .verifyResult("t3")
             .run("select id from t3")
-            .verifyResult("1");
+            .verifyResult("1")
+            .verifyReplTargetProperty(replicatedDbName);
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
             "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -648,7 +661,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
               .run("select id from t4")
               .verifyResults(Arrays.asList("10", "20"))
               .run("select id from t5")
-              .verifyResult("10");
+              .verifyResult("10")
+              .verifyReplTargetProperty(replicatedDbName);
 
      // Once the REPL LOAD is successful, this config should be unset; otherwise, the subsequent
      // REPL LOAD will also drop those tables, which will cause data loss.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 270e61a..78f505b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -165,7 +165,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
-            .verifyResults(expectedTables);
+            .verifyResults(expectedTables)
+            .verifyReplTargetProperty(replicatedDbName);
 
     if (records == null) {
       records = new String[] {"1"};
@@ -459,7 +460,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
-            .verifyResults(replicatedTables);
+            .verifyResults(replicatedTables)
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
@@ -497,7 +499,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
-            .verifyResults(incrementalReplicatedTables);
+            .verifyResults(incrementalReplicatedTables)
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
@@ -648,7 +651,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
-            .verifyResults(incrementalReplicatedTables);
+            .verifyResults(incrementalReplicatedTables)
+            .verifyReplTargetProperty(replicatedDbName);
   }
 
   @Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 6326bc3..5fbe48d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -447,6 +448,23 @@ public class WarehouseInstance implements Closeable {
     }
   }
 
+  // Make sure that every table in the target database is marked as a target of replication.
+  // The stats updater and partition management tasks skip processing tables being replicated into.
+  private void verifyReplTargetProperty(Map<String, String> props) {
+    assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY));
+  }
+
+  public WarehouseInstance verifyReplTargetProperty(String dbName, List<String> tblNames) throws Exception {
+    for (String tblName : tblNames) {
+      verifyReplTargetProperty(getTable(dbName, tblName).getParameters());
+    }
+    return this;
+  }
+
+  public WarehouseInstance verifyReplTargetProperty(String dbName) throws Exception {
+    return verifyReplTargetProperty(dbName, getAllTables(dbName));
+  }
+
   public Database getDatabase(String dbName) throws Exception {
     try {
       return client.getDatabase(dbName);
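
For reference, a hypothetical test body showing how the two new verifyReplTargetProperty overloads chain with the existing fluent API (replica, tuple, and replicatedDbName assumed to be set up as in the tests earlier in this patch):

    // Hypothetical usage sketch of the new overloads added above.
    replica.load(replicatedDbName, tuple.dumpLocation)
        .run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(new String[] { "t1" })
        .verifyReplTargetProperty(replicatedDbName)                        // every table in the db
        .verifyReplTargetProperty(replicatedDbName, Arrays.asList("t1"));  // or a chosen subset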
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
index bac0b4c..b6b7d1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.ddl.table.creation;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -85,6 +86,14 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
     if (desc.getReplaceMode()) {
       createTableReplaceMode(tbl, replDataLocationChanged);
     } else {
+      // Some HMS background tasks skip processing tables being replicated into. Set the
+      // replication property while creating the table so that they can identify such tables right
+      // from the beginning. Set it to 0, which is less than any eventId ever created. This will
+      // soon be overwritten by an actual value.
+      if (desc.getReplicationSpec().isInReplicationScope() &&
+              !tbl.getParameters().containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)) {
+        tbl.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "0");
+      }
       createTableNonReplaceMode(tbl);
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 48213d1..ad3e55a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
@@ -51,11 +52,11 @@ public class ReplicationSpec {
   private boolean isMigratingToExternalTable = false;
   private boolean needDupCopyCheck = false;
 
-  // Key definitions related to replication
+  // Key definitions related to replication.
   public enum KEY {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
-    CURR_STATE_ID("repl.last.id"),
+    CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
     IS_REPLACE("repl.is.replace"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 8acb1c5..444c7ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -175,7 +176,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
   }
 
   @VisibleForTesting
-  boolean runOneIteration() {
+  public boolean runOneIteration() {
     List<TableName> fullTableNames;
     try {
       fullTableNames = getTablesToCheck();
@@ -220,6 +221,17 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
     String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
     if ("true".equalsIgnoreCase(skipParam)) return null;
 
+    // If the table is being replicated into,
+    // 1. the stats are also replicated from the source, so we don't need those to be calculated
+    //    on the target again
+    // 2. updating stats requires a writeId to be created. Hence writeIds on source and target
+    //    can get out of sync when stats are updated. That can cause consistency issues.
+    String replTrgtParam = table.getParameters().get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+    if (replTrgtParam != null && !replTrgtParam.isEmpty()) {
+      LOG.debug("Skipping table {} since it is being replicated into", table);
+      return null;
+    }
+
     // Note: ideally we should take a lock here to pretend to be a real reader.
    //       For now, this check is going to have race potential; it may run a spurious analyze.
     String writeIdString = null;
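
To exercise this path outside of REPL LOAD, a table can be marked by hand through the metastore client; a sketch using the same getTable/alter_table pattern the tests in this patch rely on (method name is hypothetical):

    // Sketch: mark a table as a replication target so StatsUpdaterThread skips it.
    // "repl.last.id" is ReplConst.REPL_TARGET_TABLE_PROPERTY.
    void markAsReplTarget(HiveMetaStoreClient client, String db, String tbl) throws Exception {
      org.apache.hadoop.hive.metastore.api.Table table = client.getTable(db, tbl);
      table.getParameters().put("repl.last.id", "1");
      client.alter_table(db, tbl, table);
    }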
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index a2f8bab..80251df 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -30,6 +30,7 @@ import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -39,13 +40,13 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -540,6 +541,73 @@ public class TestStatsUpdaterThread {
     msClient.close();
   }
 
+  // A table which is a target of replication should not be queued for stats update, and hence its
+  // stats state should not change.
+  @Test(timeout=40000)
+  public void testNoStatsUpdateForSimpleReplTable() throws Exception {
+    testNoStatsUpdateForReplTable("simple", "");
+  }
+
+  // A table which is a target of replication should not be queued for stats update, and hence its
+  // stats state should not change.
+  @Test(timeout=40000)
+  public void testNoStatsUpdateForTxnReplTable() throws Exception {
+    testNoStatsUpdateForReplTable("txn",
+            "TBLPROPERTIES 
(\"transactional\"=\"true\",\"transactional_properties\"=\"insert_only\")");
+  }
+
+  private void testNoStatsUpdateForReplTable(String tblNamePrefix, String txnProperty) throws Exception {
+    String tblWOStats = tblNamePrefix + "_repl_trgt_nostats";
+    String tblWithStats = tblNamePrefix + "_repl_trgt_stats";
+    String ptnTblWOStats = tblNamePrefix + "_ptn_repl_trgt_nostats";
+    String ptnTblWithStats = tblNamePrefix + "_ptn_repl_trgt_stats";
+
+    StatsUpdaterThread su = createUpdater();
+    su.startWorkers();
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+
+    executeQuery("create table " + tblWOStats + "(i int, s string) " + 
txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(tblWOStats);
+    executeQuery("insert into " + tblWOStats + "(i, s) values (1, 'test')");
+    verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+
+    executeQuery("create table " + ptnTblWOStats + "(s string) partitioned by 
(i int) " + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(ptnTblWOStats);
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (1, 'test')");
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (2, 
'test2')");
+    executeQuery("insert into " + ptnTblWOStats + "(i, s) values (3, 
'test3')");
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+
+    executeQuery("create table " + tblWithStats + "(i int, s string)" + 
txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(tblWithStats);
+    executeQuery("insert into " + tblWithStats + "(i, s) values (1, 'test')");
+    executeQuery("analyze table " + tblWithStats + " compute statistics for 
columns");
+    verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+
+    executeQuery("create table " + ptnTblWithStats + "(s string) partitioned 
by (i int) " + txnProperty);
+    // Mark this table as being replicated into
+    setTableReplTargetProperty(ptnTblWithStats);
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (1, 
'test')");
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (2, 
'test2')");
+    executeQuery("insert into " + ptnTblWithStats + "(i, s) values (3, 
'test3')");
+    executeQuery("analyze table " + ptnTblWithStats + " compute statistics for 
columns");
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+    assertFalse(su.runOneIteration());
+    Assert.assertEquals(0, su.getQueueLength());
+    verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+    verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+    verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+    msClient.close();
+  }
+
   private void verifyPartStatsUpToDate(int partCount, int skip,
      IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception {
     for (int i = skip; i < partCount; ++i) {
@@ -566,6 +634,11 @@ public class TestStatsUpdaterThread {
     msClient.alter_table(table.getDbName(), table.getTableName(), table);
   }
 
+  private void setTableReplTargetProperty(String tblName) throws Exception {
+    executeQuery("alter table " + tblName +
+            " set tblproperties ('" + ReplConst.REPL_TARGET_TABLE_PROPERTY + 
"' = '1')");
+  }
+
   private void setPartitionSkipProperty(
      IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception {
    Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 7c29969..f075e2a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -37,4 +37,11 @@ public class ReplConst {
   * database is created as part of repl load and survives the incremental cycles.
    */
   public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
+
+  /**
+   * A table which is a target of replication will have this property set. The property serves two
+   * purposes: 1. it identifies the tables being replicated into, and 2. it records the event id of
+   * the last event affecting this table.
+   */
+  public static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id";
 }
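
Because the same key doubles as ReplicationSpec.KEY.CURR_STATE_ID (see the ReplicationSpec.java change above), the value can also be read back as the replica's last replicated state; a minimal sketch:

    // Sketch: read the marker off a table. A non-empty value means the table is a
    // replication target, and the value itself is the last replicated event id.
    String lastReplId = table.getParameters().get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
    boolean isReplTarget = lastReplId != null && !lastReplId.trim().isEmpty();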
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index da0259c..e4488f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -30,6 +31,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -80,6 +82,16 @@ public class PartitionManagementTask implements MetastoreTaskThread {
     return conf;
   }
 
+  private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
+    return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
+            params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
+  }
+
+  private static boolean tblBeingReplicatedInto(Map<String, String> params) {
+    return params != null && params.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY) &&
+            !params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY).trim().isEmpty();
+  }
+
   @Override
   public void run() {
     if (lock.tryLock()) {
@@ -116,8 +128,7 @@ public class PartitionManagementTask implements MetastoreTaskThread {
 
         for (TableMeta tableMeta : foundTableMetas) {
           Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
-          if (table.getParameters() != null && table.getParameters().containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
-            table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true")) {
+          if (partitionDiscoveryEnabled(table.getParameters()) && !tblBeingReplicatedInto(table.getParameters())) {
             candidateTables.add(table);
           }
         }
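
The candidate filter in run() is now the conjunction of the two predicates above; restated standalone for clarity (the real methods are private to the task, and the literal property name is an assumption here):

    // Standalone restatement of the new candidate check, for illustration only.
    // DISCOVER_PARTITIONS_TBLPROPERTY is assumed to be "discover.partitions".
    static boolean isCandidate(java.util.Map<String, String> params) {
      boolean discoveryEnabled = params != null
          && "true".equalsIgnoreCase(params.get("discover.partitions"));
      boolean replTarget = params != null
          && params.get("repl.last.id") != null
          && !params.get("repl.last.id").trim().isEmpty();
      return discoveryEnabled && !replTarget;
    }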
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index 9562b4f..1961a70 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -565,6 +566,94 @@ public class TestPartitionManagement {
     assertEquals(4, partitions.size());
   }
 
+  @Test
+  public void testNoPartitionDiscoveryForReplTable() throws Exception {
+    String dbName = "db_repl1";
+    String tableName = "tbl_repl1";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+            Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+            Lists.newArrayList("CA", "1986-04-28"),
+            Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, 
(short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // The table property is set to true, but the table is marked as a replication target. The new
+    // partitions should not be created.
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // change table type to external, delete a partition directory and make sure partition discovery works
+    table.getParameters().put("EXTERNAL", "true");
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
+    client.alter_table(dbName, tableName, table);
+    // Delete location of one of the partitions. The partition discovery task should not drop
+    // that partition.
+    boolean deleted = fs.delete((new Path(URI.create(partitions.get(0).getSd().getLocation()))).getParent(),
+                    true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
+  @Test
+  public void testNoPartitionRetentionForReplTarget() throws TException, InterruptedException {
+    String dbName = "db_repl2";
+    String tableName = "tbl_repl2";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+            Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+            Lists.newArrayList("CA", "1986-04-28"),
+            Lists.newArrayList("MN", "2018-11-31"));
+    // Check for the existence of partitions 10 seconds after the partition retention period has
+    // elapsed. Gives enough time for the partition retention task to work.
+    long partitionRetentionPeriodMs = 20000;
+    long waitingPeriodForTest = partitionRetentionPeriodMs + 10 * 1000;
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, 
(short) -1);
+    assertEquals(3, partitions.size());
+
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY,
+            partitionRetentionPeriodMs + "ms");
+    table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+    client.alter_table(dbName, tableName, table);
+
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+
+    // After 30s, all partitions should remain intact for a table which is a target of replication.
+    Thread.sleep(waitingPeriodForTest);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+
   private void runPartitionManagementTask(Configuration conf) {
     PartitionManagementTask task = new PartitionManagementTask();
     task.setConf(conf);

Reply via email to