This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a712f4b HIVE-24502:Store table level regular expression used during dump for table level replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) a712f4b is described below commit a712f4b048cd8aa66f3692a3f08b62c14b278353 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Mon Jan 4 09:57:45 2021 +0530 HIVE-24502:Store table level regular expression used during dump for table level replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../hive/ql/parse/TestReplicationScenarios.java | 4 ++-- .../parse/TestTableLevelReplicationScenarios.java | 23 ++++++++++++++++++++++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 12 +++++------ .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 4 +++- .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 5 ++++- .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 2 +- .../hive/ql/parse/repl/load/DumpMetaData.java | 19 +++++++++++++----- 7 files changed, 53 insertions(+), 16 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index ab81834..4b54525 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -331,7 +331,7 @@ public class TestReplicationScenarios { * appropriately. This tests bootstrap behaviour primarily. */ @Test - public void testBasic() throws IOException { + public void testBasic() throws IOException, SemanticException { String name = testName.getMethodName(); String dbName = createDB(name, driver); run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); @@ -487,7 +487,7 @@ public class TestReplicationScenarios { } ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb, null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), - 0L, metricCollector); + 0L, metricCollector, false); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext()); replLoadTask.executeTask(null); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 9d5e8af..4d47254 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; @@ -46,6 +47,7 @@ import java.io.InputStreamReader; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.junit.Assert.assertEquals; /** * Tests Table level replication scenarios. @@ -156,6 +158,10 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, dumpWithClause); + DumpMetaData dumpMetaData = new DumpMetaData(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf); + Assert.assertEquals(oldReplPolicy != null && !replPolicy.equals(oldReplPolicy), + dumpMetaData.isReplScopeModified()); + if (bootstrappedTables != null) { verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); } @@ -163,6 +169,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios // If the policy contains '.'' means its table level replication. verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null); + verifyDumpMetadata(replPolicy, new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR)); + replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) .run("show tables") @@ -180,6 +188,21 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios return tuple.lastReplicationId; } + private void verifyDumpMetadata(String replPolicy, Path dumpPath) throws SemanticException { + String[] parseReplPolicy = replPolicy.split("\\.'"); + assertEquals(parseReplPolicy[0], new DumpMetaData(dumpPath, conf).getReplScope().getDbName()); + if (parseReplPolicy.length > 1) { + parseReplPolicy[1] = parseReplPolicy[1].replaceAll("'", ""); + assertEquals(parseReplPolicy[1], + new DumpMetaData(dumpPath, conf).getReplScope().getIncludedTableNames()); + } + if (parseReplPolicy.length > 2) { + parseReplPolicy[2] = parseReplPolicy[2].replaceAll("'", ""); + assertEquals(parseReplPolicy[2], + new DumpMetaData(dumpPath, conf).getReplScope().getExcludedTableNames()); + } + } + private String replicateAndVerifyClearDump(String replPolicy, String oldReplPolicy, String lastReplId, List<String> dumpWithClause, List<String> loadWithClause, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 1fce791..7e690fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.orc.ExternalCache; import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -604,12 +603,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { replLogger.endLog(lastReplId.toString()); LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); - dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId); + dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId, + work.oldReplScope != null); // If repl policy is changed (oldReplScope is set), then pass the current replication policy, // so that REPL LOAD would drop the tables which are not included in current policy. - if (work.oldReplScope != null) { - dmd.setReplScope(work.replScope); - } + dmd.setReplScope(work.replScope); dmd.write(true); int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE); try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize); @@ -940,7 +938,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); - dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); + dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId, + work.oldReplScope != null); + dmd.setReplScope(work.replScope); dmd.write(true); work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator()); setDataCopyIterators(extTableFileList, managedTblList); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index e7245bd..2f41673 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -592,7 +592,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { private int executeIncrementalLoad() throws Exception { // If replication policy is changed between previous and current repl load, then drop the tables // that are excluded in the new replication policy. - dropTablesExcludedInReplScope(work.currentReplScope); + if (work.replScopeModified) { + dropTablesExcludedInReplScope(work.currentReplScope); + } IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder(); // If incremental events are already applied, then check and perform if need to bootstrap any tables. if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 376fd7c..a52dac2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -58,6 +58,7 @@ public class ReplLoadWork implements Serializable { private String sourceDbName; private Long dumpExecutionId; private final transient ReplicationMetricCollector metricCollector; + final boolean replScopeModified; private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; @@ -78,7 +79,8 @@ public class ReplLoadWork implements Serializable { String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope, LineageState lineageState, boolean isIncrementalDump, Long eventTo, Long dumpExecutionId, - ReplicationMetricCollector metricCollector) throws IOException, SemanticException { + ReplicationMetricCollector metricCollector, + boolean replScopeModified) throws IOException, SemanticException { sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; @@ -86,6 +88,7 @@ public class ReplLoadWork implements Serializable { this.sourceDbName = sourceDbName; this.dumpExecutionId = dumpExecutionId; this.metricCollector = metricCollector; + this.replScopeModified = replScopeModified; // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 4c10499..ed408b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -372,7 +372,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { dmd.getReplScope(), queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId(), initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(), - dmd.getDumpExecutionId())); + dmd.getDumpExecutionId()), dmd.isReplScopeModified()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } else { LOG.warn("Previous Dump Already Loaded"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index b6d43f7..c428ea2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -52,6 +52,7 @@ public class DumpMetaData { private final Path dumpFile; private final HiveConf hiveConf; private Long dumpExecutionId; + private boolean replScopeModified = false; public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -61,16 +62,18 @@ public class DumpMetaData { public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, HiveConf hiveConf) { this(dumpRoot, hiveConf); - setDump(lvl, eventFrom, eventTo, cmRoot, 0L); + setDump(lvl, eventFrom, eventTo, cmRoot, 0L, false); } - public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId) { + public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId, + boolean replScopeModified) { this.dumpType = lvl; this.eventFrom = eventFrom; this.eventTo = eventTo; this.cmRoot = cmRoot; this.initialized = true; this.dumpExecutionId = dumpExecutionId; + this.replScopeModified = replScopeModified; } public void setPayload(String payload) { @@ -117,10 +120,10 @@ public class DumpMetaData { br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line; if ((line = br.readLine()) != null) { - String[] lineContents = line.split("\t", 6); + String[] lineContents = line.split("\t", 7); setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]), - new Path(lineContents[3]), Long.valueOf(lineContents[4])); + new Path(lineContents[3]), Long.valueOf(lineContents[4]), Boolean.valueOf(lineContents[6])); setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]); } else { throw new IOException( @@ -165,6 +168,11 @@ public class DumpMetaData { return dumpExecutionId; } + public boolean isReplScopeModified() throws SemanticException { + initializeIfNot(); + return replScopeModified; + } + public ReplScope getReplScope() throws SemanticException { initializeIfNot(); return replScope; @@ -219,7 +227,8 @@ public class DumpMetaData { eventTo.toString(), cmRoot.toString(), dumpExecutionId.toString(), - payload) + payload, + String.valueOf(replScopeModified)) ); if (replScope != null) { listValues.add(prepareReplScopeValues());