This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 4a591b0 HIVE-22997: Copy external table to target during Repl Dump operation ( Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal) 4a591b0 is described below commit 4a591b0bf79a3e8c8592f2383f209788fd4f86d3 Author: Pravin Kumar Sinha <mailpravi...@gmail.com> AuthorDate: Thu Mar 26 11:25:27 2020 +0530 HIVE-22997: Copy external table to target during Repl Dump operation ( Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal) --- .../hadoop/hive/ql/parse/ReplicationTestUtils.java | 15 ++ .../parse/TestReplicationOnHDFSEncryptedZones.java | 12 +- .../hive/ql/parse/TestReplicationScenarios.java | 229 +++++++++++++++++- .../TestReplicationScenariosExternalTables.java | 107 +++++---- ...icationScenariosExternalTablesMetaDataOnly.java | 4 +- .../parse/TestScheduledReplicationScenarios.java | 49 ++++ .../parse/TestTableLevelReplicationScenarios.java | 21 +- .../apache/hadoop/hive/ql/plan/api/StageType.java | 8 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 13 +- .../{ReplLoadCompleteAckTask.java => AckTask.java} | 17 +- .../{ReplLoadCompleteAckWork.java => AckWork.java} | 19 +- .../hadoop/hive/ql/exec/repl/DirCopyTask.java | 210 ++++++++++++++++ .../hadoop/hive/ql/exec/repl/DirCopyWork.java | 53 +++++ .../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 264 --------------------- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 219 +++++++++++------ .../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 82 +++++++ .../hive/ql/exec/repl/ReplExternalTables.java | 13 +- .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 55 ++--- .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 17 +- .../org/apache/hadoop/hive/ql/parse/EximUtil.java | 47 +++- .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 59 ++--- .../hive/ql/parse/repl/dump/PartitionExport.java | 15 +- .../hive/ql/parse/repl/dump/TableExport.java | 23 +- .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 7 +- 24 files changed, 999 
insertions(+), 559 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index a82bbad..e0c3ed2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -27,6 +27,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.List; import java.util.Arrays; import java.util.Set; @@ -516,6 +517,20 @@ public class ReplicationTestUtils { ); } + public static List<String> externalTableWithClause(List<String> externalTableBasePathWithClause, Boolean bootstrap, + Boolean includeExtTbl) { + List<String> withClause = new ArrayList<>(externalTableBasePathWithClause); + if (bootstrap != null) { + withClause.add("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES + "'='" + Boolean.toString(bootstrap) + + "'"); + } + if (includeExtTbl != null) { + withClause.add("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES + "'='" + Boolean.toString(includeExtTbl) + + "'"); + } + return withClause; + } + public static void assertExternalFileInfo(WarehouseInstance primary, List<String> expected, Path externalTableInfoFile) throws IOException { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java index f6a33bc..bed0235 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java @@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import 
java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; @@ -102,12 +104,20 @@ public class TestReplicationOnHDFSEncryptedZones { put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir); }}, "test_key123"); + List<String> dumpWithClause = Arrays.asList( + "'hive.repl.add.raw.reserved.namespace'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + replica.externalTableWarehouseRoot + "'", + "'distcp.options.skipcrccheck'=''", + "'" + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() +"'"); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create table encrypted_table (id int, value string)") .run("insert into table encrypted_table values (1,'value1')") .run("insert into table encrypted_table values (2,'value2')") - .dump(primaryDbName); + .dump(primaryDbName, dumpWithClause); replica .run("repl load " + primaryDbName + " into " + replicatedDbName diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 49027a3..efe9fff 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -92,7 +92,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -326,6 +325,58 @@ public class TestReplicationScenarios { 
verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror); } + @Test + public void testBootstrapFailedDump() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + String[] unptnData = new String[]{"eleven", "twelve"}; + String[] ptnData1 = new String[]{"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[]{"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptnLocn = new Path(TEST_PATH, name + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver); + + String replicatedDbName = dbName + "_dupe"; 
+ + + EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true); + verifyFail("REPL DUMP " + dbName, driver); + advanceDumpDir(); + EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false); + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName); + advanceDumpDir(); + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + + verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror); + verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replicatedDbName + ".ptned_empty", empty, driverMirror); + verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror); + } + private abstract class checkTaskPresent { public boolean hasTask(Task rootTask) { if (rootTask == null) { @@ -376,8 +427,7 @@ public class TestReplicationScenarios { confTemp.set("hive.repl.enable.move.optimization", "true"); Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb, - null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), - Collections.emptyList()); + null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId)); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext()); replLoadTask.executeTask(null); @@ -1457,20 +1507,20 @@ public class TestReplicationScenarios { run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO 
TABLE " + dbName - + ".ptned PARTITION(b=1)", driver); + + ".ptned PARTITION(b=1)", driver); verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName - + ".ptned PARTITION(b=2)", driver); + + ".ptned PARTITION(b=2)", driver); verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); run("CREATE TABLE " + dbName - + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName - + ".ptned WHERE b=1", driver); + + ".ptned WHERE b=1", driver); verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName - + ".ptned WHERE b=2", driver); + + ".ptned WHERE b=2", driver); verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); incrementalLoadAndVerify(dbName, replDbName); @@ -1635,6 +1685,169 @@ public class TestReplicationScenarios { } @Test + public void testIncrementalLoadWithOneFailedDump() throws IOException { + String nameOfTest = "testIncrementalLoadWithOneFailedDump"; + String dbName = createDB(nameOfTest, driver); + String replDbName = dbName + "_dupe"; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", 
"fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[] {}; + + String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + Tuple incrementalDump = replDumpDb(dbName); + + //Remove the dump ack file, so that dump is treated as an invalid dump. 
+ String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT; + Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath); + Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(), + "old_" + dumpFinishedAckFilePath.getName()); + FileSystem fs = FileSystem.get(new Path(incrementalDump.dumpLocation).toUri(), hconf); + fs.rename(dumpFinishedAckFilePath, tmpDumpFinishedAckFilePath); + loadAndVerify(replDbName, dbName, bootstrapDump.lastReplId); + + fs.rename(tmpDumpFinishedAckFilePath, dumpFinishedAckFilePath); + //Repl Load should recover when it finds valid load + loadAndVerify(replDbName, dbName, incrementalDump.lastReplId); + + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); + + incrementalLoadAndVerify(dbName, replDbName); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT 
a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + } + + @Test + public void testIncrementalLoadWithPreviousDumpDeleteFailed() throws IOException { + String nameOfTest = "testIncrementalLoadWithPreviousDumpDeleteFailed"; + String dbName = createDB(nameOfTest, driver); + String replDbName = dbName + "_dupe"; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; + String[] empty = new String[] {}; + + String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath(); + + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + + verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); + + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + 
".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); + + ReplDumpWork.testDeletePreviousDumpMetaPath(true); + + Tuple incrDump = replDumpDb(dbName); + + // Delete some file except ack. + Path bootstrapDumpDir = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + String tablePath = dbName + File.separator + "unptned"; + Path fileToDelete = new Path(bootstrapDumpDir, tablePath); + FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf); + fs.delete(fileToDelete, true); + assertTrue(fs.exists(bootstrapDumpDir)); + assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT))); + + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); + + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2", driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); + + ReplDumpWork.testDeletePreviousDumpMetaPath(false); + + Path 
incrHiveDumpDir = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + incrDump = replDumpDb(dbName); + //This time delete previous dump dir should work fine. + assertFalse(FileSystem.get(fileToDelete.toUri(), hconf).exists(incrHiveDumpDir)); + assertFalse(fs.exists(bootstrapDumpDir)); + loadAndVerify(replDbName, dbName, incrDump.lastReplId); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + } + + @Test public void testIncrementalInsertToPartition() throws IOException { String testName = "incrementalInsertToPartition"; String dbName = createDB(testName, driver); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 6372d26..1325789 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -138,6 +138,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void externalTableReplicationWithDefaultPaths() throws Throwable { + List<String> withClauseOptions = externalTableBasePathWithClause(); //creates external tables with partitions WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -148,12 +149,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into table t2 partition(country='india') values ('bangalore')") .run("insert into table t2 partition(country='us') values 
('austin')") .run("insert into table t2 partition(country='france') values ('paris')") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for bootstrap assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName); - List<String> withClauseOptions = externalTableBasePathWithClause(); + replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -180,7 +181,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("create external table t3 (id int)") .run("insert into table t3 values (10)") .run("create external table t4 as select id from t3") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); @@ -244,7 +245,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // Create base directory but use HDFS path without schema or authority details. // Hive should pick up the local cluster's HDFS schema/authority. 
externalTableBasePathWithClause(); - List<String> loadWithClause = Arrays.asList( + List<String> withClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", "'distcp.options.update'=''" @@ -254,9 +255,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("create external table a (i int, j int) " + "row format delimited fields terminated by ',' " + "location '" + externalTableLocation.toUri() + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 'a'") .verifyResults(Collections.singletonList("a")) @@ -270,11 +271,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros outputStream.write("1,2\n".getBytes()); outputStream.write("13,21\n".getBytes()); } + WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)") + .dump(primaryDbName, withClause); - primary.run("create table b (i int)") - .dump(primaryDbName); - - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("select i From a") .verifyResults(new String[] { "1", "13" }) .run("select j from a") @@ -285,9 +285,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); primary.run("use " + primaryDbName) .run("alter table a set location '" + externalTableLocation + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select i From a") .verifyResults(Collections.emptyList()); @@ -301,18 +301,18 @@ public class 
TestReplicationScenariosExternalTables extends BaseReplicationAcros DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); fs.mkdirs(externalTableLocation, new FsPermission("777")); - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> withClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create external table t2 (place string) partitioned by (country string) row format " + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + "'") .run("insert into t2 partition(country='india') values ('bangalore')") - .dumpWithCommand("repl dump " + primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't2'") .verifyResults(new String[] { "t2" }) @@ -331,11 +331,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros tuple = primary.run("use " + primaryDbName) .run("insert into t2 partition(country='australia') values ('sydney')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select distinct(country) from t2") .verifyResults(new String[] { "india", "australia" }) @@ -360,9 +360,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros primary.run("use " + primaryDbName) .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation .toString() + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, 
primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select place from t2 where country='france'") .verifyResults(new String[] { "paris" }) @@ -376,9 +376,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros primary.run("use " + primaryDbName) .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("select place from t2 where country='france'") .verifyResults(new String[] {}) @@ -396,17 +396,17 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros primary.run("use " + primaryDbName) .run("insert into table t2 partition(country='france') values ('lyon')") .run("alter table t2 set location '" + tmpLocation2 + "'") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause); + replica.load(replicatedDbName, primaryDbName, withClause); assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); } @Test public void externalTableIncrementalReplication() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName); + List<String> withClause = externalTableBasePathWithClause(); + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName); - Path externalTableLocation = new Path("/" + testName.getMethodName() + "/t1/"); DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); @@ -418,12 +418,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros + "'") .run("alter table t1 add partition(country='india')") .run("alter table t1 add partition(country='us')") - 
.dump(primaryDbName); + .dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); // Add new data externally, to a partition, but under the partition level top directory - // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. + // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD. Path partitionDir = new Path(externalTableLocation, "country=india"); try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { outputStream.write("pune\n".getBytes()); @@ -435,16 +435,29 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros } List<String> loadWithClause = externalTableBasePathWithClause(); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") .run("show partitions t1") .verifyResults(new String[] { "country=india", "country=us" }) .run("select place from t1 order by place") - .verifyResults(new String[] { "bangalore", "mumbai", "pune" }) + .verifyResults(new String[] {}) .verifyReplTargetProperty(replicatedDbName); + // The Data should be seen after next dump-and-load cycle. + tuple = primary.run("use " + primaryDbName) + .dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] {"country=india", "country=us"}) + .run("select place from t1 order by place") + .verifyResults(new String[] {"bangalore", "mumbai", "pune"}) + .verifyReplTargetProperty(replicatedDbName); + // Delete one of the file and update another one. 
fs.delete(new Path(partitionDir, "file.txt"), true); fs.delete(new Path(partitionDir, "file1.txt"), true); @@ -453,10 +466,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros } // Repl load with zero events but external tables location info should present. - tuple = primary.dump(primaryDbName); + tuple = primary.dump(primaryDbName, withClause); assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't1'") .verifyResult("t1") @@ -475,7 +488,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros tuple = primary .run("alter table t1 drop partition (country='india')") .run("alter table t1 drop partition (country='us')") - .dump(primaryDbName); + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName) .run("select * From t1") @@ -519,8 +532,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyFailure(new String[] {"t2" }) .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = externalTableWithClause(true, true); + tuple = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t3 (id int)") @@ -613,8 +626,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyResult("1") .verifyReplTargetProperty(replicatedDbName); - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = externalTableWithClause(true, true); primary.run("use " + 
primaryDbName) .run("drop table t1") .run("create external table t4 (id int)") @@ -709,9 +721,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable { - List<String> dumpWithClause = Collections.singletonList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> dumpWithClause = this.externalTableWithClause(null, true); List<String> loadWithClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) .run("create external table t1 (id int)") @@ -836,8 +846,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyResult(incTuple.lastReplicationId); // Take a dump with external tables bootstrapped and load it - dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + dumpWithClause = externalTableWithClause(true, true); WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName) .dump(primaryDbName, dumpWithClause); @@ -854,7 +863,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros @Test public void replicationWithTableNameContainsKeywords() throws Throwable { - List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> withClause = externalTableBasePathWithClause(); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -865,9 +874,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into table t2_constraints partition(country='india') values ('bangalore')") .run("insert into table t2_constraints partition(country='us') values ('austin')") .run("insert into table t2_constraints partition(country='france') values ('paris')") - .dump(primaryDbName); + 
.dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) @@ -886,9 +895,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("create table t4_tables (id int)") .run("insert into table t4_tables values (10)") .run("insert into table t4_tables values (20)") - .dump(primaryDbName); + .dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, loadWithClause) + replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) .run("show tables like 't3_bootstrap'") .verifyResults(new String[] {"t3_bootstrap"}) @@ -902,6 +911,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); } + private List<String> externalTableWithClause(Boolean bootstrapExtTbl, Boolean includeExtTbl) + throws IOException, SemanticException { + List<String> extTblBaseDir = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl); + } + private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException { assertExternalFileInfo(expected, dumplocation, null); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index c260a7d..8b2c556 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -477,7 +477,9 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'"); + "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.pugpb'=''"); tuple = primary.run("use " + primaryDbName) .run("drop table t1") .run("create external table t3 (id int)") diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java index afb53b8..692d40d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java @@ -156,4 +156,53 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA replica.run("drop scheduled query s2"); } } + + @Test + public void testExternalTablesReplLoadBootstrapIncr() throws Throwable { + // Bootstrap + String withClause = " WITH('" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + + "'='/replica_external_base')"; + primary.run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into t1 values(1)") + .run("insert into t1 values(2)"); + try (ScheduledQueryExecutionService schqS = + ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) { + int next = 0; + 
ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); + primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName + withClause); + primary.run("alter scheduled query s1 execute"); + Thread.sleep(80000); + replica.run("create scheduled query s2 every 10 minutes as repl load " + primaryDbName + " INTO " + + replicatedDbName); + replica.run("alter scheduled query s2 execute"); + Thread.sleep(80000); + replica.run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1 order by id") + .verifyResults(new String[]{"1", "2"}); + + // First incremental, after bootstrap + primary.run("use " + primaryDbName) + .run("insert into t1 values(3)") + .run("insert into t1 values(4)"); + next++; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); + primary.run("alter scheduled query s1 execute"); + Thread.sleep(80000); + replica.run("alter scheduled query s2 execute"); + Thread.sleep(80000); + replica.run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1 order by id") + .verifyResults(new String[]{"1", "2", "3", "4"}); + + + } finally { + primary.run("drop scheduled query s1"); + replica.run("drop scheduled query s2"); + } + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index ad6c002..78251f2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -866,7 +866,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + 
"'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", - "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'" + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.pugpb'=''" ); replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"}; bootstrapTables = new String[] {"in2", "in3", "in4", "in5"}; @@ -892,10 +894,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios @Test public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable { List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); - List<String> dumpWithClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'" - ); + List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(loadWithClause, true, true); String replPolicy = primaryDbName + ".'(in[0-9]+)|(out4)|(out5)|(out1500)'"; String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause, loadWithClause, new String[] {}, new String[] {}); @@ -918,7 +917,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios String newPolicy = primaryDbName + ".'(in[0-9]+)|(out1500)|(in2)'"; dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'" + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.pugpb'=''" ); // in2 should be dropped. 
@@ -1044,7 +1045,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'", "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'", - "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'" + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.pugpb'=''" ); replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5", "in6", "in7", "in9"}; @@ -1059,7 +1062,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios dumpWithClause = Arrays.asList( "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", - "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'" + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'", + "'distcp.options.pugpb'=''" ); // Database replication with ACID and EXTERNAL table. 
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 9b9b68f..25d530c 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -7,10 +7,6 @@ package org.apache.hadoop.hive.ql.plan.api; -import java.util.Map; -import java.util.HashMap; -import org.apache.thrift.TEnum; - public enum StageType implements org.apache.thrift.TEnum { CONDITIONAL(0), COPY(1), @@ -30,11 +26,11 @@ public enum StageType implements org.apache.thrift.TEnum { REPL_TXN(15), REPL_INCREMENTAL_LOAD(16), SCHEDULED_QUERY_MAINT(17), - REPL_LOAD_COMPLETE_ACK(18); + ACK(18); private final int value; - private StageType(int value) { + StageType(int value) { this.value = value; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 4f87bac..c82e8d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -31,12 +31,12 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.AckTask; +import org.apache.hadoop.hive.ql.exec.repl.AckWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckTask; -import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckWork; -import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask; -import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; +import 
org.apache.hadoop.hive.ql.exec.repl.DirCopyTask; +import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork; import org.apache.hadoop.hive.ql.exec.schq.ScheduledQueryMaintenanceTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; @@ -113,11 +113,12 @@ public final class TaskFactory { taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); - taskvec.add(new TaskTuple<ReplLoadCompleteAckWork>(ReplLoadCompleteAckWork.class, ReplLoadCompleteAckTask.class)); + taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class)); taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class)); taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class)); - taskvec.add(new TaskTuple<ScheduledQueryMaintenanceWork>(ScheduledQueryMaintenanceWork.class, ScheduledQueryMaintenanceTask.class)); + taskvec.add(new TaskTuple<ScheduledQueryMaintenanceWork>(ScheduledQueryMaintenanceWork.class, + ScheduledQueryMaintenanceTask.class)); } private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java similarity index 75% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java index 975dfb0..03e8c4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.ErrorMsg; import 
org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -29,18 +28,18 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; /** - * ReplLoadCompleteAckTask. + * AckTask. * - * Add the load complete acknoledgement. + * Add the repl dump/ repl load complete acknowledgement. **/ -public class ReplLoadCompleteAckTask extends Task<ReplLoadCompleteAckWork> implements Serializable { +public class AckTask extends Task<AckWork> implements Serializable { private static final long serialVersionUID = 1L; @Override public int execute() { try { - Path ackPath = new Path(work.getDumpPath(), ReplUtils.LOAD_ACKNOWLEDGEMENT); + Path ackPath = work.getAckFilePath(); Utils.create(ackPath, conf); } catch (SemanticException e) { setException(e); @@ -51,19 +50,17 @@ public class ReplLoadCompleteAckTask extends Task<ReplLoadCompleteAckWork> imple @Override public StageType getType() { - return StageType.REPL_LOAD_COMPLETE_ACK; + return StageType.ACK; } @Override public String getName() { - return "REPL_LOAD_COMPLETE_ACK"; + return "ACK_TASK"; } @Override public boolean canExecuteInParallel() { - // REPL_LOAD_COMPLETE_ACK is executed only when all its parents are done with execution. - // So running it in parallel has no - // benefits. + // ACK_TASK must be executed only when all its parents are done with execution. 
return false; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java similarity index 71% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java index c36ee6d..0fa0a95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java @@ -18,27 +18,28 @@ package org.apache.hadoop.hive.ql.exec.repl; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.Explain.Level; import java.io.Serializable; /** - * ReplLoadCompleteAckWork. - * FS based Acknowledgement for repl load complete + * AckWork. + * FS based acknowledgement on repl dump and repl load completion. * */ -@Explain(displayName = "Repl Load Complete Ack", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) -public class ReplLoadCompleteAckWork implements Serializable { +@Explain(displayName = "Replication Ack", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class AckWork implements Serializable { private static final long serialVersionUID = 1L; - private String dumpPath; + private Path ackFilePath; - public String getDumpPath() { - return dumpPath; + public Path getAckFilePath() { + return ackFilePath; } - public ReplLoadCompleteAckWork(String dumpPath) { - this.dumpPath = dumpPath; + public AckWork(Path ackFilePath) { + this.ackFilePath = ackFilePath; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java new file mode 100644 index 0000000..e8a8df1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import java.security.PrivilegedExceptionAction; +import java.io.Serializable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * DirCopyTask, mainly to be used to copy External table data. 
+ */ +public class DirCopyTask extends Task<DirCopyWork> implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class); + private static final int MAX_COPY_RETRY = 5; + + private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException { + FileSystem targetFs = destPath.getFileSystem(conf); + boolean createdDir = false; + if (!targetFs.exists(destPath)) { + // target path is created even if the source path is missing, so that ddl task does not try to create it. + if (!targetFs.mkdirs(destPath)) { + throw new IOException(destPath + " is not a directory or unable to create one"); + } + createdDir = true; + } + + FileStatus status; + try { + status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath); + } catch (FileNotFoundException e) { + // Don't delete target path created else ddl task will try to create it using user hive and may fail. + LOG.warn("source path missing " + sourcePath); + return createdDir; + } + LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}", + destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission()); + destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup()); + destPath.getFileSystem(conf).setPermission(destPath, status.getPermission()); + return createdDir; + } + + private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser) + throws IOException, InterruptedException { + if (proxyUser == null) { + return createAndSetPathOwner(targetPath, sourcePath); + } + return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> createAndSetPathOwner(targetPath, sourcePath)); + } + + private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception { + if (proxyUser == null) { + return sourcePath.getFileSystem(conf).exists(sourcePath); + } + return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> 
sourcePath.getFileSystem(conf).exists(sourcePath)); + } + + private int handleException(Exception e, Path sourcePath, Path targetPath, + int currentRetry, UserGroupInformation proxyUser) { + try { + LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e); + if (!checkIfPathExist(sourcePath, proxyUser)) { + LOG.info("Source path is missing. Ignoring exception."); + return 0; + } + } catch (Exception ex) { + LOG.warn("Source path missing check failed. ", ex); + } + // retry logic only for i/o exception + if (!(e instanceof IOException)) { + LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + + if (currentRetry <= MAX_COPY_RETRY) { + LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e); + } else { + LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); + setException(e); + return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode(); + } + int sleepTime = FileUtils.getSleepTime(currentRetry); + LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + LOG.info("Sleep interrupted", timerEx.getMessage()); + } + try { + if (proxyUser == null) { + proxyUser = Utils.getUGI(); + } + FileSystem.closeAllForUGI(proxyUser); + } catch (Exception ex) { + LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex); + } + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + + @Override + public int execute() { + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + + Path sourcePath = work.getFullyQualifiedSourcePath(); + Path targetPath = work.getFullyQualifiedTargetPath(); + if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { + sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri()); + targetPath = 
reservedRawPath(work.getFullyQualifiedTargetPath().toUri()); + } + int currentRetry = 0; + int error = 0; + UserGroupInformation proxyUser = null; + while (currentRetry <= MAX_COPY_RETRY) { + try { + UserGroupInformation ugi = Utils.getUGI(); + String currentUser = ugi.getShortUserName(); + if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) { + proxyUser = UserGroupInformation.createProxyUser( + distCpDoAsUser, UserGroupInformation.getLoginUser()); + } + + setTargetPathOwner(targetPath, sourcePath, proxyUser); + + // do we create a new conf and only here provide this additional option so that we get away from + // differences of data in two location for the same directories ? + // basically add distcp.options.delete to hiveconf new object ? + FileUtils.distCp( + sourcePath.getFileSystem(conf), // source file system + Collections.singletonList(sourcePath), // list of source paths + targetPath, + false, + proxyUser, + conf, + ShimLoader.getHadoopShims()); + return 0; + } catch (Exception e) { + currentRetry++; + error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser); + if (error == 0) { + return 0; + } + } finally { + if (proxyUser != null) { + try { + FileSystem.closeAllForUGI(proxyUser); + } catch (IOException e) { + LOG.error("Unable to closeAllForUGI for user " + proxyUser, e); + if (error == 0) { + setException(e); + error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + break; + } + } + } + } + return error; + } + + private static Path reservedRawPath(URI uri) { + return new Path(uri.getScheme(), uri.getAuthority(), CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath()); + } + + @Override + public StageType getType() { + return StageType.COPY; + } + + @Override + public String getName() { + return "DIR_COPY_TASK"; + } + + @Override + public boolean canExecuteInParallel() { + return true; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java new file mode 100644 index 0000000..efef052 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain; +import java.io.Serializable; + +/** + * DirCopyWork, mainly to be used to copy External table data. 
+ */ +@Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class DirCopyWork implements Serializable { + private final Path fullyQualifiedSourcePath; + private final Path fullyQualifiedTargetPath; + + public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { + this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; + this.fullyQualifiedTargetPath = fullyQualifiedTargetPath; + } + @Override + public String toString() { + return "DirCopyWork{" + + "fullyQualifiedSourcePath=" + getFullyQualifiedSourcePath() + + ", fullyQualifiedTargetPath=" + getFullyQualifiedTargetPath() + + '}'; + } + + public Path getFullyQualifiedSourcePath() { + return fullyQualifiedSourcePath; + } + + public Path getFullyQualifiedTargetPath() { + return fullyQualifiedTargetPath; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java deleted file mode 100644 index 5c5543c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.repl; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; -import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; -import org.apache.hadoop.hive.ql.plan.Explain; -import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import java.security.PrivilegedExceptionAction; - -import java.io.Serializable; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -public class ExternalTableCopyTaskBuilder { - private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class); - private final ReplLoadWork work; - private final HiveConf conf; - - ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) { - this.work = work; - this.conf = conf; - } - - List<Task<?>> tasks(TaskTracker tracker) { - List<Task<?>> tasks = new ArrayList<>(); - Iterator<DirCopyWork> itr = work.getPathsToCopyIterator(); - while (tracker.canAddMoreTasks() && itr.hasNext()) { - DirCopyWork dirCopyWork = itr.next(); - Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf); - tasks.add(task); - tracker.addTask(task); - LOG.debug("added task for {}", dirCopyWork); - } - return tasks; - } - - public static class 
DirCopyTask extends Task<DirCopyWork> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class); - private static final int MAX_COPY_RETRY = 5; - - private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException { - FileSystem targetFs = destPath.getFileSystem(conf); - boolean createdDir = false; - if (!targetFs.exists(destPath)) { - // target path is created even if the source path is missing, so that ddl task does not try to create it. - if (!targetFs.mkdirs(destPath)) { - throw new IOException(destPath + " is not a directory or unable to create one"); - } - createdDir = true; - } - - FileStatus status; - try { - status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath); - } catch (FileNotFoundException e) { - // Don't delete target path created else ddl task will try to create it using user hive and may fail. - LOG.warn("source path missing " + sourcePath); - return createdDir; - } - LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}", - destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission()); - destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup()); - destPath.getFileSystem(conf).setPermission(destPath, status.getPermission()); - return createdDir; - } - - private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser) - throws IOException, InterruptedException { - if (proxyUser == null) { - return createAndSetPathOwner(targetPath, sourcePath); - } - return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> - createAndSetPathOwner(targetPath, sourcePath)); - } - - private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception { - if (proxyUser == null) { - return sourcePath.getFileSystem(conf).exists(sourcePath); - } - return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> - 
sourcePath.getFileSystem(conf).exists(sourcePath)); - } - - private int handleException(Exception e, Path sourcePath, Path targetPath, - int currentRetry, UserGroupInformation proxyUser) { - try { - LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e); - if (!checkIfPathExist(sourcePath, proxyUser)) { - LOG.info("Source path is missing. Ignoring exception."); - return 0; - } - } catch (Exception ex) { - LOG.warn("Source path missing check failed. ", ex); - } - - // retry logic only for i/o exception - if (!(e instanceof IOException)) { - LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e); - setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - - if (currentRetry <= MAX_COPY_RETRY) { - LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e); - } else { - LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); - setException(e); - return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode(); - } - - int sleepTime = FileUtils.getSleepTime(currentRetry); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); - } - - try { - if (proxyUser == null) { - proxyUser = Utils.getUGI(); - } - FileSystem.closeAllForUGI(proxyUser); - } catch (Exception ex) { - LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex); - } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - - @Override - public int execute() { - String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - - Path sourcePath = work.fullyQualifiedSourcePath; - Path targetPath = work.fullyQualifiedTargetPath; - if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { - sourcePath = reservedRawPath(work.fullyQualifiedSourcePath.toUri()); - targetPath = 
reservedRawPath(work.fullyQualifiedTargetPath.toUri()); - } - int currentRetry = 0; - int error = 0; - UserGroupInformation proxyUser = null; - while (currentRetry <= MAX_COPY_RETRY) { - try { - UserGroupInformation ugi = Utils.getUGI(); - String currentUser = ugi.getShortUserName(); - if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) { - proxyUser = UserGroupInformation.createProxyUser( - distCpDoAsUser, UserGroupInformation.getLoginUser()); - } - - setTargetPathOwner(targetPath, sourcePath, proxyUser); - - // do we create a new conf and only here provide this additional option so that we get away from - // differences of data in two location for the same directories ? - // basically add distcp.options.delete to hiveconf new object ? - FileUtils.distCp( - sourcePath.getFileSystem(conf), // source file system - Collections.singletonList(sourcePath), // list of source paths - targetPath, - false, - proxyUser, - conf, - ShimLoader.getHadoopShims()); - return 0; - } catch (Exception e) { - currentRetry++; - error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser); - if (error == 0) { - return 0; - } - } finally { - if (proxyUser != null) { - try { - FileSystem.closeAllForUGI(proxyUser); - } catch (IOException e) { - LOG.error("Unable to closeAllForUGI for user " + proxyUser, e); - if (error == 0) { - setException(e); - error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - break; - } - } - } - } - return error; - } - - private static Path reservedRawPath(URI uri) { - return new Path(uri.getScheme(), uri.getAuthority(), - CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath()); - } - - @Override - public StageType getType() { - return StageType.COPY; - } - - @Override - public String getName() { - return "DIR_COPY_TASK"; - } - - @Override - public boolean canExecuteInParallel(){ - return true; - } - } - - @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER, - Explain.Level.DEFAULT, - 
Explain.Level.EXTENDED }) - public static class DirCopyWork implements Serializable { - private final Path fullyQualifiedSourcePath, fullyQualifiedTargetPath; - - public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) { - this.fullyQualifiedSourcePath = fullyQualifiedSourcePath; - this.fullyQualifiedTargetPath = fullyQualifiedTargetPath; - } - - @Override - public String toString() { - return "DirCopyWork{" + - "fullyQualifiedSourcePath=" + fullyQualifiedSourcePath + - ", fullyQualifiedTargetPath=" + fullyQualifiedTargetPath + - '}'; - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 8da1c48..69f6ffe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -40,9 +40,12 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -52,7 +55,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; 
-import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; +import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -75,6 +78,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -84,16 +88,18 @@ import java.util.List; import java.util.Arrays; import java.util.Collections; import java.util.Base64; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.UUID; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { + private static final long serialVersionUID = 1L; private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME; private static final long SLEEP_TIME = 60000; - Set<String> tablesForBootstrap = new HashSet<>(); + private Set<String> tablesForBootstrap = new HashSet<>(); public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); private final String name; @@ -122,33 +128,35 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { @Override public int execute() { try { - Hive hiveDb = getHive(); - Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), - Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); - Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot); - Path previousHiveDumpPath = - previousDumpMetaPath != null ? 
new Path(previousDumpMetaPath, ReplUtils.REPL_HIVE_BASE_DIR) : null; - //If no previous dump is present or previous dump was loaded, proceed - if (shouldDump(previousHiveDumpPath)) { - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); - Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); - DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); - // Initialize ReplChangeManager instance since we will require it to encode file URI. - ReplChangeManager.getInstance(conf); - Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); - Long lastReplId; - if (previousHiveDumpPath == null) { - lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + if (work.tableDataCopyIteratorsInitialized()) { + initiateDataCopyTasks(); + } else { + Hive hiveDb = getHive(); + Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot); + //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. + if (shouldDump(previousHiveDumpPath)) { + Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); + // Initialize ReplChangeManager instance since we will require it to encode file URI. 
+ ReplChangeManager.getInstance(conf); + Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); + Long lastReplId; + if (previousHiveDumpPath == null) { + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } else { + work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + } + work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); + work.setCurrentDumpPath(currentDumpPath); + initiateDataCopyTasks(); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); - lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); + LOG.info("Previous Dump is not yet loaded"); } - prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); - writeDumpCompleteAck(hiveDumpRoot); - deleteAllPreviousDumpMeta(dumpRoot, currentDumpPath); - } else { - LOG.warn("Previous Dump is not yet loaded"); } } catch (Exception e) { LOG.error("failed", e); @@ -158,20 +166,59 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return 0; } - private void deleteAllPreviousDumpMeta(Path dumpRoot, Path currentDumpPath) throws IOException { - FileSystem fs = dumpRoot.getFileSystem(conf); - if (fs.exists(dumpRoot)) { - FileStatus[] statuses = fs.listStatus(dumpRoot, - path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString())); - for (FileStatus status : statuses) { - fs.delete(status.getPath(), true); + private void initiateDataCopyTasks() throws SemanticException, IOException { + TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); + List<Task<?>> childTasks = new ArrayList<>(); + childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf)); + childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf)); + if (childTasks.isEmpty()) { + //All 
table data copy work finished. + finishRemainingTasks(); + } else { + DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); + this.childTasks = childTasks; + } + } + + private void finishRemainingTasks() throws SemanticException, IOException { + prepareReturnValues(work.getResultValues()); + Path dumpAckFile = new Path(work.getCurrentDumpPath(), + ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT); + Utils.create(dumpAckFile, conf); + deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); + } + + private void prepareReturnValues(List<String> values) throws SemanticException { + LOG.debug("prepareReturnValues : " + dumpSchema); + for (String s : values) { + LOG.debug(" > " + s); + } + Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf); + } + + private void deleteAllPreviousDumpMeta(Path currentDumpPath) { + try { + Path dumpRoot = getDumpRoot(currentDumpPath); + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot, + path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString())); + for (FileStatus status : statuses) { + fs.delete(status.getPath(), true); + } + } + } catch (Exception ex) { + LOG.warn("Possible leak on disk, could not delete the previous dump directory:" + currentDumpPath, ex); + } } - private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException { - Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT); - Utils.create(ackPath, conf); + private Path getDumpRoot(Path currentDumpPath) { + if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + //testDeletePreviousDumpMetaPath to be used only for test. 
+ return null; + } else { + return currentDumpPath.getParent(); + } } private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException { @@ -187,20 +234,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { + FileStatus latestValidStatus = null; FileSystem fs = dumpRoot.getFileSystem(conf); if (fs.exists(dumpRoot)) { FileStatus[] statuses = fs.listStatus(dumpRoot); - if (statuses.length > 0) { - FileStatus latestUpdatedStatus = statuses[0]; - for (FileStatus status : statuses) { - if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) { - latestUpdatedStatus = status; - } + for (FileStatus status : statuses) { + LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + if (latestValidStatus == null) { + latestValidStatus = validDump(fs, status.getPath()) ? status : null; + } else if (validDump(fs, status.getPath()) + && status.getModificationTime() > latestValidStatus.getModificationTime()) { + latestValidStatus = status; } - return latestUpdatedStatus.getPath(); } } - return null; + Path latestDumpDir = (latestValidStatus == null) + ? null : new Path(latestValidStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); + LOG.info("Selecting latest valid dump dir as {}", (latestDumpDir == null) ? 
"null" : latestDumpDir.toString()); + return latestDumpDir; + } + + private boolean validDump(FileSystem fs, Path dumpDir) throws IOException { + //Check if it was a successful dump + Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); + return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)); } private boolean shouldDump(Path previousDumpPath) throws IOException { @@ -214,14 +271,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } - private void prepareReturnValues(List<String> values) throws SemanticException { - LOG.debug("prepareReturnValues : " + dumpSchema); - for (String s : values) { - LOG.debug(" > " + s); - } - Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf); - } - /** * Decide whether to examine all the tables to dump. We do this if * 1. External tables are going to be part of the dump : In which case we need to list their @@ -326,6 +375,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { String validTxnList = null; long waitUntilTime = 0; long bootDumpBeginReplId = -1; + List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList(); + List<DirCopyWork> extTableCopyWorks = Collections.emptyList(); List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); @@ -403,8 +454,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); } + managedTableCopyPaths = new ArrayList<>(); Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); - + List<Path> extTableLocations = new LinkedList<>(); try (Writer writer = new Writer(dumpRoot, conf)) { for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { try { @@ -413,13 +465,15 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // Dump external table locations if required. 
if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation()) { - writer.dataLocationDump(table); + extTableLocations.addAll(writer.dataLocationDump(table)); } // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); + managedTableCopyPaths.addAll( + dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + hiveDb, tableTuple)); } if (tableList != null && isTableSatifiesConfig(table)) { tableList.add(tableName); @@ -432,8 +486,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); + extTableCopyWorks = dirLocationsToCopy(extTableLocations); } - + work.setDirCopyIterator(extTableCopyWorks.iterator()); + work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); return lastReplId; } @@ -530,6 +586,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } + private List<DirCopyWork> dirLocationsToCopy(List<Path> sourceLocations) + throws HiveException { + List<DirCopyWork> list = new ArrayList<>(sourceLocations.size()); + String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); + // this is done to remove any scheme related information that will be present in the base path + // specifically when we are replicating to cloud storage + Path basePath = new Path(baseDir); + for (Path sourcePath : sourceLocations) { + Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath); + list.add(new DirCopyWork(sourcePath, targetPath)); + } + return list; + } + Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path 
cmRoot, Hive hiveDb) throws Exception { // bootstrap case @@ -546,6 +616,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { long waitUntilTime = System.currentTimeMillis() + timeoutInMs; String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); + List<DirCopyWork> extTableCopyWorks = new ArrayList<>(); + List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>(); for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); @@ -568,6 +640,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; try (Writer writer = new Writer(dbRoot, conf)) { + List<Path> extTableLocations = new LinkedList<>(); for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); Table table = null; @@ -578,10 +651,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { LOG.debug("Adding table {} to external tables list", tblName); - writer.dataLocationDump(tableTuple.object); + extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); } - dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, - tableTuple); + managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + hiveDb, tableTuple)); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. 
@@ -593,6 +666,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf); + extTableCopyWorks = dirLocationsToCopy(extTableLocations); } catch (Exception e) { caught = e; } finally { @@ -618,9 +692,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); - - // Set the correct last repl id to return to the user - // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap + work.setDirCopyIterator(extTableCopyWorks.iterator()); + work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); return bootDumpBeginReplId; } @@ -638,8 +711,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return dbRoot; } - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId, - Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { + List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = @@ -657,20 +730,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - List<ReplPathMapping> replPathMappings = new TableExport( + List<ManagedTableCopyPath> managedTableCopyPaths = new TableExport( exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); if 
(tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) || Utils.shouldDumpMetaDataOnly(conf)) { - return; - } - for (ReplPathMapping replPathMapping: replPathMappings) { - Task<?> copyTask = ReplCopyTask.getLoadCopyTask( - tuple.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf, false); - this.addDependentTask(copyTask); - LOG.info("Scheduled a repl copy task from [{}] to [{}]", - replPathMapping.getSrcPath(), replPathMapping.getTargetPath()); + return Collections.emptyList(); } + return managedTableCopyPaths; } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 9b11bae..1f0d702 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -18,29 +18,51 @@ package org.apache.hadoop.hive.ql.exec.repl; import com.google.common.primitives.Ints; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ReplDumpWork implements Serializable { + private static final long 
serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ReplDumpWork.class); final ReplScope replScope; final ReplScope oldReplScope; final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; Long eventTo; Long eventFrom; static String testInjectDumpDir = null; + static boolean testDeletePreviousDumpMetaPath = false; private Integer maxEventLimit; + private transient Iterator<DirCopyWork> dirCopyIterator; + private transient Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator; + private Path currentDumpPath; + private List<String> resultValues; public static void injectNextDumpDirForTest(String dumpDir) { testInjectDumpDir = dumpDir; } + public static void testDeletePreviousDumpMetaPath(boolean failDeleteDumpMeta) { + testDeletePreviousDumpMetaPath = failDeleteDumpMeta; + } + public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope, String astRepresentationForErrorMsg, String resultTempPath) { @@ -87,4 +109,64 @@ public class ReplDumpWork implements Serializable { .debug("eventTo not specified, using current event id : {}", eventTo); } } + + public void setDirCopyIterator(Iterator<DirCopyWork> dirCopyIterator) { + if (this.dirCopyIterator != null) { + throw new IllegalStateException("Dir Copy iterator has already been initialized"); + } + this.dirCopyIterator = dirCopyIterator; + } + + public void setManagedTableCopyPathIterator(Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator) { + if (this.managedTableCopyPathIterator != null) { + throw new IllegalStateException("Managed table copy path iterator has already been initialized"); + } + this.managedTableCopyPathIterator = managedTableCopyPathIterator; + } + + public boolean tableDataCopyIteratorsInitialized() { + return dirCopyIterator != null || managedTableCopyPathIterator != null; + } + + public Path getCurrentDumpPath() { + return currentDumpPath; + } + + public void setCurrentDumpPath(Path currentDumpPath) { + 
this.currentDumpPath = currentDumpPath; + } + + public List<String> getResultValues() { + return resultValues; + } + + public void setResultValues(List<String> resultValues) { + this.resultValues = resultValues; + } + + public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) { + List<Task<?>> tasks = new ArrayList<>(); + while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) { + DirCopyWork dirCopyWork = dirCopyIterator.next(); + Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf); + tasks.add(task); + tracker.addTask(task); + LOG.debug("added task for {}", dirCopyWork); + } + return tasks; + } + + public List<Task<?>> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) { + List<Task<?>> tasks = new ArrayList<>(); + while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { + EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next(); + Task<?> copyTask = ReplCopyTask.getLoadCopyTask( + managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), + managedTableCopyPath.getTargetPath(), conf, false); + tasks.add(copyTask); + tracker.addTask(copyTask); + LOG.debug("added task for {}", managedTableCopyPath); + } + return tasks; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index c7aa007..fddee28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -43,6 +43,7 @@ import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -120,11 +121,12 @@ public final class ReplExternalTables { /** * this will dump a single line per external table. 
it can include additional lines for the same * table if the table is partitioned and the partition location is outside the table. + * It returns list of all the external table locations. */ - void dataLocationDump(Table table) - throws InterruptedException, IOException, HiveException { + List<Path> dataLocationDump(Table table) throws InterruptedException, IOException, HiveException { + List<Path> extTableLocations = new LinkedList<>(); if (!shouldWrite()) { - return; + return extTableLocations; } if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) { throw new IllegalArgumentException( @@ -134,6 +136,7 @@ public final class ReplExternalTables { Path fullyQualifiedDataLocation = PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); + extTableLocations.add(fullyQualifiedDataLocation); if (table.isPartitioned()) { List<Partition> partitions; try { @@ -142,7 +145,7 @@ public final class ReplExternalTables { if (e.getCause() instanceof NoSuchObjectException) { // If table is dropped when dump in progress, just skip partitions data location dump LOG.debug(e.getMessage()); - return; + return extTableLocations; } throw e; } @@ -155,9 +158,11 @@ public final class ReplExternalTables { fullyQualifiedDataLocation = PathBuilder .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf)); write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf)); + extTableLocations.add(fullyQualifiedDataLocation); } } } + return extTableLocations; } private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index e792bc5..3427b59 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import com.google.common.collect.Collections2; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -126,16 +128,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { if (!iterator.hasNext() && constraintIterator.hasNext()) { loadingConstraint = true; } - while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) || - (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) { - // First start the distcp tasks to copy the files related to external table. The distcp tasks should be - // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these - // directory with proper permission and owner. 
- if (work.getPathsToCopyIterator().hasNext()) { - scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker)); - break; - } - + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) + && loadTaskTracker.canAddMoreTasks()) { BootstrapEvent next; if (!loadingConstraint) { next = iterator.next(); @@ -252,8 +246,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState() - || constraintIterator.hasNext() - || work.getPathsToCopyIterator().hasNext(); + || constraintIterator.hasNext(); if (addAnotherLoadTask) { createBuilderTask(scope.rootTasks); @@ -262,8 +255,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { // Update last repl ID of the database only if the current dump is not incremental. If bootstrap // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change // last repl ID of the database. 
- if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext() - && !work.isIncrementalLoad()) { + if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); work.updateDbEventState(null); } @@ -320,18 +312,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { } private void createReplLoadCompleteAckTask() { - if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks() - && !work.getPathsToCopyIterator().hasNext()) - || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks() - && !work.getPathsToCopyIterator().hasNext())) { + if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks()) + || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { //All repl load tasks are executed and status is 0, create the task to add the acknowledgement - ReplLoadCompleteAckWork replLoadCompleteAckWork = new ReplLoadCompleteAckWork(work.dumpDirectory); - Task<ReplLoadCompleteAckWork> loadCompleteAckWorkTask = TaskFactory.get(replLoadCompleteAckWork, conf); + AckWork replLoadAckWork = new AckWork( + new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf); if (this.childTasks.isEmpty()) { - this.childTasks.add(loadCompleteAckWorkTask); + this.childTasks.add(loadAckWorkTask); } else { DAGTraversal.traverse(this.childTasks, - new AddDependencyToLeaves(Collections.singletonList(loadCompleteAckWorkTask))); + new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask))); } } } @@ -431,7 +422,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder(); // If incremental events are already applied, then 
check and perform if need to bootstrap any tables. - if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) { if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap " + "mode after applying all events."); @@ -442,20 +433,13 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { List<Task<?>> childTasks = new ArrayList<>(); int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - // First start the distcp tasks to copy the files related to external table. The distcp tasks should be - // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these - // directory with proper permission and owner. TaskTracker tracker = new TaskTracker(maxTasks); - if (work.getPathsToCopyIterator().hasNext()) { - childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker)); - } else { - childTasks.add(builder.build(context, getHive(), LOG, tracker)); - } + childTasks.add(builder.build(context, getHive(), LOG, tracker)); // If there are no more events to be applied, add a task to update the last.repl.id of the // target database to the event id of the last event considered by the dump. Next // incremental cycle won't consider the events in this dump again if it starts from this id. - if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) { + if (!builder.hasMoreWork()) { // The name of the database to be loaded into is either specified directly in REPL LOAD // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump // metadata during table level replication. 
@@ -484,14 +468,13 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf); DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask)); + work.setLastReplIDUpdated(true); LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid); } } - // Either the incremental has more work or the external table file copy has more paths to process. - // Once all the incremental events are applied and external tables file copies are done, enable - // bootstrap of tables if exist. - if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) { + // Once all the incremental events are applied, enable bootstrap of tables if exist. + if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); } this.childTasks = childTasks; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 370c5ec..474d8c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -34,9 +34,6 @@ import org.apache.hadoop.hive.ql.exec.Task; import java.io.IOException; import java.io.Serializable; -import java.util.Iterator; -import java.util.List; -import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, @@ -45,6 +42,7 @@ public class ReplLoadWork implements Serializable { final String dbNameToLoadIn; final ReplScope currentReplScope; final String dumpDirectory; + private boolean lastReplIDUpdated; private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; @@ -52,7 
+50,6 @@ public class ReplLoadWork implements Serializable { private final transient BootstrapEventsIterator bootstrapIterator; private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder; private transient Task<?> rootTask; - private final transient Iterator<DirCopyWork> pathsToCopyIterator; /* these are sessionState objects that are copied over to work to allow for parallel execution. @@ -63,8 +60,7 @@ public class ReplLoadWork implements Serializable { public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, ReplScope currentReplScope, - LineageState lineageState, boolean isIncrementalDump, Long eventTo, - List<DirCopyWork> pathsToCopyIterator) throws IOException { + LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; @@ -99,7 +95,6 @@ public class ReplLoadWork implements Serializable { this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); incrementalLoadTasksBuilder = null; } - this.pathsToCopyIterator = pathsToCopyIterator.iterator(); } BootstrapEventsIterator bootstrapIterator() { @@ -147,7 +142,11 @@ public class ReplLoadWork implements Serializable { this.rootTask = rootTask; } - public Iterator<DirCopyWork> getPathsToCopyIterator() { - return pathsToCopyIterator; + public boolean isLastReplIDUpdated() { + return lastReplIDUpdated; + } + + public void setLastReplIDUpdated(boolean lastReplIDUpdated) { + this.lastReplIDUpdated = lastReplIDUpdated; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index e65cbf5..bc90ea1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -51,7 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; 
-import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -158,37 +157,61 @@ public class EximUtil { } /** - * Wrapper class for mapping replication source and target path for copying data. + * Wrapper class for mapping source and target path for copying managed table data. */ - public static class ReplPathMapping { + public static class ManagedTableCopyPath { + private ReplicationSpec replicationSpec; + private static boolean nullSrcPathForTest = false; private Path srcPath; private Path tgtPath; - public ReplPathMapping(Path srcPath, Path tgtPath) { + public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) { + this.replicationSpec = replicationSpec; if (srcPath == null) { - throw new IllegalArgumentException("Source Path can not be null."); + throw new IllegalArgumentException("Source path can not be null."); } this.srcPath = srcPath; if (tgtPath == null) { - throw new IllegalArgumentException("Target Path can not be null."); + throw new IllegalArgumentException("Target path can not be null."); } this.tgtPath = tgtPath; } public Path getSrcPath() { + if (nullSrcPathForTest) { + return null; + } return srcPath; } - public void setSrcPath(Path srcPath) { - this.srcPath = srcPath; - } - public Path getTargetPath() { return tgtPath; } - public void setTargetPath(Path targetPath) { - this.tgtPath = targetPath; + @Override + public String toString() { + return "ManagedTableCopyPath{" + + "fullyQualifiedSourcePath=" + srcPath + + ", fullyQualifiedTargetPath=" + tgtPath + + '}'; + } + + public ReplicationSpec getReplicationSpec() { + return replicationSpec; + } + + public void setReplicationSpec(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + + /** + * To be used only for testing purpose. + * It has been used to make repl dump operation fail. 
+ */ + public static void setNullSrcPath(HiveConf conf, boolean aNullSrcPath) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + nullSrcPathForTest = aNullSrcPath; + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index a6b15bc..8802139 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; -import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -43,22 +42,18 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Base64; import java.util.List; -import java.util.ArrayList; import java.util.Collections; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; -import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader; -import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; import static 
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -400,8 +395,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(), dmd.getReplScope(), - queryState.getLineageState(), evDump, dmd.getEventTo(), - dirLocationsToCopy(loadPath, evDump)); + queryState.getLineageState(), evDump, dmd.getEventTo()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } else { LOG.warn("Previous Dump Already Loaded"); @@ -415,51 +409,30 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private Path getCurrentLoadPath() throws IOException, SemanticException { Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); + .getBytes(StandardCharsets.UTF_8.name()))); final FileSystem fs = loadPathBase.getFileSystem(conf); - // Make fully qualified path for further use. loadPathBase = fs.makeQualified(loadPathBase); - - if (!fs.exists(loadPathBase)) { - // supposed dump path does not exist. - LOG.error("File not found " + loadPathBase.toUri().toString()); - throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg()); - } - FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase); - if (statuses.length > 0) { - //sort based on last modified. 
Recent one is at the beginning - FileStatus latestUpdatedStatus = statuses[0]; - for (FileStatus status : statuses) { - if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) { - latestUpdatedStatus = status; + if (fs.exists(loadPathBase)) { + FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase); + if (statuses.length > 0) { + //sort based on last modified. Recent one is at the beginning + FileStatus latestUpdatedStatus = statuses[0]; + for (FileStatus status : statuses) { + if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) { + latestUpdatedStatus = status; + } + } + Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); + if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)) + && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) { + return hiveDumpPath; } - } - Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); - if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)) - && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) { - return hiveDumpPath; } } return null; } - private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase) - throws HiveException, IOException { - List<DirCopyWork> list = new ArrayList<>(); - String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); - // this is done to remove any scheme related information that will be present in the base path - // specifically when we are replicating to cloud storage - Path basePath = new Path(baseDir); - - for (String location : new Reader(conf, loadPath, isIncrementalPhase).sourceLocationsToCopy()) { - Path sourcePath = new Path(location); - Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, 
sourcePath); - list.add(new DirCopyWork(sourcePath, targetPath)); - } - return list; - } - private void setConfigs(ASTNode node) throws SemanticException { Map<String, String> replConfigs = getProps(node); if (null != replConfigs) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 454998f..c3b1081 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; +import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; @@ -75,10 +75,10 @@ class PartitionExport { this.callersSession = SessionState.get(); } - List<ReplPathMapping> write(final ReplicationSpec forReplicationSpec, boolean isExportTask) + List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask) throws InterruptedException, HiveException { List<Future<?>> futures = new LinkedList<>(); - List<ReplPathMapping> replCopyPathMappings = new LinkedList<>(); //Collections.synchronizedList(new LinkedList<>()); + List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>(); ExecutorService producer = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build()); futures.add(producer.submit(() -> { @@ -122,7 +122,8 @@ class PartitionExport { new FileOperations(dataPathList, rootDataDumpDir, 
distCpDoAsUser, hiveConf, mmCtx) .export(isExportTask); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); - return new ReplPathMapping(partition.getDataLocation(), new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME)); + return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), + new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME)); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -133,8 +134,8 @@ class PartitionExport { try { Object retVal = future.get(); if (retVal != null) { - ReplPathMapping replPathMapping = (ReplPathMapping)retVal; - replCopyPathMappings.add(replPathMapping); + ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal; + managedTableCopyPaths.add(managedTableCopyPath); } } catch (Exception e) { LOG.error("failed", e.getCause()); @@ -143,6 +144,6 @@ class PartitionExport { } // may be drive this via configuration as well. consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - return replCopyPathMappings; + return managedTableCopyPaths; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index a26b159..683f3c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; +import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; @@ -47,7 
+47,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -97,8 +96,8 @@ public class TableExport { this.mmCtx = mmCtx; } - public List<ReplPathMapping> write(boolean isExportTask) throws SemanticException { - List<ReplPathMapping> replPathMappings = Collections.emptyList(); + public List<ManagedTableCopyPath> write(boolean isExportTask) throws SemanticException { + List<ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList(); if (tableSpec == null) { writeMetaData(null); } else if (shouldExport()) { @@ -106,12 +105,12 @@ public class TableExport { writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly() && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { - replPathMappings = writeData(withPartitions, isExportTask); + managedTableCopyPaths = writeData(withPartitions, isExportTask); } } else if (isExportTask) { throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } - return replPathMappings; + return managedTableCopyPaths; } private PartitionIterable getPartitions() throws SemanticException { @@ -162,24 +161,26 @@ public class TableExport { } } - private List<ReplPathMapping> writeData(PartitionIterable partitions, boolean isExportTask) throws SemanticException { - List<ReplPathMapping> replCopyPathMappings = new LinkedList<>(); + private List<ManagedTableCopyPath> writeData(PartitionIterable partitions, boolean isExportTask) + throws SemanticException { + List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>(); try { if (tableSpec.tableHandle.isPartitioned()) { if (partitions == null) { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.getTableName().getTable()); } - replCopyPathMappings = new PartitionExport( + managedTableCopyPaths = 
new PartitionExport( paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask); } else { List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); - replCopyPathMappings.add(new ReplPathMapping(tableSpec.tableHandle.getDataLocation(), paths.dataExportDir())); + managedTableCopyPaths.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(), + paths.dataExportDir())); new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(isExportTask); } - return replCopyPathMappings; + return managedTableCopyPaths; } catch (Exception e) { throw new SemanticException(e.getMessage(), e); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 2651ea4..81fac25 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.junit.Test; @@ -127,13 +128,15 @@ public class TestReplDumpTask { private int tableDumpCount = 0; @Override - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir, - long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) + List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, + Path replDataDir, long lastReplId, Hive hiveDb, + HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; if (tableDumpCount > 1) { throw new 
TestException(); } + return Collections.emptyList(); } };