This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 6457170 HIVE-23611: Mandate fully qualified absolute path for external table base dir during REPL operation (Pravin Kumar Sinha, reviewed by Aasha Medhi) 6457170 is described below commit 64571702771b1b48d4366080fc49bf2f6978df5b Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Wed Jul 1 09:07:40 2020 +0530 HIVE-23611: Mandate fully qualified absolute path for external table base dir during REPL operation (Pravin Kumar Sinha, reviewed by Aasha Medhi) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../ql/parse/BaseReplicationAcrossInstances.java | 51 +++++--- .../parse/BaseReplicationScenariosAcidTables.java | 11 ++ .../hadoop/hive/ql/parse/ReplicationTestUtils.java | 18 +-- .../TestReplicationScenariosExclusiveReplica.java | 8 -- .../TestReplicationScenariosExternalTables.java | 138 ++++++++++++--------- ...icationScenariosExternalTablesMetaDataOnly.java | 47 +++---- .../parse/TestReplicationWithTableMigration.java | 4 + .../parse/TestScheduledReplicationScenarios.java | 3 +- .../parse/TestTableLevelReplicationScenarios.java | 38 +++--- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 7 +- .../hive/ql/exec/repl/ReplExternalTables.java | 16 ++- .../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 2 + 13 files changed, 188 insertions(+), 159 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f3878ef..6d42ddc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -526,8 +526,8 @@ public class HiveConf extends Configuration { + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n " + " speed up the repl load operation. 
In file system like HDFS where move operation is atomic, this \n" + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."), - REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", "/", - "This is the base directory on the target/replica warehouse under which data for " + REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", null, + "This is the fully qualified base directory on the target/replica warehouse under which data for " + "external tables is stored. This base path is prefixed to the source " + "external table path on target cluster."), REPL_INCLUDE_AUTHORIZATION_METADATA("hive.repl.include.authorization.metadata", false, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java index 6b96d2b..5fb44dd 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; @@ -46,6 +48,8 @@ public class BaseReplicationAcrossInstances { String primaryDbName, replicatedDbName; static HiveConf conf; // for primary static HiveConf replicaConf; + protected static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base"); + protected static String fullyQualifiedReplicaExternalBase; static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) throws Exception { @@ -59,6 +63,7 @@ public class BaseReplicationAcrossInstances { 
put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); }}; localOverrides.putAll(overrides); + setReplicaExternalBase(miniDFSCluster.getFileSystem(), localOverrides); primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); @@ -68,33 +73,39 @@ public class BaseReplicationAcrossInstances { static void internalBeforeClassSetupExclusiveReplica(Map<String, String> primaryOverrides, Map<String, String> replicaOverrides, Class clazz) throws Exception { - conf = new HiveConf(clazz); - conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); - String primaryBaseDir = Files.createTempDirectory("base").toFile().getAbsolutePath(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir); - MiniDFSCluster miniPrimaryDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - Map<String, String> localOverrides = new HashMap<String, String>() { - { - put("fs.defaultFS", miniPrimaryDFSCluster.getFileSystem().getUri().toString()); - put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); - } - }; - localOverrides.putAll(primaryOverrides); - primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster, localOverrides); + /** + * Setup replica cluster. + */ String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath(); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); replicaConf = new HiveConf(clazz); replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); replicaConf.set("dfs.client.use.datanode.hostname", "true"); replicaConf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniReplicaDFSCluster = new MiniDFSCluster.Builder(replicaConf).numDataNodes(1).format(true).build(); - localOverrides.clear(); - localOverrides.putAll(replicaOverrides); + + Map<String, String> localOverrides = new HashMap<>(); localOverrides.put("fs.defaultFS", miniReplicaDFSCluster.getFileSystem().getUri().toString()); localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + localOverrides.putAll(replicaOverrides); + setReplicaExternalBase(miniReplicaDFSCluster.getFileSystem(), localOverrides); replica = new WarehouseInstance(LOG, miniReplicaDFSCluster, localOverrides); + + /** + * Setup primary cluster. + */ + String primaryBaseDir = Files.createTempDirectory("base").toFile().getAbsolutePath(); + conf = new HiveConf(clazz); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniPrimaryDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + localOverrides.clear(); + localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + localOverrides.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, fullyQualifiedReplicaExternalBase); + localOverrides.put("fs.defaultFS", miniPrimaryDFSCluster.getFileSystem().getUri().toString()); + localOverrides.putAll(primaryOverrides); + primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster, localOverrides); } @AfterClass @@ -103,6 +114,12 @@ public class BaseReplicationAcrossInstances { replica.close(); } + private static void setReplicaExternalBase(FileSystem fs, Map<String, String> confMap) throws IOException { + fs.mkdirs(REPLICA_EXTERNAL_BASE); + fullyQualifiedReplicaExternalBase = fs.getFileStatus(REPLICA_EXTERNAL_BASE).getPath().toString(); + confMap.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, 
fullyQualifiedReplicaExternalBase); + } + @Before public void setup() throws Throwable { primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java index 2284fca..521a2ef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; @@ -58,6 +60,8 @@ public class BaseReplicationScenariosAcidTables { public final TestName testName = new TestName(); protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); + private static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base"); + protected static String fullyQualifiedReplicaExternalBase; static WarehouseInstance primary; static WarehouseInstance replica, replicaNonAcid; static HiveConf conf; @@ -88,6 +92,7 @@ public class BaseReplicationScenariosAcidTables { acidEnableConf.putAll(overrides); + setReplicaExternalBase(miniDFSCluster.getFileSystem(), acidEnableConf); primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); @@ -101,6 +106,12 @@ public class BaseReplicationScenariosAcidTables { replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); } + private static void setReplicaExternalBase(FileSystem fs, 
Map<String, String> confMap) throws IOException { + fs.mkdirs(REPLICA_EXTERNAL_BASE); + fullyQualifiedReplicaExternalBase = fs.getFileStatus(REPLICA_EXTERNAL_BASE).getPath().toString(); + confMap.put(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname, fullyQualifiedReplicaExternalBase); + } + @AfterClass public static void classLevelTearDown() throws IOException { primary.close(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index 902c731..efae01a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -502,19 +502,11 @@ public class ReplicationTestUtils { "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); } - public static List<String> externalTableBasePathWithClause(String replExternalBase, WarehouseInstance replica) - throws IOException, SemanticException { - Path externalTableLocation = new Path(replExternalBase); - DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); - externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem); - fileSystem.mkdirs(externalTableLocation); - - // this is required since the same filesystem is used in both source and target - return Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + externalTableLocation.toString() + "'", - "'distcp.options.pugpb'=''" - ); + public static List<String> includeExternalTableClause(boolean enable) { + List<String> withClause = new ArrayList<>(); + withClause.add("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='" + enable + "'"); + withClause.add("'distcp.options.pugpb'=''"); + return withClause; } public static List<String> externalTableWithClause(List<String> 
externalTableBasePathWithClause, Boolean bootstrap, diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index 371f90b..7e6b5e2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -43,8 +43,6 @@ import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; */ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; - @BeforeClass public static void classLevelSetup() throws Exception { Map<String, String> overrides = new HashMap<>(); @@ -70,7 +68,6 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr @Test public void externalTableReplicationWithRemoteStaging() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); - withClauseOptions.addAll(externalTableBasePathWithClause()); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -126,7 +123,6 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr @Test public void externalTableReplicationWithLocalStaging() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); - withClauseOptions.addAll(externalTableBasePathWithClause()); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -185,10 +181,6 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr return confList; } - private List<String> externalTableBasePathWithClause() throws 
IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } - private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental, WarehouseInstance warehouseInstance) throws IOException { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index b078ea1..493a467 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -66,7 +66,6 @@ import static org.junit.Assert.assertTrue; public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; String extraPrimaryDb; @BeforeClass @@ -96,10 +95,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void replicationWithoutExternalTables() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList - ("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); - + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -109,7 +105,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values ('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dump(primaryDbName, dumpWithClause); + 
.dump(primaryDbName, withClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), @@ -117,7 +113,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -131,7 +127,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("insert into table t3 values (20)") - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), @@ -139,7 +135,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(null)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't3'") .verifyFailure(new String[] { "t3" }) @@ -148,7 +144,6 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void externalTableReplicationWithDefaultPaths() throws Throwable { - List<String> withClauseOptions = externalTableBasePathWithClause(); //creates external tables with partitions WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -159,14 
+154,14 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values ('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dump(primaryDbName, withClauseOptions); + .dump(primaryDbName); // verify that the external table info is written correctly for bootstrap assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); - replica.load(replicatedDbName, primaryDbName, withClauseOptions) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -191,12 +186,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("create external table t4 as select id from t3") - .dump(primaryDbName, withClauseOptions); + .dump(primaryDbName); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); - replica.load(replicatedDbName, primaryDbName, withClauseOptions) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't3'") .verifyResult("t3") @@ -209,7 +204,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros tuple = primary.run("use " + primaryDbName) .run("drop table t1") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); @@ -254,10 +249,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // Create base directory but use HDFS path without schema or authority details. 
// Hive should pick up the local cluster's HDFS schema/authority. - externalTableBasePathWithClause(); List<String> withClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" ); @@ -311,7 +303,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " @@ -414,7 +406,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void externalTableIncrementalCheckpointing() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -439,7 +431,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros ReplDumpWork.testDeletePreviousDumpMetaPath(true); - withClause = externalTableWithClause(true, true); + withClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + primaryDbName) .run("drop table t1") .run("insert into table t2 values (5)") @@ -503,8 +495,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void externalTableIncrementalReplication() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); - WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, withClause); + 
WarehouseInstance.Tuple tuple = primary.dump(primaryDbName); replica.load(replicatedDbName, primaryDbName); Path externalTableLocation = new Path("/" + testName.getMethodName() + "/t1/"); @@ -517,7 +508,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros + "'") .run("alter table t1 add partition(country='india')") .run("alter table t1 add partition(country='us')") - .dump(primaryDbName, withClause); + .dump(primaryDbName); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); @@ -533,8 +524,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros outputStream.write("bangalore\n".getBytes()); } - List<String> loadWithClause = externalTableBasePathWithClause(); - replica.load(replicatedDbName, primaryDbName, withClause) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -546,8 +536,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // The Data should be seen after next dump-and-load cycle. tuple = primary.run("use " + primaryDbName) - .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, withClause) + .dump(primaryDbName); + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -565,10 +555,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros } // Repl load with zero events but external tables location info should present. 
- tuple = primary.dump(primaryDbName, withClause); + tuple = primary.dump(primaryDbName); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); - replica.load(replicatedDbName, primaryDbName, withClause) + replica.load(replicatedDbName, primaryDbName) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -587,7 +577,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros tuple = primary .run("alter table t1 drop partition (country='india')") .run("alter table t1 drop partition (country='us')") - .dump(primaryDbName, withClause); + .dump(primaryDbName); replica.load(replicatedDbName, primaryDbName) .run("select * From t1") @@ -601,7 +591,6 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); List<String> dumpWithClause = Collections.singletonList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" ); @@ -623,7 +612,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros assertFalse(primary.miniDFSCluster.getFileSystem() .exists(new Path(metadataPath + relativeExtInfoPath(null)))); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -633,7 +622,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyFailure(new String[] {"t2" }) .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); tuple = primary.run("use " + primaryDbName) .run("drop table t1") @@ -664,7 +653,7 @@ public class 
TestReplicationScenariosExternalTables extends BaseReplicationAcros tblPath = new Path(dbPath, "t3"); assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath)); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -701,12 +690,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void retryBootstrapExternalTablesFromDifferentDump() throws Throwable { - List<String> loadWithClause = new ArrayList<>(); - loadWithClause.addAll(externalTableBasePathWithClause()); - - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false); WarehouseInstance.Tuple tupleBootstrapWithoutExternal = primary .run("use " + primaryDbName) @@ -728,7 +713,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyResult("1") .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t4 (id int)") @@ -777,7 +762,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // Insert into existing external table and then Drop it, add another managed table with same name // and dump another bootstrap dump for external tables. 
- dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true); primary.run("use " + primaryDbName) .run("insert into table t2 partition(country='india') values ('chennai')") .run("drop table t2") @@ -823,8 +808,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List<String> dumpWithClause = this.externalTableWithClause(null, true); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), null, true); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -878,10 +863,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void testIncrementalDumpEmptyDumpDirectory() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -917,7 +900,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void testExtTableBootstrapDuringIncrementalWithoutAnyEvents() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> 
loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); List<String> dumpWithClause = Collections.singletonList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" ); @@ -942,13 +925,13 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // This looks like an empty dump but it has the ALTER TABLE event created by the previous // dump. We need it here so that the next dump won't have any events. - WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName); + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, ReplicationTestUtils.includeExternalTableClause(true)); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) .verifyResult(incTuple.lastReplicationId); // Take a dump with external tables bootstrapped and load it - dumpWithClause = externalTableWithClause(true, true); + dumpWithClause = ReplicationTestUtils.externalTableWithClause(new ArrayList<>(), true, true); WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName) .dump(primaryDbName, dumpWithClause); @@ -965,7 +948,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void replicationWithTableNameContainsKeywords() throws Throwable { - List<String> withClause = externalTableBasePathWithClause(); + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -1008,15 +991,49 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyReplTargetProperty(replicatedDbName); } + @Test + public void testExternalTableBaseDirMandatory() throws Throwable { + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='/extTablebase'"); + WarehouseInstance.Tuple tuple = null; + try { + primary.run("use " + primaryDbName) + 
.run("create external table t1 (id int)") + .run("insert into table t1 values(1)") + .dump(primaryDbName, withClause); + } catch (SemanticException ex) { + assertTrue(ex.getMessage().contains( + "Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required")); + } + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + + "'='"+ fullyQualifiedReplicaExternalBase +"'"); + tuple = primary.run("use " + primaryDbName) + .dump(primaryDbName, withClause); + + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'=''"); + try { + replica.load(replicatedDbName, primaryDbName, withClause); + } catch (SemanticException ex) { + assertTrue(ex.getMessage().contains( + "Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required")); + } + + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + + "'='"+ fullyQualifiedReplicaExternalBase +"'"); - private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } + replica.load(replicatedDbName, primaryDbName, withClause); - private List<String> externalTableWithClause(Boolean bootstrapExtTbl, Boolean includeExtTbl) - throws IOException, SemanticException { - List<String> extTblBaseDir = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl); + replica.run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResults(new String[] {"t1"}) + .run("select id from t1") 
+ .verifyResults(new String[] {"1"}) + .verifyReplTargetProperty(replicatedDbName); } private void assertExternalFileInfo(List<String> expected, String dumplocation, @@ -1044,4 +1061,5 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; } } + } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index e3e6661..357302c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -60,7 +60,6 @@ import static org.junit.Assert.assertTrue; */ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseReplicationAcrossInstances { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; String extraPrimaryDb; @BeforeClass @@ -91,9 +90,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl @Test public void replicationWithoutExternalTables() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Arrays.asList("'" - + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -153,7 +151,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl // verify that the external table info is not written as metadata only 
replication assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); - List<String> withClauseOptions = externalTableBasePathWithClause(); + List<String> withClauseOptions = ReplicationTestUtils.includeExternalTableClause(true); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -208,10 +206,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. - externalTableBasePathWithClause(); List<String> loadWithClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" ); @@ -263,7 +258,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " @@ -391,7 +386,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl outputStream.write("bangalore\n".getBytes()); } - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) .run("show tables like 't1'") @@ -446,9 +441,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl @Test public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable { - 
List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause - = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -478,8 +472,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''"); + loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); tuple = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t3 (id int)") @@ -548,10 +542,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") @@ -606,15 +598,12 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl @Test public void 
testIncrementalDumpEmptyDumpDirectory() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") .run("insert into table t1 values (1)") .run("insert into table t1 values (2)") - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName) .status(replicatedDbName) @@ -622,8 +611,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl // This looks like an empty dump but it has the ALTER TABLE event created by the previous // dump. We need it here so that the next dump won't have any events. - WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause) .status(replicatedDbName) .verifyResult(incTuple.lastReplicationId); @@ -633,20 +622,16 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl WarehouseInstance.Tuple inc2Tuple = primary.run("use " + extraPrimaryDb) .run("create table tbl (fld int)") .run("use " + primaryDbName) - .dump(primaryDbName, dumpWithClause); + .dump(primaryDbName, withClause); Assert.assertEquals(primary.getCurrentNotificationEventId().getEventId(), Long.valueOf(inc2Tuple.lastReplicationId).longValue()); // Incremental load to existing database with empty dump directory should set the repl id to the last event at src. 
- replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .status(replicatedDbName) .verifyResult(inc2Tuple.lastReplicationId); } - private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { - return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - } - private void assertFalseExternalFileInfo(Path externalTableInfoFile) throws IOException { DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index 642a8dc..0645cef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -64,6 +64,7 @@ import static org.junit.Assert.assertTrue; */ public class TestReplicationWithTableMigration { private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc"; + private static String fullyQualifiedReplicaExternalBase; @Rule public final TestName testName = new TestName(); @@ -119,6 +120,8 @@ public class TestReplicationWithTableMigration { primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); + fullyQualifiedReplicaExternalBase = miniDFSCluster.getFileSystem().getFileStatus( + new Path("/")).getPath().toString(); } private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException { @@ -557,6 +560,7 @@ public class TestReplicationWithTableMigration { withConfigs.add("'hive.repl.bootstrap.acid.tables'='true'"); 
withConfigs.add("'hive.repl.dump.include.acid.tables'='true'"); withConfigs.add("'hive.repl.include.external.tables'='true'"); + withConfigs.add("'hive.repl.replica.external.table.base.dir' = '" + fullyQualifiedReplicaExternalBase + "'"); withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'"); tuple = primary.dump(primaryDbName, withConfigs); replica.load(replicatedDbName, primaryDbName, withConfigs); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java index fd0a214..51e9ee9 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -183,8 +183,7 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA @Ignore("HIVE-23395") public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { // Bootstrap - String withClause = " WITH('" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname - + "'='/replica_external_base', '" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + String withClause = " WITH('" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA + "' = 'true' ,'" + HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA + "' = 'true' , '" + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_ENDPOINT + "' = 'http://localhost:21000/atlas'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB + "' = 'tgt'" diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 5bdf551..ffa7d95 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -52,8 +52,6 @@ import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_R */ public class TestTableLevelReplicationScenarios extends BaseReplicationScenariosAcidTables { - private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; - @BeforeClass public static void classLevelSetup() throws Exception { Map<String, String> overrides = new HashMap<>(); @@ -490,10 +488,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios createTables(originalExternalTables, CreateTableType.EXTERNAL); // Replicate and verify if only 2 tables are replicated to target. - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true); String replPolicy = primaryDbName + ".'(a[0-9]+)|(b2)'.'a1'"; String[] replicatedTables = new String[] {"a2", "b2"}; WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) @@ -524,10 +520,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios createTables(originalExternalTables, CreateTableType.EXTERNAL); // Bootstrap should exclude external tables. 
- List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); + String replPolicy = primaryDbName + ".'(a[0-9]+)|(b2)'.'a1'"; String[] bootstrapReplicatedTables = new String[] {"b2"}; String lastReplId = replicateAndVerify(replPolicy, null, @@ -535,10 +530,11 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios // Enable external tables replication and bootstrap in incremental phase. String[] incrementalReplicatedTables = new String[] {"a2", "b2"}; - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = ReplicationTestUtils.includeExternalTableClause(true); + dumpWithClause.add("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, dumpWithClause); + loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. @@ -661,10 +657,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios createTables(originalExternalTables, CreateTableType.EXTERNAL); // Bootstrap should exclude external tables. 
- List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" - ); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); + List<String> dumpWithClause = ReplicationTestUtils.includeExternalTableClause(false); String replPolicy = primaryDbName + ".'(a[0-9]+)|(b1)'.'a1'"; String[] bootstrapReplicatedTables = new String[] {"b1"}; String lastReplId = replicateAndVerify(replPolicy, null, @@ -689,6 +683,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, oldReplPolicy, dumpWithClause); + loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. 
@@ -835,7 +830,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios @Test public void testRenameTableScenariosExternalTable() throws Throwable { String replPolicy = primaryDbName + ".'in[0-9]+'.'out[0-9]+'"; - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); List<String> dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", @@ -868,9 +863,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); + loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"}; bootstrapTables = new String[] {"in2", "in3", "in4", "in5"}; lastReplId = replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, @@ -894,7 +889,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios @Test public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable { - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(true); List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(loadWithClause, true, true); String replPolicy = primaryDbName + ".'(in[0-9]+)|(out4)|(out5)|(out1500)'"; String lastReplId = replicateAndVerify(replPolicy, 
null, null, dumpWithClause, @@ -919,7 +914,6 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); @@ -1010,7 +1004,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios public void testRenameTableScenariosUpgrade() throws Throwable { // Policy with no table level filter, no ACID and external table. String replPolicy = primaryDbName; - List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> loadWithClause = ReplicationTestUtils.includeExternalTableClause(false); List<String> dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'" @@ -1047,7 +1041,6 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); @@ -1064,7 +1057,6 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.pugpb'=''" ); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index f21fb7d..3669f3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -91,6 +92,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -750,11 +752,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private List<DirCopyWork> dirLocationsToCopy(List<Path> sourceLocations) throws HiveException { + if (sourceLocations.isEmpty()) { + return Collections.emptyList(); + } List<DirCopyWork> list = new ArrayList<>(sourceLocations.size()); String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); // this is done to remove any scheme related information that will be present in the base path // specifically when we are replicating to cloud storage - Path basePath = new Path(baseDir); + Path basePath = ReplExternalTables.getExternalTableBaseDir(conf); for (Path sourcePath : sourceLocations) { Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); list.add(new DirCopyWork(sourcePath, targetPath)); diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index 0fdd1bf..a47a728 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -40,6 +41,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.StringWriter; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashSet; @@ -65,8 +67,7 @@ public final class ReplExternalTables { private ReplExternalTables(){} public static String externalTableLocation(HiveConf hiveConf, String location) throws SemanticException { - String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); - Path basePath = new Path(baseDir); + Path basePath = getExternalTableBaseDir(hiveConf); Path currentPath = new Path(location); Path dataLocation = externalTableDataPath(hiveConf, basePath, currentPath); @@ -74,6 +75,17 @@ public final class ReplExternalTables { return dataLocation.toString(); } + public static Path getExternalTableBaseDir(HiveConf hiveConf) throws SemanticException { + String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); + URI baseDirUri = StringUtils.isEmpty(baseDir) ? 
null : new Path(baseDir).toUri(); + if (baseDirUri == null || baseDirUri.getScheme() == null || baseDirUri.getAuthority() == null) { + throw new SemanticException( + String.format("Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required %s", + baseDir == null ? "" : "- ('" + baseDir + "')")); + } + return new Path(baseDirUri); + } + public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path sourcePath) throws SemanticException { String baseUriPath = basePath.toUri().getPath(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index ecf51a9..bccf56a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.thrift.TException; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet;