This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 997fb16 HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha,Anishek Agarwal) 997fb16 is described below commit 997fb162eb58d2422d36d6d3171c59b8c40754a7 Author: Aasha Medhi <aasha.medhi2...@gmail.com> AuthorDate: Thu Apr 9 17:00:05 2020 +0530 HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha,Anishek Agarwal) --- .../hive/ql/parse/TestReplicationScenarios.java | 32 ++- .../parse/TestReplicationScenariosAcidTables.java | 310 +++++++++++++++++++++ .../TestReplicationScenariosAcrossInstances.java | 11 +- .../TestReplicationScenariosExternalTables.java | 56 ++-- .../parse/TestTableLevelReplicationScenarios.java | 13 +- .../org/apache/hadoop/hive/ql/exec/ExportTask.java | 2 +- .../hadoop/hive/ql/exec/repl/DirCopyWork.java | 1 + .../apache/hadoop/hive/ql/exec/repl/ReplAck.java | 35 +++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 120 +++++--- .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 4 +- .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 7 +- .../ql/exec/repl/bootstrap/events/TableEvent.java | 2 + .../events/filesystem/BootstrapEventsIterator.java | 6 +- .../events/filesystem/DatabaseEventsIterator.java | 22 +- .../events/filesystem/FSPartitionEvent.java | 9 +- .../bootstrap/events/filesystem/FSTableEvent.java | 20 +- .../repl/bootstrap/load/table/LoadPartitions.java | 10 +- .../exec/repl/bootstrap/load/table/LoadTable.java | 5 +- .../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 +- .../org/apache/hadoop/hive/ql/parse/EximUtil.java | 1 + .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 7 +- .../hive/ql/parse/repl/dump/PartitionExport.java | 6 +- .../hive/ql/parse/repl/dump/TableExport.java | 98 +++++-- .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 11 +- 24 files changed, 660 insertions(+), 138 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index efe9fff..c79d4c3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -98,6 +98,8 @@ import java.util.Map; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -315,8 +317,8 @@ public class TestReplicationScenarios { FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); @@ -367,8 +369,8 @@ public class TestReplicationScenarios { advanceDumpDir(); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror); verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror); @@ -452,7 +454,7 @@ public class TestReplicationScenarios { Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver); @@ -466,7 +468,7 @@ public class TestReplicationScenarios { loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); //delete load ack to reload the same dump - loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true); loadAndVerify(dbNameReplica, dbName, dump.lastReplId); run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver); @@ -902,8 +904,8 @@ public class TestReplicationScenarios { Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf); Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))); - assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString()))); // VERIFY tables and partitions on destination for equivalence. verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror); @@ -1439,8 +1441,8 @@ public class TestReplicationScenarios { Path path = new Path(System.getProperty("test.warehouse.dir", "")); String tableRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "unptned"; Path srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptnedFileName1); - String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator - + "unptned" + File.separator + EximUtil.DATA_PATH_NAME +File.separator + unptnedFileName1; + String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME + + File.separator + dbName.toLowerCase() + File.separator + "unptned" +File.separator + unptnedFileName1; Path tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); //A file in table at src location should be copied to $dumplocation/hive/<db>/<table>/data/<unptned_fileName> verifyChecksum(srcFileLocation, tgtFileLocation, true); @@ -1449,9 +1451,10 @@ public class TestReplicationScenarios { String partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=1"; srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptnedFileName1); - tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME + + File.separator + dbName.toLowerCase() + File.separator + "ptned" + File.separator + "b=1" + File.separator - + EximUtil.DATA_PATH_NAME +File.separator + ptnedFileName1; + + ptnedFileName1; tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); //A partitioned file in table at src location should be copied to // $dumplocation/hive/<db>/<table>/<partition>/data/<unptned_fileName> @@ -1723,7 +1726,8 @@ public class TestReplicationScenarios { Tuple incrementalDump = replDumpDb(dbName); //Remove the dump ack file, so that dump is treated as an invalid dump. - String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT; + String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + DUMP_ACKNOWLEDGEMENT.toString(); Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath); Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(), "old_" + dumpFinishedAckFilePath.getName()); @@ -1809,7 +1813,7 @@ public class TestReplicationScenarios { FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf); fs.delete(fileToDelete, true); assertTrue(fs.exists(bootstrapDumpDir)); - assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT))); + assertTrue(fs.exists(new Path(bootstrapDumpDir, DUMP_ACKNOWLEDGEMENT.toString()))); loadAndVerify(replDbName, dbName, incrDump.lastReplId); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 1e25598..39722a0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -32,10 +33,12 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.Behaviour import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -51,7 +54,11 @@ import java.util.Collections; import java.util.Map; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * TestReplicationScenariosAcidTables - test replication for ACID tables. @@ -655,4 +662,307 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios assertEquals("REPL LOAD * is not supported", e.getMessage()); } } + + @Test + public void testCheckPointingDataDumpFailure() throws Throwable { + //To force distcp copy + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME); + long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime(); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbDataPath, "t1"); + Path tablet2Path = new Path(dbDataPath, "t2"); + //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data + fs.delete(statuses[0].getPath(), true); + long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime(); + long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater + WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime()); + assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime()); + assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime()); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2", "3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"11", "21"}); + } + + @Test + public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME); + long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime(); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbPath, "t1"); + Path tablet2Path = new Path(dbPath, "t2"); + //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2 + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data + fs.delete(statuses[0].getPath(), true); + long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime(); + long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater + WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName); + assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + //File is copied again as we are using regular copy + assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime()); + assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime()); + assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime()); + replica.load(replicatedDbName, primaryDbName) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[] {"1", "2", "3"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"11", "21"}); + } + + @Test + public void testCheckPointingWithSourceTableDataInserted() throws Throwable { + //To force distcp copy + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbPath, "t1"); + Path tablet2Path = new Path(dbPath, "t2"); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Delete table 2 data + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data + fs.delete(statuses[0].getPath(), true); + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime(); + + //Do another dump. It should only dump table t2. Also insert new data in existing tables. + // New data should be there in target + primary.run("use " + primaryDbName) + .run("insert into t2 values (13)") + .run("insert into t2 values (24)") + .run("insert into t1 values (4)") + .dump(primaryDbName, dumpClause); + + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("select * from t1") + .verifyResults(new String[]{"1", "2", "3", "4"}) + .run("select * from t2") + .verifyResults(new String[]{"11", "21", "13", "24"}); + assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime()); + } + + @Test + public void testCheckPointingWithNewTablesAdded() throws Throwable { + //To force distcp copy + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet1Path = new Path(dbPath, "t1"); + Path tablet2Path = new Path(dbPath, "t2"); + long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime(); + long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime(); + //Delete table 2 data + FileStatus[] statuses = fs.listStatus(tablet2Path); + fs.delete(statuses[0].getPath(), true); + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime(); + + // Do another dump. It should only dump table t2 and next table. + // Also insert new tables. New tables will be there in target + primary.run("use " + primaryDbName) + .run("insert into t2 values (13)") + .run("insert into t2 values (24)") + .run("create table t3(a string) STORED AS TEXTFILE") + .run("insert into t3 values (1)") + .run("insert into t3 values (2)") + .run("insert into t3 values (3)") + .dump(primaryDbName, dumpClause); + + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("select * from t1") + .verifyResults(new String[]{"1", "2", "3"}) + .run("select * from t2") + .verifyResults(new String[]{"11", "21", "13", "24"}) + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t3"}) + .run("select * from t3") + .verifyResults(new String[]{"1", "2", "3"}); + assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime()); + assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime()); + assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime()); + } + + @Test + public void testCheckPointingWithSourceTableDeleted() throws Throwable { + //To force distcp copy + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + + + //Delete dump ack and t2 data, Also drop table. New data will be there in target + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME); + Path dbPath = new Path(dataPath, primaryDbName.toLowerCase()); + Path tablet2Path = new Path(dbPath, "t2"); + FileStatus[] statuses = fs.listStatus(tablet2Path); + //Delete t2 data. + fs.delete(statuses[0].getPath(), true); + //Drop table t1. Target shouldn't have t1 table as metadata dump is rewritten + primary.run("use " + primaryDbName) + .run("drop table t1") + .dump(primaryDbName, dumpClause); + + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t2"}) + .run("select * from t2") + .verifyResults(new String[]{"11", "21"}); + } + + @Test + public void testCheckPointingMetadataDumpFailure() throws Throwable { + //To force distcp copy + List<String> dumpClause = Arrays.asList( + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("CREATE TABLE t1(a string) STORED AS TEXTFILE") + .run("CREATE TABLE t2(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (2)") + .run("insert into t1 values (3)") + .run("insert into t2 values (11)") + .run("insert into t2 values (21)") + .dump(primaryDbName); + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + + //Delete dump ack and metadata ack, everything should be rewritten in a new dump dir + fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true); + fs.delete(new Path(dumpPath, "_dumpmetadata"), true); + assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + //Insert new data + primary.run("insert into "+ primaryDbName +".t1 values (12)"); + primary.run("insert into "+ primaryDbName +".t1 values (13)"); + //Do another dump. It should dump everything in a new dump dir + // checkpointing will not be used + WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause); + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("select * from t2") + .verifyResults(new String[]{"11", "21"}) + .run("select * from t1") + .verifyResults(new String[]{"1", "2", "3", "12", "13"}); + assertNotEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation); + dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 56b27a5..33124c8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -55,6 +55,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -1093,13 +1094,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry with same dump with which it was already loaded also fails. replica.loadFailure(replicatedDbName, primaryDbName); // To retry with same dump delete the load ack new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path( - hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true); // Retry from same dump when the database is empty is also not allowed. replica.run("drop table t1") .run("drop table t2") @@ -1344,7 +1345,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") @@ -1370,7 +1371,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro replica.load(replicatedDbName, primaryDbName, withConfigs); //delete load ack to reuse the dump new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation - + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true); replica.load(replicatedDbName_CM, primaryDbName, withConfigs); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1423,7 +1424,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR - + ReplUtils.LOAD_ACKNOWLEDGEMENT), true); + + LOAD_ACKNOWLEDGEMENT.toString()), true); InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index 1325789..b7a9888 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -106,8 +106,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, dumpWithClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), + EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName)))); + .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("repl status " + replicatedDbName) @@ -126,8 +128,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, dumpWithClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), + EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null)))); + .exists(new Path(metadataPath + relativeExtInfoPath(null)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -152,7 +156,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName); + assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false); @@ -184,7 +188,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, withClauseOptions); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); + assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -202,7 +206,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation); + assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true); } /** @@ -310,7 +314,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into t2 partition(country='india') values ('bangalore')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName); + assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -333,7 +337,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation); + assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, true); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -420,7 +424,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("alter table t1 add partition(country='us')") .dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); + assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); // Add new data externally, to a partition, but under the partition level top directory // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD. @@ -467,7 +471,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // Repl load with zero events but external tables location info should present. tuple = primary.dump(primaryDbName, withClause); - assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); + assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true); replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) @@ -519,8 +523,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, dumpWithClause); // the _external_tables_file info only should be created if external tables are to be replicated not otherwise + Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), + EximUtil.METADATA_PATH_NAME); assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName)))); + .exists(new Path(metadataPath + relativeExtInfoPath(null)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .status(replicatedDbName) @@ -543,11 +549,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dump(primaryDbName, dumpWithClause); // the _external_tables_file info should be created as external tables are to be replicated. + Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null)))); + .exists(new Path(hivePath + relativeExtInfoPath(null)))); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation); + assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation, true); // _bootstrap directory should be created as bootstrap enabled on external tables. String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; @@ -762,7 +769,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros } // Only table t2 should exist in the data location list file. - assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation); + assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation, true); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. @@ -917,20 +924,29 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl); } - private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException { - assertExternalFileInfo(expected, dumplocation, null); + private void assertExternalFileInfo(List<String> expected, String dumplocation, + boolean isIncremental) throws IOException { + assertExternalFileInfo(expected, dumplocation, null, isIncremental); } - private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName) + private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName, + boolean isIncremental) throws IOException { - Path externalTableInfoFile = new Path(dumplocation, relativeExtInfoPath(dbName)); + Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); + Path externalTableInfoFile; + if (isIncremental) { + externalTableInfoFile = new Path(hivePath + relativeExtInfoPath(dbName)); + } else { + externalTableInfoFile = new Path(metadataPath + relativeExtInfoPath(dbName)); + } ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); } - private String relativeExtInfoPath(String dbName) { + private String relativeExtInfoPath(String dbName) { if (dbName == null) { - return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + FILE_NAME; + return File.separator + FILE_NAME; } else { - return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; + return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; } } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 78251f2..93e24ef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; @@ -242,7 +243,12 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios // Check if the DB dump path have any tables other than the ones listed in bootstrappedTables. Path dbPath = new Path(dumpPath, primaryDbName); - FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath); + FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath, new PathFilter() { + @Override + public boolean accept(Path path) { + return !path.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); + } + }); Assert.assertEquals(fileStatuses.length, bootstrappedTables.length); // Eg: _bootstrap/<db_name>/t2, _bootstrap/<db_name>/t3 etc @@ -500,13 +506,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios .dump(replPolicy, dumpWithClause); String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + Path metaDataPath = new Path(hiveDumpDir, EximUtil.METADATA_PATH_NAME); // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME))); // Verify that the external table info contains only table "a2". ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME)); + new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index 56f0c93..d3e9413 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -46,7 +46,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable { TableExport.Paths exportPaths = new TableExport.Paths( work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); - LOG.debug("Exporting data to: {}", exportPaths.exportRootDir()); + LOG.debug("Exporting data to: {}", exportPaths.metadataExportRootDir()); work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java index efef052..46f9bb3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java @@ -28,6 +28,7 @@ import java.io.Serializable; Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class DirCopyWork implements Serializable { + private static final long serialVersionUID = 1L; private final Path fullyQualifiedSourcePath; private final Path fullyQualifiedTargetPath; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java new file mode 100644 index 0000000..db8db5f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +/** + * ReplAck, used for repl acknowledgement constants. + */ +public enum ReplAck { + DUMP_ACKNOWLEDGEMENT("_finished_dump"), + LOAD_ACKNOWLEDGEMENT("_finished_load"); + private String ack; + ReplAck(String ack) { + this.ack = ack; + } + + @Override + public String toString() { + return ack; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 69f6ffe..2e0af02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -93,6 +93,7 @@ import java.util.UUID; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final long serialVersionUID = 1L; @@ -135,20 +136,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); - Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot); + Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. - if (shouldDump(previousHiveDumpPath)) { - Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); + if (shouldDump(previousValidHiveDumpPath)) { + Path currentDumpPath = getCurrentDumpPath(dumpRoot); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; - if (previousHiveDumpPath == null) { + if (previousValidHiveDumpPath == null) { lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { - work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath)); + work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath)); lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); @@ -166,6 +167,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return 0; } + private Path getCurrentDumpPath(Path dumpRoot) throws IOException { + Path previousDumpPath = getPreviousDumpPath(dumpRoot); + if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) { + //Resume previous dump + return previousDumpPath; + } else { + return new Path(dumpRoot, getNextDumpDir()); + } + } + private void initiateDataCopyTasks() throws SemanticException, IOException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); List<Task<?>> childTasks = new ArrayList<>(); @@ -183,7 +194,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private void finishRemainingTasks() throws SemanticException, IOException { prepareReturnValues(work.getResultValues()); Path dumpAckFile = new Path(work.getCurrentDumpPath(), - ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT); + ReplUtils.REPL_HIVE_BASE_DIR + File.separator + + ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); Utils.create(dumpAckFile, conf); deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); } @@ -233,7 +245,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return 0L; } - private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException { + private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException { FileStatus latestValidStatus = null; FileSystem fs = dumpRoot.getFileSystem(conf); if (fs.exists(dumpRoot)) { @@ -241,8 +253,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { for (FileStatus status : statuses) { LOG.info("Evaluating previous dump dir path:{}", status.getPath()); if (latestValidStatus == null) { - latestValidStatus = validDump(fs, status.getPath()) ? status : null; - } else if (validDump(fs, status.getPath()) + latestValidStatus = validDump(status.getPath()) ? status : null; + } else if (validDump(status.getPath()) && status.getModificationTime() > latestValidStatus.getModificationTime()) { latestValidStatus = status; } @@ -254,10 +266,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return latestDumpDir; } - private boolean validDump(FileSystem fs, Path dumpDir) throws IOException { + private boolean validDump(Path dumpDir) throws IOException { //Check if it was a successful dump - Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); - return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)); + if (dumpDir != null) { + FileSystem fs = dumpDir.getFileSystem(conf); + Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR); + return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString())); + } + return false; } private boolean shouldDump(Path previousDumpPath) throws IOException { @@ -267,7 +283,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return true; } else { FileSystem fs = previousDumpPath.getFileSystem(conf); - return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString())); } } @@ -471,8 +487,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // Dump the table to be bootstrapped if required. if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table); + Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME); managedTableCopyPaths.addAll( - dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + dumpTable(dbName, tableName, validTxnList, + dbRoot, dbDataRoot, bootDumpBeginReplId, hiveDb, tableTuple)); } if (tableList != null && isTableSatifiesConfig(table)) { @@ -611,16 +629,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { List<String> tableList; LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); + List<DirCopyWork> extTableCopyWorks = new ArrayList<>(); + List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>(); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; - String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime); - List<DirCopyWork> extTableCopyWorks = new ArrayList<>(); - List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>(); + Path metadataPath = new Path(dumpRoot, EximUtil.METADATA_PATH_NAME); + if (shouldResumePreviousDump(dmd)) { + //clear the metadata. We need to rewrite the metadata as the write id list will be changed + //We can't reuse the previous write id as it might be invalid due to compaction + metadataPath.getFileSystem(conf).delete(metadataPath, true); + } for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { LOG.debug("Dumping db: " + dbName); - // TODO : Currently we don't support separate table list for each database. tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); Database db = hiveDb.getDatabase(dbName); @@ -634,8 +656,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { Utils.getAllTables(hiveDb, dbName, work.replScope).size(), hiveDb.getAllFunctions().size()); replLogger.startLog(); - Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb); - dumpFunctionMetadata(dbName, dumpRoot, hiveDb); + Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb); + Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName); + dumpFunctionMetadata(dbName, dbRoot, hiveDb); String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName); Exception caught = null; @@ -653,7 +676,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { LOG.debug("Adding table {} to external tables list", tblName); extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); } - managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, + managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, + bootDumpBeginReplId, hiveDb, tableTuple)); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. @@ -677,11 +701,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { throw e; } else { LOG.error("failed to reset the db state for " + uniqueKey - + " on failure of repl dump", e); + + " on failure of repl dump", e); throw caught; } } - if(caught != null) { + if (caught != null) { throw caught; } } @@ -689,21 +713,36 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } Long bootDumpEndReplId = currentNotificationId(hiveDb); LOG.info("Preparing to return {},{}->{}", - dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); + work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); return bootDumpBeginReplId; } + private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { + try { + return dumpMetaData.getEventFrom() != null; + } catch (Exception e) { + LOG.info("No previous dump present"); + return false; + } + } + + private boolean shouldResumePreviousDump(Path dumpPath) { + Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf)); + } + long currentNotificationId(Hive hiveDb) throws TException { return hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); } - Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception { - Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, false); + Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception { // TODO : instantiating FS objects are generally costly. Refactor + Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false); FileSystem fs = dbRoot.getFileSystem(conf); Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database(); @@ -711,12 +750,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return dbRoot; } - List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, - long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { + List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, + Path dbRootData, long lastReplId, Hive hiveDb, + HiveWrapper.Tuple<Table> tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = - new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true); + new TableExport.Paths(work.astRepresentationForErrorMsg, dbRootMetadata, dbRootData, tblName, conf, true); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { @@ -827,8 +867,26 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } - void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception { - Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME); + private Path getPreviousDumpPath(Path dumpRoot) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot); + if (statuses.length > 0) { + FileStatus latestValidStatus = statuses[0]; + for (FileStatus status : statuses) { + LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + if (status.getModificationTime() > latestValidStatus.getModificationTime()) { + latestValidStatus = status; + } + } + return latestValidStatus.getPath(); + } + } + return null; + } + + void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception { + Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); List<String> functionNames = hiveDb.getFunctions(dbName, "*"); for (String functionName : functionNames) { HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 3427b59..a593555 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -65,6 +64,7 @@ import java.util.List; import java.util.Map; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { private final static int ZERO_TASKS = 0; @@ -316,7 +316,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { //All repl load tasks are executed and status is 0, create the task to add the acknowledgement AckWork replLoadAckWork = new AckWork( - new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT)); + new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString())); Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf); if (this.childTasks.isEmpty()) { this.childTasks.add(loadAckWorkTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 474d8c2..56efa32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.Constrain import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; @@ -91,8 +92,10 @@ public class ReplLoadWork implements Serializable { this.constraintsIterator = null; } } else { - this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, true, hiveConf); - this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); + this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME) + .toString(), dbNameToLoadIn, true, hiveConf); + this.constraintsIterator = new ConstraintEventsIterator( + new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME).toString(), hiveConf); incrementalLoadTasksBuilder = null; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java index 10732b0..05ef274 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java @@ -42,4 +42,6 @@ public interface TableEvent extends BootstrapEvent { * Exposing the FileSystem implementation outside which is what it should NOT do. */ Path metadataPath(); + + Path dataPath(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 1af6a4c..5bbe20c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; @@ -82,8 +81,11 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> { throws IOException { Path path = new Path(dumpDirectory); FileSystem fileSystem = path.getFileSystem(hiveConf); + if (!fileSystem.exists(path)) { + throw new IllegalArgumentException("No data to load in path " + dumpDirectory); + } FileStatus[] fileStatuses = - fileSystem.listStatus(new Path(dumpDirectory), ReplUtils.getBootstrapDirectoryFilter(fileSystem)); + fileSystem.listStatus(path, ReplUtils.getBootstrapDirectoryFilter(fileSystem)); if ((fileStatuses == null) || (fileStatuses.length == 0)) { throw new IllegalArgumentException("No data to load in path " + dumpDirectory); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index 72baee6..a311f7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -57,7 +57,7 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> { this.hiveConf = hiveConf; FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf); // this is only there for the use case where we are doing table only replication and not database level - if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) { + if (!fileSystem.exists(new Path(dbLevelPath, EximUtil.METADATA_NAME))) { databaseEventProcessed = true; } @@ -129,7 +129,8 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> { continue; } if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) { - String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), ""); + String replacedString = next.getPath().toString() + .replace(dbLevelPath.toString(), ""); List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR)) .filter(StringUtils::isNotBlank) .collect(Collectors.toList()); @@ -174,7 +175,15 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> { LOG.debug("functions directory: {}", next.toString()); return postProcessing(new FSFunctionEvent(next)); } - return postProcessing(new FSTableEvent(hiveConf, next.toString())); + return postProcessing(new FSTableEvent(hiveConf, next.toString(), + new Path(getDbLevelDataPath(), next.getName()).toString())); + } + + private Path getDbLevelDataPath() { + if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) { + return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME); + } + return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName()); } private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) { @@ -187,11 +196,14 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> { private BootstrapEvent eventForReplicationState() { if (replicationState.partitionState != null) { BootstrapEvent - bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState); + bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), + new Path(getDbLevelDataPath(), previous.getName()).toString(), + replicationState); replicationState = null; return bootstrapEvent; } else if (replicationState.lastTableReplicated != null) { - FSTableEvent event = new FSTableEvent(hiveConf, previous.toString()); + FSTableEvent event = new FSTableEvent(hiveConf, previous.toString(), + new Path(new Path(dbLevelPath, EximUtil.DATA_PATH_NAME), previous.getName()).toString()); replicationState = null; return event; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java index a79f5b7..2d82408 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java @@ -34,9 +34,9 @@ public class FSPartitionEvent implements PartitionEvent { private final ReplicationState replicationState; private final TableEvent tableEvent; - FSPartitionEvent(HiveConf hiveConf, String metadataDir, + FSPartitionEvent(HiveConf hiveConf, String metadataDir, String dataDir, ReplicationState replicationState) { - tableEvent = new FSTableEvent(hiveConf, metadataDir); + tableEvent = new FSTableEvent(hiveConf, metadataDir, dataDir); this.replicationState = replicationState; } @@ -87,4 +87,9 @@ public class FSPartitionEvent implements PartitionEvent { public Path metadataPath() { return tableEvent.metadataPath(); } + + @Override + public Path dataPath() { + return tableEvent.dataPath(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 6d38c03..cd3d619 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -52,16 +52,19 @@ import java.util.Map; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater; public class FSTableEvent implements TableEvent { - private final Path fromPath; + private final Path fromPathMetadata; + private final Path fromPathData; private final MetaData metadata; private final HiveConf hiveConf; - FSTableEvent(HiveConf hiveConf, String metadataDir) { + FSTableEvent(HiveConf hiveConf, String metadataDir, String dataDir) { try { URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir)); - fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + fromPathMetadata = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + URI fromURIData = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(dataDir)); + fromPathData = new Path(fromURIData.getScheme(), fromURIData.getAuthority(), fromURIData.getPath()); FileSystem fs = FileSystem.get(fromURI, hiveConf); - metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + metadata = EximUtil.readMetaData(fs, new Path(fromPathMetadata, EximUtil.METADATA_NAME)); this.hiveConf = hiveConf; } catch (Exception e) { throw new RuntimeException(e); @@ -82,7 +85,12 @@ public class FSTableEvent implements TableEvent { @Override public Path metadataPath() { - return fromPath; + return fromPathMetadata; + } + + @Override + public Path dataPath() { + return fromPathData; } /** @@ -150,7 +158,7 @@ public class FSTableEvent implements TableEvent { //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose. for (Partition partition : metadata.getPartitions()) { // TODO: this should ideally not create AddPartitionDesc per partition - AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition); + AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPathMetadata, tblDesc, partition); descs.add(partsDesc); } return descs; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 05a590a..b98f1f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -57,7 +56,6 @@ import org.datanucleus.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -243,7 +241,7 @@ public class LoadPartitions { Task<?> copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), - new Path(sourceWarehousePartitionLocation, EximUtil.DATA_PATH_NAME), + new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)), stagingDir, context.hiveConf, false ); @@ -272,6 +270,12 @@ public class LoadPartitions { return ptnRootTask; } + private String getPartitionName(Path partitionMetadataFullPath) { + //Get partition name by removing the metadata base path. + //Needed for getting the data path + return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length()); + } + /** * This will create the move of partition data from temp path to actual path */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 82a3031..bb20687 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -223,7 +222,7 @@ public class LoadTable { if (shouldCreateLoadTableTask) { LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()), - event.metadataPath()); + event.dataPath()); parentTask.addDependentTask(loadTableTask); } tracker.addTask(tblRootTask); @@ -272,7 +271,7 @@ public class LoadTable { private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) { - Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); + Path dataPath = fromURI; Path tmpPath = tgtPath; // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 211c3f0..939cbc3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -94,10 +94,6 @@ public class ReplUtils { // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be // seen in production or in case of tests other than the ones where it's required. public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables"; - //Acknowledgement for repl dump complete - public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump"; - //Acknowledgement for repl load complete - public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. */ @@ -240,7 +236,8 @@ public class ReplUtils { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) - && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } @@ -251,7 +248,8 @@ public class ReplUtils { return p -> { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) - && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index bc90ea1..5ada55f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -74,6 +74,7 @@ public class EximUtil { public static final String METADATA_NAME = "_metadata"; public static final String FILES_NAME = "_files"; public static final String DATA_PATH_NAME = "data"; + public static final String METADATA_PATH_NAME = "metadata"; private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 8802139..c4ff070 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -54,6 +55,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -424,8 +426,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); - if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)) - && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) { + if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, + ReplAck.DUMP_ACKNOWLEDGEMENT.toString())) + && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, LOAD_ACKNOWLEDGEMENT.toString()))) { return hiveDumpPath; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index c3b1081..73dc606 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; @@ -118,12 +117,13 @@ class PartitionExport { // this the data copy List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); - Path rootDataDumpDir = paths.partitionExportDir(partitionName); + Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName); new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(isExportTask); + Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(), - new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME)); + dataDumpDir); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 683f3c0..b11afe8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -198,22 +198,29 @@ public class TableExport { public static class Paths { private final String astRepresentationForErrorMsg; private final HiveConf conf; - //variable access should not be done and use exportRootDir() instead. - private final Path _exportRootDir; + //metadataExportRootDir and dataExportRootDir variable access should not be done and use + // metadataExportRootDir() and dataExportRootDir() instead. + private final Path metadataExportRootDir; + private final Path dataExportRootDir; private final FileSystem exportFileSystem; - private boolean writeData, exportRootDirCreated = false; + private boolean writeData, metadataExportRootDirCreated = false, dataExportRootDirCreated = false; - public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf, + public Paths(String astRepresentationForErrorMsg, Path dbMetadataRoot, Path dbDataRoot, + String tblName, HiveConf conf, boolean shouldWriteData) throws SemanticException { this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.conf = conf; this.writeData = shouldWriteData; - Path tableRoot = new Path(dbRoot, tblName); - URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString()); - validateTargetDir(exportRootDir); - this._exportRootDir = new Path(exportRootDir); + Path tableRootForMetadataDump = new Path(dbMetadataRoot, tblName); + Path tableRootForDataDump = new Path(dbDataRoot, tblName); + URI metadataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForMetadataDump.toUri().toString()); + validateTargetDir(metadataExportRootDirUri); + URI dataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForDataDump.toUri().toString()); + validateTargetDataDir(dataExportRootDirUri); + this.metadataExportRootDir = new Path(metadataExportRootDirUri); + this.dataExportRootDir = new Path(dataExportRootDirUri); try { - this.exportFileSystem = this._exportRootDir.getFileSystem(conf); + this.exportFileSystem = this.metadataExportRootDir.getFileSystem(conf); } catch (IOException e) { throw new SemanticException(e); } @@ -223,37 +230,58 @@ public class TableExport { boolean shouldWriteData) throws SemanticException { this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.conf = conf; - this._exportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); + this.metadataExportRootDir = new Path(EximUtil.getValidatedURI(conf, path)); + this.dataExportRootDir = new Path(new Path(EximUtil.getValidatedURI(conf, path)), EximUtil.DATA_PATH_NAME); this.writeData = shouldWriteData; try { - this.exportFileSystem = _exportRootDir.getFileSystem(conf); + this.exportFileSystem = metadataExportRootDir.getFileSystem(conf); } catch (IOException e) { throw new SemanticException(e); } } - Path partitionExportDir(String partitionName) throws SemanticException { - return exportDir(new Path(exportRootDir(), partitionName)); + Path partitionMetadataExportDir(String partitionName) throws SemanticException { + return exportDir(new Path(metadataExportRootDir(), partitionName)); } /** - * Access to the {@link #_exportRootDir} should only be done via this method + * Access to the {@link #metadataExportRootDir} should only be done via this method * since the creation of the directory is delayed until we figure out if we want * to write something or not. This is specifically important to prevent empty non-native * directories being created in repl dump. */ - public Path exportRootDir() throws SemanticException { - if (!exportRootDirCreated) { + public Path metadataExportRootDir() throws SemanticException { + if (!metadataExportRootDirCreated) { try { - if (!exportFileSystem.exists(this._exportRootDir) && writeData) { - exportFileSystem.mkdirs(this._exportRootDir); + if (!exportFileSystem.exists(this.metadataExportRootDir) && writeData) { + exportFileSystem.mkdirs(this.metadataExportRootDir); } - exportRootDirCreated = true; + metadataExportRootDirCreated = true; } catch (IOException e) { throw new SemanticException(e); } } - return _exportRootDir; + return metadataExportRootDir; + } + + /** + * Access to the {@link #dataExportRootDir} should only be done via this method + * since the creation of the directory is delayed until we figure out if we want + * to write something or not. This is specifically important to prevent empty non-native + * directories being created in repl dump. + */ + public Path dataExportRootDir() throws SemanticException { + if (!dataExportRootDirCreated) { + try { + if (!exportFileSystem.exists(this.dataExportRootDir) && writeData) { + exportFileSystem.mkdirs(this.dataExportRootDir); + } + dataExportRootDirCreated = true; + } catch (IOException e) { + throw new SemanticException(e); + } + } + return dataExportRootDir; } private Path exportDir(Path exportDir) throws SemanticException { @@ -269,7 +297,7 @@ public class TableExport { } private Path metaDataExportFile() throws SemanticException { - return new Path(exportRootDir(), EximUtil.METADATA_NAME); + return new Path(metadataExportRootDir(), EximUtil.METADATA_NAME); } /** @@ -277,7 +305,7 @@ public class TableExport { * Partition's data export directory is created within the export semantics of partition. */ private Path dataExportDir() throws SemanticException { - return exportDir(new Path(exportRootDir(), EximUtil.DATA_PATH_NAME)); + return exportDir(dataExportRootDir()); } /** @@ -310,6 +338,30 @@ public class TableExport { throw new SemanticException(astRepresentationForErrorMsg, e); } } + + /** + * this level of validation might not be required as the root directory in which we dump will + * be different for each run hence possibility of it having data is not there. + */ + private void validateTargetDataDir(URI rootDirExportFile) throws SemanticException { + try { + FileSystem fs = FileSystem.get(rootDirExportFile, conf); + Path toPath = new Path(rootDirExportFile.getScheme(), rootDirExportFile.getAuthority(), + rootDirExportFile.getPath()); + try { + FileStatus tgt = fs.getFileStatus(toPath); + // target exists + if (!tgt.isDirectory()) { + throw new SemanticException( + astRepresentationForErrorMsg + ": " + "Target is not a directory : " + + rootDirExportFile); + } + } catch (FileNotFoundException ignored) { + } + } catch (IOException e) { + throw new SemanticException(astRepresentationForErrorMsg, e); + } + } } public static class AuthEntities { @@ -343,7 +395,7 @@ public class TableExport { authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle)); } } - authEntities.outputs.add(toWriteEntity(paths.exportRootDir(), conf)); + authEntities.outputs.add(toWriteEntity(paths.metadataExportRootDir(), conf)); } catch (Exception e) { throw new SemanticException(e); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 81fac25..9973e9a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -84,11 +84,11 @@ public class TestReplDumpTask { } @Override - void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) { + void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) { } @Override - Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) { + Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) { return Mockito.mock(Path.class); } @@ -128,8 +128,9 @@ public class TestReplDumpTask { private int tableDumpCount = 0; @Override - List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, - Path replDataDir, long lastReplId, Hive hiveDb, + List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, + Path dbRootMetadata, Path dbRootData, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; @@ -146,7 +147,7 @@ public class TestReplDumpTask { ); try { - task.bootStrapDump(mock(Path.class), null, mock(Path.class), hive); + task.bootStrapDump(new Path("mock"), null, mock(Path.class), hive); } finally { verifyStatic(); Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));