This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new e033ebe  HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
e033ebe is described below

commit e033ebe3e37357244fe35d3ff9f080742b4e1b5f
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Fri Oct 23 09:08:53 2020 +0530

    HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  16 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  10 +-
 .../parse/TestReplicationScenariosAcidTables.java  | 266 +++++++++++----------
 .../TestReplicationScenariosAcrossInstances.java   |  64 +++--
 .../partition/add/AlterTableAddPartitionDesc.java  |   7 +
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   |  81 ++-----
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   2 +
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  17 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   3 +-
 .../exec/repl/bootstrap/events/PartitionEvent.java |   5 +
 .../events/filesystem/FSPartitionEvent.java        |  12 +
 .../exec/repl/bootstrap/load/ReplicationState.java |  18 ++
 .../repl/bootstrap/load/table/LoadPartitions.java  | 228 ++++++++----------
 .../exec/repl/bootstrap/load/table/LoadTable.java  |   6 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  48 ++--
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  39 ---
 .../TestPrimaryToReplicaResourceFunction.java      |   2 +-
 .../metastore/InjectableBehaviourObjectStore.java  |  23 ++
 18 files changed, 411 insertions(+), 436 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aab4913..edaa75b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -596,18 +596,6 @@ public class HiveConf extends Configuration {
         + "'hive.repl.include.external.tables' when sets to true. If 'hive.repl.include.external.tables' is \n"
         + "set to false, then this config parameter has no effect. It should be set to true only once for \n"
         + "incremental repl dump on each existing replication policy after enabling external tables replication."),
-    REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false,
-        "If its set to true, REPL LOAD copies data files directly to the target table/partition location \n"
-        + "instead of copying to staging directory first and then move to target location. This optimizes \n"
-        + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
-        + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
-        + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
-    REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
-        "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
-        + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
-        + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n "
-        + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n"
-        + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
     REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", null,
         "This is the fully qualified base directory on the target/replica warehouse under which data for "
         + "external tables is stored. This is relative base path and hence prefixed to the source "
@@ -655,6 +643,10 @@ public class HiveConf extends Configuration {
         "Provide the maximum number of partitions of a table that will be batched together during \n"
         + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
         + "The data for these partitions will be copied before copying the metadata batch. "),
+    REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE("hive.repl.load.partitions.with.data.copy.batch.size",1000,
+        "Provide the maximum number of partitions of a table that will be batched together during \n"
+        + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
+        + "The data for these partitions will be copied before copying the metadata batch. "),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index bef0a95..27e97b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -475,7 +475,6 @@ public class TestReplicationScenarios {
   private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
                                    Tuple tuple) throws Throwable {
     HiveConf confTemp = new HiveConf();
-    confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     ReplicationMetricCollector metricCollector;
     if (isIncrementalDump) {
@@ -4022,8 +4021,7 @@ public class TestReplicationScenarios {
     String replDbName = dbName + "_replica";

     Tuple dump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-        " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror);

     run(" use " + replDbName, driverMirror);
@@ -4056,8 +4054,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);

     Tuple incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-        " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);

     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -4073,8 +4070,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);

     incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-        " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " 
INTO " + replDbName, driverMirror); verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 575eeab..3d22770 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Collections; import java.util.Map; + import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.junit.Assert.assertEquals; @@ -161,25 +162,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios } @Test - public void testAcidTablesMoveOptimizationBootStrap() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); - replica.load(replicatedDbName, primaryDbName, - Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); - verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true); - } - - @Test - public void testAcidTablesMoveOptimizationIncremental() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName, - Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); - WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, null); - replica.load(replicatedDbName, primaryDbName, - Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); - verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true); - } - - @Test /** * Testcase for getting immutable dataset dump, and its corresponding repl load. 
*/ @@ -2301,131 +2283,155 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable { //Distcp copy List<String> withClause = Arrays.asList( - "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'", - "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", - "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", - "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", - "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" - + UserGroupInformation.getCurrentUser().getUserName() + "'"); + "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'", + "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'", + "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'", + "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" + + UserGroupInformation.getCurrentUser().getUserName() + "'"); WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) - .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE text1(a string) STORED AS TEXTFILE") - .run("insert into t1 values (1)") - .run("insert into t1 values (11)") - .run("insert into t2 values (2)") - .run("insert into t2 values (22)") - .run("insert into t3 values (33)") - .run("insert into tpart1 partition(name='Tom') values(100)") - .run("insert into tpart1 partition(name='Jerry') values(101)") - .run("insert into tpart2 partition(name='Bob') values(200)") - .run("insert into tpart2 partition(name='Carl') values(201)") - .run("insert into text1 values ('ricky')") - .dump(primaryDbName, withClause); + .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart1(a int) partitioned by (name string)" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE text1(a string) STORED AS TEXTFILE") + .run("insert into t1 values (1)") + .run("insert into t1 values (11)") + .run("insert into t2 values (2)") + .run("insert into t2 values (22)") + .run("insert into t3 values (33)") + .run("insert into tpart1 partition(name='Tom') values(100)") + .run("insert into tpart1 partition(name='Jerry') values(101)") + .run("insert into tpart2 partition(name='Bob') values(200)") + .run("insert into tpart2 partition(name='Carl') values(201)") + .run("insert into text1 values ('ricky')") + 
.dump(primaryDbName, withClause); replica.run("DROP TABLE t3"); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables") - .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"}) - .run("select * from " + replicatedDbName + ".t1") - .verifyResults(new String[] {"1", "11"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"100", "101"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Tom", "name=Jerry"}) - .run("select a from " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"200", "201"}) - .run("show partitions " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"name=Bob", "name=Carl"}) - .run("select a from " + replicatedDbName + ".text1") - .verifyResults(new String[]{"ricky"}); + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"}) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[]{"1", "11"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"100", "101"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Tom", "name=Jerry"}) + .run("select a from " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"200", "201"}) + .run("show partitions " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"name=Bob", "name=Carl"}) + .run("select a from " + replicatedDbName + ".text1") + .verifyResults(new String[]{"ricky"}); WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName) - .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + - " stored as orc TBLPROPERTIES ('transactional'='true')") - .run("insert into t1 values (111)") - .run("insert into t2 values (222)") - .run("insert into t4 values (4)") - .run("insert into tpart1 partition(name='Tom') values(102)") - .run("insert into tpart1 partition(name='Jerry') values(103)") - .run("insert into tpart2 partition(name='Bob') values(202)") - .run("insert into tpart2 partition(name='Carl') values(203)") - .run("insert into tpart3 partition(name='Tom3') values(300)") - .run("insert into tpart3 partition(name='Jerry3') values(301)") - .run("insert into tpart4 partition(name='Bob4') values(400)") - .run("insert into tpart4 partition(name='Carl4') values(401)") - .run("insert into text1 values ('martin')") - .dump(primaryDbName, withClause); + .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart3(a int) partitioned by (name string)" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" + + " stored as orc TBLPROPERTIES ('transactional'='true')") + .run("insert into t1 values (111)") + .run("insert into t2 values (222)") + .run("insert into t4 values (4)") + .run("insert 
into tpart1 partition(name='Tom') values(102)") + .run("insert into tpart1 partition(name='Jerry') values(103)") + .run("insert into tpart2 partition(name='Bob') values(202)") + .run("insert into tpart2 partition(name='Carl') values(203)") + .run("insert into tpart3 partition(name='Tom3') values(300)") + .run("insert into tpart3 partition(name='Jerry3') values(301)") + .run("insert into tpart4 partition(name='Bob4') values(400)") + .run("insert into tpart4 partition(name='Carl4') values(401)") + .run("insert into text1 values ('martin')") + .dump(primaryDbName, withClause); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables ") - .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"}) - .run("select * from " + replicatedDbName + ".t1") - .verifyResults(new String[] {"1", "11", "111"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22", "222"}) - .run("select * from " + replicatedDbName + ".t4") - .verifyResults(new String[]{"4"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"100", "101", "102", "103"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Tom", "name=Jerry"}) - .run("select a from " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"200", "201", "202", "203"}) - .run("show partitions " + replicatedDbName + ".tpart2") - .verifyResults(new String[]{"name=Bob", "name=Carl"}) - .run("select a from " + replicatedDbName + ".tpart3") - .verifyResults(new String[]{"300", "301"}) - .run("show partitions " + replicatedDbName + ".tpart3") - .verifyResults(new String[]{"name=Tom3", "name=Jerry3"}) - .run("select a from " + replicatedDbName + ".tpart4") - .verifyResults(new String[]{"400", "401"}) - .run("show partitions " + replicatedDbName + ".tpart4") - .verifyResults(new String[]{"name=Bob4", "name=Carl4"}) - .run("select a from " + replicatedDbName + ".text1") - .verifyResults(new String[]{"ricky", "martin"}); + .run("use " + replicatedDbName) + .run("show tables ") + .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"}) + .run("select * from " + replicatedDbName + ".t1") + .verifyResults(new String[]{"1", "11", "111"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22", "222"}) + .run("select * from " + replicatedDbName + ".t4") + .verifyResults(new String[]{"4"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"100", "101", "102", "103"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Tom", "name=Jerry"}) + .run("select a from " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"200", "201", "202", "203"}) + .run("show partitions " + replicatedDbName + ".tpart2") + .verifyResults(new String[]{"name=Bob", "name=Carl"}) + .run("select a from " + replicatedDbName + ".tpart3") + .verifyResults(new String[]{"300", "301"}) + .run("show partitions " + replicatedDbName + ".tpart3") + .verifyResults(new String[]{"name=Tom3", "name=Jerry3"}) + .run("select a from " + replicatedDbName + ".tpart4") + .verifyResults(new String[]{"400", "401"}) + .run("show partitions " + replicatedDbName + ".tpart4") + .verifyResults(new String[]{"name=Bob4", "name=Carl4"}) + .run("select a from " + replicatedDbName + ".text1") + .verifyResults(new String[]{"ricky", "martin"}); incrementalDump = 
primary.run("use " + primaryDbName) - .run("insert into t4 values (44)") - .run("insert into t1 values (1111)") - .run("DROP TABLE t1") - .run("insert into t2 values (2222)") - .run("insert into tpart1 partition(name='Tom') values(104)") - .run("insert into tpart1 partition(name='Tom_del') values(1000)") - .run("insert into tpart1 partition(name='Harry') values(10001)") - .run("insert into tpart1 partition(name='Jerry') values(105)") - .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')") - .run("DROP TABLE tpart2") - .dump(primaryDbName, withClause); + .run("insert into t4 values (44)") + .run("insert into t1 values (1111)") + .run("DROP TABLE t1") + .run("insert into t2 values (2222)") + .run("insert into tpart1 partition(name='Tom') values(104)") + .run("insert into tpart1 partition(name='Tom_del') values(1000)") + .run("insert into tpart1 partition(name='Harry') values(10001)") + .run("insert into tpart1 partition(name='Jerry') values(105)") + .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')") + .run("DROP TABLE tpart2") + .dump(primaryDbName, withClause); replica.run("DROP TABLE t4") - .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')"); + .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')"); replica.load(replicatedDbName, primaryDbName, withClause) - .run("use " + replicatedDbName) - .run("show tables ") - .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"}) - .run("select * from " + replicatedDbName + ".t2") - .verifyResults(new String[]{"2", "22", "222", "2222"}) - .run("select a from " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"101", "103", "105", "1000", "10001"}) - .run("show partitions " + replicatedDbName + ".tpart1") - .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"}); + .run("use " + replicatedDbName) + .run("show tables ") + .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"}) + .run("select * from " + replicatedDbName + ".t2") + .verifyResults(new String[]{"2", "22", "222", "2222"}) + .run("select a from " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"101", "103", "105", "1000", "10001"}) + .run("show partitions " + replicatedDbName + ".tpart1") + .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"}); + } + + @Test + public void testTableWithPartitionsInBatch() throws Throwable { + + List<String> withClause = new ArrayList<>(); + withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='" + 1 + "'"); + + primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into t2 partition(country='india') values ('bangalore')") + .run("insert into t2 partition(country='france') values ('paris')") + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, withClause); + + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] { "t2" }) + .run("select distinct(country) from t2") + .verifyResults(new String[] { "india", "france", "australia" }) + .run("select place from t2") + .verifyResults(new String[] { "bangalore", "paris", "sydney" }) + .verifyReplTargetProperty(replicatedDbName); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index dbdee9d..e01c6c4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -453,6 +453,34 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro } @Test + public void testMultipleStagesOfReplicationLoadTaskWithPartitionBatching() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into t1 values (1), (2)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .run("create table t3 (rank int)") + .dump(primaryDbName); + + // each table creation itself takes more than one task, give we are giving a max of 1, we should hit multiple runs. + List<String> withClause = new ArrayList<>(); + withClause.add("'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'"); + withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='1'"); + + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 order by country") + .verifyResults(new String[] { "france", "india", "us" }); + } + + @Test public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) @@ -1337,12 +1365,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". // So, table "t2" will exist and partition "india" will exist, rest failed as operation failed. - BehaviourInjection<List<Partition>, Boolean> alterPartitionStub + BehaviourInjection<List<Partition>, Boolean> addPartitionStub = new BehaviourInjection<List<Partition>, Boolean>() { @Override public Boolean apply(List<Partition> ptns) { for (Partition ptn : ptns) { - if (ptn.getValues().get(0).equals("india")) { + if (ptn.getValues().get(0).equals("uk")) { injectionPathCalled = true; LOG.warn("####getPartition Stub called"); return false; @@ -1351,14 +1379,15 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro return true; } }; - InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(alterPartitionStub); + InjectableBehaviourObjectStore.setAddPartitionsBehaviour(addPartitionStub); // Make sure that there's some order in which the objects are loaded. 
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'", - "'hive.in.repl.test.files.sorted'='true'"); + "'hive.in.repl.test.files.sorted'='true'", + "'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE + "' = '1'"); replica.loadFailure(replicatedDbName, primaryDbName, withConfigs); - InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour - alterPartitionStub.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetAddPartitionModifier(); // reset the behaviour + addPartitionStub.assertInjectionsPerformed(true, false); replica.run("use " + replicatedDbName) .run("repl status " + replicatedDbName) @@ -1420,21 +1449,19 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro @Test public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable { - List<String> withConfigs = - Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); String replicatedDbName_CM = replicatedDbName + "_CM"; WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .run("create table t2 (place string) partitioned by (country string)") .run("insert into table t2 partition(country='india') values ('bangalore')") .run("create table t1 (place string) partitioned by (country string)") .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName, withConfigs); + replica.load(replicatedDbName, primaryDbName); //delete load ack to reuse the dump new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true); - replica.load(replicatedDbName_CM, primaryDbName, withConfigs); + replica.load(replicatedDbName_CM, primaryDbName); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1448,18 +1475,16 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro @Test public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { - List<String> withConfigs = - Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); String replicatedDbName_CM = replicatedDbName + "_CM"; WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) .run("create table t2 (place string) partitioned by (country string)") .run("ALTER TABLE t2 ADD PARTITION (country='india')") .dump(primaryDbName); - replica.load(replicatedDbName, primaryDbName, withConfigs); + replica.load(replicatedDbName, primaryDbName); //delete load ack to reuse the dump new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true); - replica.load(replicatedDbName_CM, primaryDbName, withConfigs); + replica.load(replicatedDbName_CM, primaryDbName); replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); @@ -1472,9 +1497,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM, String tbl, String 
eventType, WarehouseInstance.Tuple tuple) throws Throwable { - List<String> withConfigs = - Collections.singletonList("'hive.repl.enable.move.optimization'='true'"); - // fail add notification for given event type. BehaviourInjection<NotificationEvent, Boolean> callerVerifier = new BehaviourInjection<NotificationEvent, Boolean>() { @@ -1493,13 +1515,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); try { - replica.loadFailure(replicaDb, primaryDbName, withConfigs); + replica.loadFailure(replicaDb, primaryDbName); } finally { InjectableBehaviourObjectStore.resetAddNotificationModifier(); } callerVerifier.assertInjectionsPerformed(true, false); - replica.load(replicaDb, primaryDbName, withConfigs); + replica.load(replicaDb, primaryDbName); replica.run("use " + replicaDb) .run("select country from " + tbl + " where country == 'india'") @@ -1516,13 +1538,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); try { - replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs); + replica.loadFailure(replicatedDbName_CM, primaryDbName); } finally { InjectableBehaviourObjectStore.resetAddNotificationModifier(); } callerVerifier.assertInjectionsPerformed(true, false); - replica.load(replicatedDbName_CM, primaryDbName, withConfigs); + replica.load(replicatedDbName_CM, primaryDbName); replica.run("use " + replicatedDbName_CM) .run("select country from " + tbl + " where country == 'india'") diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java index c4b2dab..d61c575 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.ddl.table.partition.add; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,6 +106,12 @@ public class AlterTableAddPartitionDesc implements DDLDescWithWriteId, Serializa return params; } + public void addPartParams(Map<String, String> partParams) { + if (params != null) { + params.putAll(partParams); + } + } + @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getPartParamsForExplain() { return params.toString(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 3b07b73..57ce1aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; @@ -33,10 +32,8 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import 
java.io.Serializable; -import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.List; -import java.util.ListIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +47,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { @@ -142,7 +138,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); - // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if + // in case of acid tables, file is directly copied to destination. So we need to clear the old content, if // its a replace (insert overwrite ) operation. if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { LOG.debug(" path " + toPath + " is cleaned before renaming"); @@ -226,52 +222,17 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, - boolean readSourceAsFileList) { - return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, - readSourceAsFileList, false); - } - - public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, - HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean readSourceAsFileList, String dumpDirectory, ReplicationMetricCollector metricCollector) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, - readSourceAsFileList, false, dumpDirectory, metricCollector); + readSourceAsFileList, false, true, dumpDirectory, metricCollector); } - private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, - HiveConf conf, boolean isAutoPurge, boolean needRecycle, - boolean readSourceAsFileList, - boolean overWrite) { - Task<?> copyTask = null; - LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); - if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ - ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite); - rcwork.setReadSrcAsFilesList(readSourceAsFileList); - if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) { - rcwork.setDeleteDestIfExist(true); - rcwork.setAutoPurge(isAutoPurge); - rcwork.setNeedRecycle(needRecycle); - } - // For replace case, duplicate check should not be done. The new base directory will automatically make the older - // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple - // replace events getting replayed in the first incremental load. 
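
The practical effect of this ReplCopyTask cleanup is that clearing the destination on a replace no longer hinges on the removed hive.repl.enable.move.optimization setting; callers now pass an explicit deleteDestination flag. A minimal before/after sketch of just that condition, shown as fragments rather than the full factory methods:

// Before (removed overload): destination cleanup on replace was gated on a global config.
if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
  rcwork.setDeleteDestIfExist(true);
}
// After: the caller decides. The load path passes true, while the dump path (getDumpCopyTask)
// passes !isBootstrap so a checkpointed bootstrap dump directory is synced rather than wiped.
if (replicationSpec.isReplace() && deleteDestination) {
  rcwork.setDeleteDestIfExist(true);
}
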
- rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace()); - LOG.debug("ReplCopyTask:\trcwork"); - String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - rcwork.setDistCpDoAsUser(distCpDoAsUser); - copyTask = TaskFactory.get(rcwork, conf); - } else { - LOG.debug("ReplCopyTask:\tcwork"); - copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf); - } - return copyTask; - } private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean readSourceAsFileList, - boolean overWrite, + boolean overWrite, boolean deleteDestination, String dumpDirectory, ReplicationMetricCollector metricCollector) { Task<?> copyTask = null; @@ -280,7 +241,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite, dumpDirectory, metricCollector); rcwork.setReadSrcAsFilesList(readSourceAsFileList); - if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) { + if (replicationSpec.isReplace() && deleteDestination) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); rcwork.setNeedRecycle(needRecycle); @@ -300,35 +261,41 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { return copyTask; } - - public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, - HiveConf conf) { - return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, - true, false); - } - public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, String dumpDirectory, ReplicationMetricCollector metricCollector) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, - true, false, dumpDirectory, metricCollector); + true, false, true, dumpDirectory, metricCollector); } - /* * Invoked in the bootstrap path. * Overwrite set to true */ public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, - HiveConf conf, boolean readSourceAsFileList, boolean overWrite) { + HiveConf conf, boolean readSourceAsFileList, boolean overWrite, + String dumpDirectory, ReplicationMetricCollector metricCollector) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, - readSourceAsFileList, overWrite); + readSourceAsFileList, overWrite, true, dumpDirectory, metricCollector); } - public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + /* + * Invoked in the bootstrap dump path. bootstrap dump purge is false. 
+ * No purge for dump dir in case of check pointing + */ + public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf, boolean readSourceAsFileList, boolean overWrite, - String dumpDirectory, ReplicationMetricCollector metricCollector) { + boolean deleteDestination, String dumpDirectory, + ReplicationMetricCollector metricCollector) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, + readSourceAsFileList, overWrite, deleteDestination, dumpDirectory, metricCollector); + } + + + public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, String dumpDirectory, + ReplicationMetricCollector metricCollector) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, - readSourceAsFileList, overWrite, dumpDirectory, metricCollector); + true, false, true, dumpDirectory, metricCollector); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index ea9bf9a..1fce791 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -163,6 +163,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); boolean isBootstrap = (previousValidHiveDumpPath == null); + work.setBootstrap(isBootstrap); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. if (shouldDump(previousValidHiveDumpPath)) { Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); @@ -524,6 +525,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // waiting for the concurrent transactions to finish, we start dumping the incremental events // and wait only for the remaining time if any. 
if (needBootstrapAcidTablesDuringIncrementalDump()) { + work.setBootstrap(true); bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0); LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 4da5bac..aecfe75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -51,6 +51,7 @@ public class ReplDumpWork implements Serializable { final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; Long eventTo; Long eventFrom; + private boolean isBootstrap; private static String testInjectDumpDir = null; private static boolean testInjectDumpDirAutoIncrement = false; static boolean testDeletePreviousDumpMetaPath = false; @@ -134,6 +135,10 @@ public class ReplDumpWork implements Serializable { } } + void setBootstrap(boolean bootstrap) { + isBootstrap = bootstrap; + } + public void setExternalTblCopyPathIterator(Iterator<String> externalTblCopyPathIterator) { if (this.externalTblCopyPathIterator != null) { throw new IllegalStateException("External table copy path iterator has already been initialized"); @@ -204,10 +209,12 @@ public class ReplDumpWork implements Serializable { replSpec.setInReplicationScope(true); EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec); managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next()); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask( + //If its incremental, in checkpointing case, dump dir may exist. We will delete the event dir. 
+ //In case of bootstrap checkpointing we will not delete the entire dir and just do a sync + Task<?> copyTask = ReplCopyTask.getDumpCopyTask( managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(), - managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, - getCurrentDumpPath().toString(), getMetricCollector()); + managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, !isBootstrap, + getCurrentDumpPath().toString(), getMetricCollector()); tasks.add(copyTask); tracker.addTask(copyTask); LOG.debug("added task for {}", managedTableCopyPath); @@ -220,9 +227,9 @@ public class ReplDumpWork implements Serializable { if (functionCopyPathIterator != null) { while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) { EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next(); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask( + Task<?> copyTask = ReplCopyTask.getDumpCopyTask( binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf, - getCurrentDumpPath().toString(), getMetricCollector() + getCurrentDumpPath().toString(), getMetricCollector() ); tasks.add(copyTask); tracker.addTask(copyTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 16c906c..e7245bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -371,7 +371,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); LoadPartitions loadPartitions = new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker, - event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector()); + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector(), + event.lastPartSpecReplicated(), event.lastStageReplicated()); /* the tableTracker here should be a new instance and not an existing one as this can only happen when we break in between loading partitions. 
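
The two extra arguments wired into LoadPartitions here come from new PartitionEvent accessors defined in the following hunks. As an illustration of the resume contract they carry, a hedged sketch is shown below; the accessors, ReplicationState.PartitionState and its Stage enum are introduced by this commit, while resumeFrom() itself is only a hypothetical helper, not code from the patch:

// Sketch of how a resumed load cycle could interpret the state saved by the previous cycle.
static void resumeFrom(PartitionEvent event) {
  AlterTableAddPartitionDesc lastBatch = event.lastPartitionReplicated();
  AlterTableAddPartitionDesc.PartitionDesc lastCopied = event.lastPartSpecReplicated();
  switch (event.lastStageReplicated()) {
    case COPY:
      // The previous cycle stopped while adding data-copy tasks for lastBatch:
      // add the remaining copy tasks after lastCopied, then the consolidated add-partition task.
      break;
    case PARTITION:
      // The previous cycle finished a whole batch (data plus metadata): continue with the next batch.
      break;
  }
}
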
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java index b9d6679..f5ab30c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events; import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; public interface PartitionEvent extends TableEvent { AlterTableAddPartitionDesc lastPartitionReplicated(); + ReplicationState.PartitionState.Stage lastStageReplicated(); + + AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated(); + TableEvent asTableEvent(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java index 2d82408..c7705e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java @@ -52,6 +52,18 @@ public class FSPartitionEvent implements PartitionEvent { } @Override + public ReplicationState.PartitionState.Stage lastStageReplicated() { + assert replicationState != null && replicationState.partitionState != null; + return replicationState.partitionState.stage; + } + + @Override + public AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated() { + assert replicationState != null && replicationState.partitionState != null; + return replicationState.partitionState.partSpec; + } + + @Override public TableEvent asTableEvent() { return tableEvent; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java index a67184d..d8c995c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java @@ -20,16 +20,34 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; import java.io.Serializable; import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc; +import org.apache.hadoop.hive.ql.exec.CopyTask; +import org.apache.hadoop.hive.ql.exec.MoveTask; public class ReplicationState implements Serializable { public static class PartitionState { final String tableName; public final AlterTableAddPartitionDesc lastReplicatedPartition; + public AlterTableAddPartitionDesc.PartitionDesc partSpec; + public Stage stage; + + public enum Stage { + COPY, + PARTITION + } public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition) { this.tableName = tableName; this.lastReplicatedPartition = lastReplicatedPartition; + this.stage = Stage.PARTITION; + } + + public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition, + AlterTableAddPartitionDesc.PartitionDesc lastProcessedPartSpec, Stage stage) { + this.tableName = tableName; + this.lastReplicatedPartition = lastReplicatedPartition; + this.partSpec = lastProcessedPartSpec; + this.stage = stage; } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index e0c9b96..bcbf20c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitio import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType; @@ -38,8 +37,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; @@ -50,10 +47,6 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; -import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; -import org.apache.hadoop.hive.ql.plan.MoveWork; import org.datanucleus.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +61,6 @@ import java.util.Map; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CHECKPOINT_KEY; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; @@ -83,6 +75,8 @@ public class LoadPartitions { private final TableEvent event; private final TaskTracker tracker; private final AlterTableAddPartitionDesc lastReplicatedPartition; + private final AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc; + private final PartitionState.Stage lastReplicatedStage; private final ReplicationMetricCollector metricCollector; private final ImportTableDesc tableDesc; @@ -92,23 +86,26 @@ public class LoadPartitions { TableEvent event, String dbNameToLoadIn, TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException { this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, - metricCollector); + metricCollector, null, PartitionState.Stage.PARTITION); } public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter, TableEvent event, 
String dbNameToLoadIn, AlterTableAddPartitionDesc lastReplicatedPartition, - ReplicationMetricCollector metricCollector) throws HiveException { + ReplicationMetricCollector metricCollector, + AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc, + ReplicationState.PartitionState.Stage lastReplicatedStage) throws HiveException { this.tracker = new TaskTracker(limiter); this.event = event; this.context = context; this.replLogger = replLogger; this.lastReplicatedPartition = lastReplicatedPartition; this.tableContext = tableContext; - this.tableDesc = event.tableDesc(dbNameToLoadIn); this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); this.metricCollector = metricCollector; + this.lastReplicatedPartitionDesc = lastReplicatedPartitionDesc; + this.lastReplicatedStage = lastReplicatedStage; } public TaskTracker tasks() throws Exception { @@ -176,11 +173,29 @@ public class LoadPartitions { * Also, copy relevant stats and other information from original request. * * @throws SemanticException + * @param lastAlterTableAddPartitionDesc */ - private void addConsolidatedPartitionDesc() throws Exception { + private void addConsolidatedPartitionDesc(AlterTableAddPartitionDesc lastAlterTableAddPartitionDesc) throws Exception { + int maxTasks = 0; //Load partitions equal to batch size at one go for metadata only and for external tables. - int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE); + if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE); + } else { + maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE); + } int currentPartitionCount = 0; + Iterator<AlterTableAddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator(); + //If already a set of partitions are processed as part of previous run, we skip those + if (lastAlterTableAddPartitionDesc != null) { + while (partitionIterator.hasNext()) { + currentPartitionCount++; + AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); + if (lastAlterTableAddPartitionDesc.getPartitions().get(0).getPartSpec() + .equals(addPartitionDesc.getPartitions().get(0).getPartSpec())) { + break; + } + } + } List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc); int totalPartitionCount = partitionDescs.size(); while (currentPartitionCount < totalPartitionCount) { @@ -207,37 +222,28 @@ public class LoadPartitions { tableDesc.getTableName(), true, partitions); //don't need to add ckpt task separately. Added as part of add partition task - addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null); - if (partitions.size() > 0) { - LOG.info("Added {} partitions", partitions.size()); + addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc); + if (!tracker.canAddMoreTasks()) { + //No need to do processing as no more tasks can be added. Will be processed in next run. State is already + //updated in add partition task + return; } currentPartitionCount = toPartitionCount; } } private TaskTracker forNewTable() throws Exception { - if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - // Place all partitions in single task to reduce load on HMS. 
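
Conceptually, the batching that addConsolidatedPartitionDesc now drives for managed tables reduces to slicing the partition list into fixed-size groups, copying data for each group, and then registering the whole group with one metastore call. Below is a self-contained sketch of just that slicing, under the assumption that batchSize is read from hive.repl.load.partitions.with.data.copy.batch.size; the class and method names are hypothetical, not part of the patch:

import java.util.List;
import java.util.function.Consumer;

public final class PartitionBatcher {
  private PartitionBatcher() {
  }

  /** Splits partition descriptors into batches of at most batchSize and hands each batch to the scheduler. */
  public static <T> void forEachBatch(List<T> partitions, int batchSize, Consumer<List<T>> scheduleBatch) {
    for (int from = 0; from < partitions.size(); from += batchSize) {
      int to = Math.min(from + batchSize, partitions.size());
      // In the real load, data copy tasks are added for every partition in the batch first,
      // then a single consolidated add-partition task updates the metastore for the whole batch.
      scheduleBatch.accept(partitions.subList(from, to));
    }
  }
}
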
- addConsolidatedPartitionDesc(); - return tracker; - } - - Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator(); - while (iterator.hasNext() && tracker.canAddMoreTasks()) { - AlterTableAddPartitionDesc currentPartitionDesc = iterator.next(); - /* - the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the - current retrieved lastReplicatedPartition - */ - addPartition(iterator.hasNext(), currentPartitionDesc, null); - } + // Place all partitions in single task to reduce load on HMS. + addConsolidatedPartitionDesc(null); return tracker; } - private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask) + private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc) throws Exception { - tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask)); - if (hasMorePartitions && !tracker.canAddMoreTasks()) { + boolean processingComplete = addTasksForPartition(table, addPartitionDesc, null); + //If processing is not complete, means replication state is already updated with copy tasks which need + //to be processed + if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) { ReplicationState currentReplicationState = new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); updateReplicationState(currentReplicationState); @@ -245,113 +251,56 @@ public class LoadPartitions { } /** - * returns the root task for adding a partition + * returns the root task for adding all partitions in a batch */ - private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask) + private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, + AlterTableAddPartitionDesc.PartitionDesc lastPartSpec) throws MetaException, HiveException { Task<?> addPartTask = TaskFactory.get( new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc, true, (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector), context.hiveConf ); - //checkpointing task already added as part of add batch of partition in case for metadata only and external tables + //checkpointing task already added as part of add batch of partition if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - if (ptnRootTask == null) { - ptnRootTask = addPartTask; - } else { - ptnRootTask.addDependentTask(addPartTask); - } - return ptnRootTask; + tracker.addTask(addPartTask); + return true; } - - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); - Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); - Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); - partSpec.setLocation(replicaWarehousePartitionLocation.toString()); - LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " - + partSpecToString(partSpec.getPartSpec()) + " with source location: " - + partSpec.getLocation()); - Task<?> ckptTask = ReplUtils.getTableCheckpointTask( - tableDesc, - (HashMap<String, String>)partSpec.getPartSpec(), - context.dumpDirectory, - this.metricCollector, - context.hiveConf - ); - - Path stagingDir = replicaWarehousePartitionLocation; - // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. 
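
The counterpart of that resume path is how the interrupted position gets recorded. When the task tracker fills up part-way through a batch, the new addTasksForPartition logic saves the batch, the last partition spec and the COPY stage before bailing out; a condensed, non-verbatim sketch of that bookkeeping:

for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
  if (!tracker.canAddMoreTasks()) {
    // Record exactly where this batch stopped so the next cycle re-enters at Stage.COPY.
    updateReplicationState(new ReplicationState(
        new PartitionState(table.getTableName(), addPartitionDesc, partSpec, PartitionState.Stage.COPY)));
    return false;   // batch incomplete; the consolidated add-partition task is not added yet
  }
  // otherwise: add a ReplCopyTask that copies this partition's data to its location on the replica
}
// all copy tasks fit in this cycle: add the single consolidated add-partition (metadata) task
return true;
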
- LoadFileType loadFileType; - if (event.replicationSpec().isInReplicationScope() && - context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { - loadFileType = LoadFileType.IGNORE; - } else { - loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; - stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); - } - boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask( + //Add Copy task for all partitions + boolean lastProcessedStageFound = false; + for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) { + if (!tracker.canAddMoreTasks()) { + //update replication state with the copy task added with which it needs to proceed next + ReplicationState currentReplicationState = + new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc, + partSpec, PartitionState.Stage.COPY)); + updateReplicationState(currentReplicationState); + return false; + } + Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); + partSpec.setLocation(replicaWarehousePartitionLocation.toString()); + LOG.debug("adding dependent CopyWork for partition " + + partSpecToString(partSpec.getPartSpec()) + " with source location: " + + partSpec.getLocation()); + if (!lastProcessedStageFound && lastPartSpec != null && + lastPartSpec.getLocation() != partSpec.getLocation()) { + //Don't process copy task if already processed as part of previous run + continue; + } + lastProcessedStageFound = true; + boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); + Task<?> copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), - new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)), - stagingDir, + new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())), + replicaWarehousePartitionLocation, context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector - ); - - Task<?> movePartitionTask = null; - if (loadFileType != LoadFileType.IGNORE) { - // no need to create move task, if file is moved directly to target location. - movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType); - } - - if (ptnRootTask == null) { - ptnRootTask = copyTask; - } else { - ptnRootTask.addDependentTask(copyTask); - } - - // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is - // retried for bootstrap, we skip current partition update. - copyTask.addDependentTask(addPartTask); - if (movePartitionTask != null) { - addPartTask.addDependentTask(movePartitionTask); - movePartitionTask.addDependentTask(ckptTask); - } else { - addPartTask.addDependentTask(ckptTask); - } - return ptnRootTask; - } - - private String getPartitionName(Path partitionMetadataFullPath) { - //Get partition name by removing the metadata base path. 
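The copy tasks above locate each partition's data under event.dataPath() using Warehouse.makePartPath, instead of stripping the metadata base path from the partition location as the removed getPartitionName did. A small, self-contained illustration of that helper follows; the class name is hypothetical and the exact output shape (escaping, trailing separator) is whatever the metastore's Warehouse produces.

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class PartPathExample {   // hypothetical
  public static void main(String[] args) throws MetaException {
    Map<String, String> partSpec = new LinkedHashMap<>();
    partSpec.put("country", "us");
    partSpec.put("year", "2020");
    // Prints the key=value sub-path (roughly country=us/year=2020) that, appended to the
    // table's dump data path, points at the partition data to copy.
    System.out.println(Warehouse.makePartPath(partSpec));
  }
}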
- //Needed for getting the data path - return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length()); - } - - /** - * This will create the move of partition data from temp path to actual path - */ - private Task<?> movePartitionTask(Table table, AlterTableAddPartitionDesc.PartitionDesc partSpec, Path tmpPath, - LoadFileType loadFileType) { - MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false, - (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector, - true); - if (AcidUtils.isTransactionalTable(table)) { - LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( - Collections.singletonList(tmpPath), - Collections.singletonList(new Path(partSpec.getLocation())), - true, null, null); - moveWork.setMultiFilesDesc(loadFilesWork); - } else { - LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - loadFileType, 0L ); - loadTableWork.setInheritTableSpecs(false); - moveWork.setLoadTableWork(loadTableWork); + tracker.addTask(copyTask); } - moveWork.setIsInReplicationScope(event.replicationSpec().isInReplicationScope()); - return TaskFactory.get(moveWork, context.hiveConf); + //add partition metadata task once all the copy tasks are added + tracker.addDependentTask(addPartTask); + return true; } /** @@ -415,24 +364,39 @@ public class LoadPartitions { Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec); } - + //Add Copy task pending for previous partition + if (PartitionState.Stage.COPY.equals(lastReplicatedStage)) { + addTasksForPartition(table, lastPartitionReplicated, + lastReplicatedPartitionDesc); + } + boolean pendingPartitions = false; while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { + pendingPartitions = true; AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); - Task<?> ptnRootTask = null; + AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0); + //Add check point task as part of add partition + Map<String, String> partParams = new HashMap<>(); + partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory); + Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src); + src.setLocation(replicaWarehousePartitionLocation.toString()); + src.addPartParams(partParams); + Map<String, String> partSpec = src.getPartSpec(); + ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); switch (loadPtnType) { case LOAD_NEW: break; case LOAD_REPLACE: - ptnRootTask = dropPartitionTask(table, partSpec); + tracker.addDependentTask(dropPartitionTask(table, partSpec)); break; case LOAD_SKIP: continue; default: break; } - addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask); + } + if (pendingPartitions) { + addConsolidatedPartitionDesc(lastPartitionReplicated); } return tracker; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 11a1036..fb31159 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -60,7 +60,6 @@ import java.util.HashSet; import java.util.List; import 
java.util.TreeMap; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; public class LoadTable { @@ -270,10 +269,9 @@ public class LoadTable { Path dataPath = fromURI; Path tmpPath = tgtPath; - // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. + // if acid tables, copy the files directly to the target path. No need to create the staging dir. LoadFileType loadFileType; - if (replicationSpec.isInReplicationScope() && - context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) { loadFileType = LoadFileType.IGNORE; } else { loadFileType = (replicationSpec.isReplace()) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 5e05c73..0e7209d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -89,7 +89,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; /** * ImportSemanticAnalyzer. @@ -437,10 +436,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { boolean isSkipTrash = false; boolean needRecycle = false; - if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean( - REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { - lft = LoadFileType.IGNORE; - destPath = loadPath = tgtPath; + if (replicationSpec.isInReplicationScope()) { isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters()); if (table.isTemporary()) { needRecycle = false; @@ -448,27 +444,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable()); } + } + if (AcidUtils.isTransactionalTable(table)) { + String mmSubdir = replace ? AcidUtils.baseDir(writeId) + : AcidUtils.deltaSubdir(writeId, writeId, stmtId); + destPath = new Path(tgtPath, mmSubdir); + /** + * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition + * directory, i.e. the final destination for these files. This has to be a copy to preserve + * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. + * So setting 'loadPath' this way will make + * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, + * boolean, Long, int)} + * skip the unnecessary file (rename) operation but it will perform other things. + */ + loadPath = tgtPath; + lft = LoadFileType.KEEP_EXISTING; } else { - if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { - String mmSubdir = replace ? AcidUtils.baseDir(writeId) - : AcidUtils.deltaSubdir(writeId, writeId, stmtId); - destPath = new Path(tgtPath, mmSubdir); - /** - * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition - * directory, i.e. the final destination for these files. This has to be a copy to preserve - * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. 
- * So setting 'loadPath' this way will make - * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, - * boolean, Long, int)} - * skip the unnecessary file (rename) operation but it will perform other things. - */ - loadPath = tgtPath; - lft = LoadFileType.KEEP_EXISTING; - } else { - destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); - lft = replace ? LoadFileType.REPLACE_ALL : - LoadFileType.OVERWRITE_EXISTING; - } + destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); + lft = replace ? LoadFileType.REPLACE_ALL : + LoadFileType.OVERWRITE_EXISTING; } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { @@ -649,8 +644,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { LoadFileType loadFileType; Path destPath; - if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean( - REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) { + if (replicationSpec.isInReplicationScope()) { loadFileType = LoadFileType.IGNORE; destPath = tgtLocation; isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index b73921c..4c10499 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -48,16 +47,12 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import java.io.IOException; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.Base64; import java.util.List; import java.util.Collections; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; @@ -264,29 +259,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception { - if (filePath == null) { - throw new HiveException("filePath cannot be null"); - } - - URI uri = filePath.toUri(); - String scheme = uri.getScheme(); - scheme = StringUtils.isBlank(scheme) ? 
FileSystem.get(uri, conf).getScheme() : scheme; - if (StringUtils.isBlank(scheme)) { - throw new HiveException("Cannot get valid scheme for " + filePath); - } - - LOG.info("scheme is " + scheme); - - String[] schmeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(","); - for (String schemeIter : schmeList) { - if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) { - return true; - } - } - return false; - } - // REPL LOAD private void initReplLoad(ASTNode ast) throws HiveException { sourceDbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); @@ -363,17 +335,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { try { assert(sourceDbNameOrPattern != null); Path loadPath = getCurrentLoadPath(); - // Ths config is set to make sure that in case of s3 replication, move is skipped. - try { - Warehouse wh = new Warehouse(conf); - Path filePath = wh.getWhRoot(); - if (ifEnableMoveOptimization(filePath, conf)) { - conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true); - LOG.info(" Set move optimization to true for warehouse " + filePath.toString()); - } - } catch (Exception e) { - throw new SemanticException(e.getMessage(), e); - } // Now, the dumped path can be one of three things: // a) It can be a db dump, in which case we expect a set of dirs, each with a diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java index faba6e4..ddf687b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java @@ -92,7 +92,7 @@ public class TestPrimaryToReplicaResourceFunction { mockStatic(ReplCopyTask.class); Task mock = mock(Task.class); when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class), - any(HiveConf.class))).thenReturn(mock); + any(HiveConf.class), any(), any())).thenReturn(mock); ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR, "hdfs://localhost:9000/user/someplace/ab.jar#e094828883")); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index 8673186..b828d32 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -93,6 +93,8 @@ public class InjectableBehaviourObjectStore extends ObjectStore { private static com.google.common.base.Function<List<Partition>, Boolean> alterPartitionsModifier = null; + private static com.google.common.base.Function<List<Partition>, Boolean> addPartitionsModifier = null; + // Methods to set/reset getTable modifier public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){ getTableModifier = (modifier == null) ? 
com.google.common.base.Functions.identity() : modifier; @@ -154,10 +156,18 @@ public class InjectableBehaviourObjectStore extends ObjectStore { setAlterTableModifier(null); } + public static void resetAddPartitionModifier() { + setAddPartitionsBehaviour(null); + } + public static void setAlterPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){ alterPartitionsModifier = modifier; } + public static void setAddPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){ + addPartitionsModifier = modifier; + } + // ObjectStore methods to be overridden with injected behavior @Override @@ -317,4 +327,17 @@ public class InjectableBehaviourObjectStore extends ObjectStore { } return super.alterPartitions(catName, dbname, name, part_vals, newParts, writeId, queryWriteIdList); } + + @Override + public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts) + throws InvalidObjectException, MetaException { + if (addPartitionsModifier != null) { + Boolean success = addPartitionsModifier.apply(parts); + if ((success != null) && !success) { + throw new MetaException("InjectableBehaviourObjectStore: Invalid addPartitions operation on Catalog : " + + catName + " DB: " + dbName + " table: " + tblName); + } + } + return super.addPartitions(catName, dbName, tblName, parts); + } }
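A hedged sketch of how a test could use the new addPartitions hook above, assuming the metastore under test is configured to use InjectableBehaviourObjectStore as its object store; the class and method names in the sketch are hypothetical.

import java.util.List;
import com.google.common.base.Function;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.api.Partition;

public class FailFirstAddPartitionsExample {   // hypothetical
  public static void install() {
    InjectableBehaviourObjectStore.setAddPartitionsBehaviour(new Function<List<Partition>, Boolean>() {
      private boolean failedOnce = false;
      @Override
      public Boolean apply(List<Partition> parts) {
        if (!failedOnce) {
          failedOnce = true;
          return false;   // the override above turns this into a MetaException
        }
        return true;       // subsequent batches go through
      }
    });
  }

  public static void cleanup() {
    InjectableBehaviourObjectStore.resetAddPartitionModifier();
  }
}

Returning false from the injected function makes the overridden addPartitions fail with a MetaException, so a test can interrupt a bootstrap REPL LOAD between the partition data copy and the batched metastore call, then check that a retried load resumes from the recorded replication state.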