HIVE-16896: move replication load related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/92f764e0 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/92f764e0 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/92f764e0 Branch: refs/heads/master Commit: 92f764e056cc87b8c5ea82b5372a9cbbf804ec33 Parents: 844ec34 Author: Daniel Dai <da...@hortonworks.com> Authored: Tue Aug 8 11:43:00 2017 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Tue Aug 8 11:43:00 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- ...TestReplicationScenariosAcrossInstances.java | 53 +++- .../hadoop/hive/ql/parse/WarehouseInstance.java | 66 +++-- ql/if/queryplan.thrift | 3 +- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +- ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +- .../hadoop/hive/ql/plan/api/StageType.java | 7 +- ql/src/gen/thrift/gen-php/Types.php | 6 +- ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 9 +- ql/src/gen/thrift/gen-rb/queryplan_types.rb | 7 +- .../java/org/apache/hadoop/hive/ql/Context.java | 2 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 4 + .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 13 +- .../ql/exec/repl/bootstrap/ReplLoadTask.java | 264 +++++++++++++++++ .../ql/exec/repl/bootstrap/ReplLoadWork.java | 71 +++++ .../repl/bootstrap/events/BootstrapEvent.java | 28 ++ .../repl/bootstrap/events/DatabaseEvent.java | 34 +++ .../repl/bootstrap/events/FunctionEvent.java | 33 +++ .../repl/bootstrap/events/PartitionEvent.java | 26 ++ .../exec/repl/bootstrap/events/TableEvent.java | 42 +++ .../filesystem/BootstrapEventsIterator.java | 133 +++++++++ .../filesystem/DatabaseEventsIterator.java | 141 +++++++++ .../events/filesystem/FSDatabaseEvent.java | 88 ++++++ .../events/filesystem/FSFunctionEvent.java | 39 +++ .../events/filesystem/FSPartitionEvent.java | 84 ++++++ .../events/filesystem/FSTableEvent.java | 123 ++++++++ 
.../exec/repl/bootstrap/load/LoadDatabase.java | 129 +++++++++ .../exec/repl/bootstrap/load/LoadFunction.java | 73 +++++ .../repl/bootstrap/load/ReplicationState.java | 58 ++++ .../exec/repl/bootstrap/load/TaskTracker.java | 113 ++++++++ .../bootstrap/load/table/LoadPartitions.java | 283 +++++++++++++++++++ .../repl/bootstrap/load/table/LoadTable.java | 216 ++++++++++++++ .../repl/bootstrap/load/table/TableContext.java | 49 ++++ .../exec/repl/bootstrap/load/util/Context.java | 37 +++ .../repl/bootstrap/load/util/PathUtils.java | 105 +++++++ .../hive/ql/parse/ImportSemanticAnalyzer.java | 28 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 216 +------------- .../hadoop/hive/ql/plan/ImportTableDesc.java | 12 +- .../repl/bootstrap/load/TaskTrackerTest.java | 29 ++ .../repl_load_requires_admin.q.out | 4 - 40 files changed, 2356 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c16880e..7cee344 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.conf; import com.google.common.base.Joiner; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.FileUtils; @@ -37,7 +36,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; import org.apache.hive.common.HiveCompat; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -442,6 +440,11 @@ public class HiveConf extends Configuration { "Inteval for cmroot cleanup thread."), REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/", "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"), + REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000, + "Provide and approximate of the max number of tasks that should be executed in before \n" + + "dynamically generating the next set of tasks. The number is an approximate as we \n" + + "will stop at slightly higher number than above, the reason being some events might \n" + + "lead to an task increment that would cross the above limit"), REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5, "Number of threads that will be used to dump partition data information during repl dump."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index c431537..2af728f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import 
org.apache.hadoop.hive.ql.util.DependencyResolver; import org.junit.AfterClass; @@ -89,7 +90,7 @@ public class TestReplicationScenariosAcrossInstances { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(bootStrapDump.lastReplicationId); + .verifyResult(bootStrapDump.lastReplicationId); primary.run("CREATE FUNCTION " + primaryDbName + ".testFunction as 'hivemall.tools.string.StopwordUDF' " @@ -99,16 +100,16 @@ public class TestReplicationScenariosAcrossInstances { primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); // Test the idempotent behavior of CREATE FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); } @Test @@ -119,7 +120,7 @@ public class TestReplicationScenariosAcrossInstances { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(bootStrapDump.lastReplicationId); + .verifyResult(bootStrapDump.lastReplicationId); primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); @@ -127,16 +128,16 @@ public class TestReplicationScenariosAcrossInstances { primary.dump(primaryDbName, bootStrapDump.lastReplicationId); 
replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '*testfunction*'") - .verify(null); + .verifyResult(null); // Test the idempotent behavior of DROP FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) - .verify(incrementalDump.lastReplicationId) + .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '*testfunction*'") - .verify(null); + .verifyResult(null); } @Test @@ -148,7 +149,7 @@ public class TestReplicationScenariosAcrossInstances { replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunction"); } @Test @@ -164,7 +165,7 @@ public class TestReplicationScenariosAcrossInstances { replica.load(replicatedDbName, tuple.dumpLocation) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verify(replicatedDbName + ".anotherFunction"); + .verifyResult(replicatedDbName + ".anotherFunction"); FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus( new Path( @@ -218,4 +219,32 @@ public class TestReplicationScenariosAcrossInstances { }).collect(Collectors.toList()); return new Dependencies(collect); } + + /* + From the hive logs(hive.log) we can also check for the info statement + fgrep "Total Tasks" [location of hive.log] + each line indicates one run of loadTask. 
+ */ + @Test + public void testMultipleStagesOfReplicationLoadTask() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='india') values ('mumbai')") + .run("insert into table t2 partition(country='india') values ('delhi')") + .run("create table t3 (rank int)") + .dump(primaryDbName, null); + + // each table creation itself takes more than one task, give we are giving a max of 1, we should hit multiple runs. + replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId); + + } } http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 8dfab08..c084d4d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -56,7 +56,7 @@ class WarehouseInstance implements Closeable { final String functionsRoot; private Logger logger; private Driver driver; - private HiveConf hiveConf; + HiveConf hiveConf; MiniDFSCluster miniDFSCluster; private HiveMetaStoreClient client; @@ -71,16 +71,18 @@ class WarehouseInstance implements Closeable { assert miniDFSCluster.isDataNodeUp(); 
DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier); Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier); this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString(); - initialize(cmRootPath.toString(), hiveInTests); + initialize(cmRootPath.toString(), warehouseRoot.toString(), hiveInTests); } WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { this(logger, cluster, true); } - private void initialize(String cmRoot, boolean hiveInTest) throws Exception { + private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest) + throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname); String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp") @@ -95,6 +97,7 @@ class WarehouseInstance implements Closeable { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest); // turn on db notification listener on meta store + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot); hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS); hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); @@ -181,38 +184,39 @@ class WarehouseInstance implements Closeable { return this; } - WarehouseInstance verify(String data) throws IOException { - return verifyResults(data == null ? new String[] {} : new String[] { data }); - } + WarehouseInstance verifyResult (String data) throws IOException { + verifyResults(data == null ? new String[] {} : new String[] { data }); + return this; + } - /** - * All the results that are read from the hive output will not preserve - * case sensitivity and will all be in lower case, hence we will check against - * only lower case data values. 
- * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case - * before assert. - */ - WarehouseInstance verifyResults(String[] data) throws IOException { - List<String> results = getOutput(); - logger.info("Expecting {}", StringUtils.join(data, ",")); - logger.info("Got {}", results); - assertEquals(data.length, results.size()); - for (int i = 0; i < data.length; i++) { - assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase()); + /** + * All the results that are read from the hive output will not preserve + * case sensitivity and will all be in lower case, hence we will check against + * only lower case data values. + * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case + * before assert. + */ + WarehouseInstance verifyResults(String[] data) throws IOException { + List<String> results = getOutput(); + logger.info("Expecting {}", StringUtils.join(data, ",")); + logger.info("Got {}", results); + assertEquals(data.length, results.size()); + for (int i = 0; i < data.length; i++) { + assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase()); + } + return this; } - return this; - } - List<String> getOutput() throws IOException { - List<String> results = new ArrayList<>(); - try { - driver.getResults(results); - } catch (CommandNeedRetryException e) { - logger.warn(e.getMessage(), e); - throw new RuntimeException(e); + List<String> getOutput() throws IOException { + List<String> results = new ArrayList<>(); + try { + driver.getResults(results); + } catch (CommandNeedRetryException e) { + logger.warn(e.getMessage(), e); + throw new RuntimeException(e); + } + return results; } - return results; - } private void printOutput() throws IOException { for (String s : getOutput()) { http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/if/queryplan.thrift ---------------------------------------------------------------------- diff --git a/ql/if/queryplan.thrift 
b/ql/if/queryplan.thrift index dc55805..00b0200 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -100,7 +100,8 @@ enum StageType { STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, - REPLDUMP, + REPL_DUMP, + REPL_BOOTSTRAP_LOAD, } struct Stage { http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 7254d50..f467da2 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -116,7 +116,8 @@ int _kStageTypeValues[] = { StageType::STATS, StageType::DEPENDENCY_COLLECTION, StageType::COLUMNSTATS, - StageType::REPLDUMP + StageType::REPL_DUMP, + StageType::REPL_BOOTSTRAP_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -131,9 +132,10 @@ const char* _kStageTypeNames[] = { "STATS", "DEPENDENCY_COLLECTION", "COLUMNSTATS", - "REPLDUMP" + "REPL_DUMP", + "REPL_BOOTSTRAP_LOAD" }; -const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.h ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index 38d054b..ac87ef7 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -94,7 +94,8 @@ struct StageType { STATS = 9, DEPENDENCY_COLLECTION = 10, COLUMNSTATS = 11, - 
REPLDUMP = 12 + REPL_DUMP = 12, + REPL_BOOTSTRAP_LOAD = 13 }; }; http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index deca574..11a8f6d 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -24,7 +24,8 @@ public enum StageType implements org.apache.thrift.TEnum { STATS(9), DEPENDENCY_COLLECTION(10), COLUMNSTATS(11), - REPLDUMP(12); + REPL_DUMP(12), + REPL_BOOTSTRAP_LOAD(13); private final int value; @@ -70,7 +71,9 @@ public enum StageType implements org.apache.thrift.TEnum { case 11: return COLUMNSTATS; case 12: - return REPLDUMP; + return REPL_DUMP; + case 13: + return REPL_BOOTSTRAP_LOAD; default: return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-php/Types.php ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index 4d902ee..68edfcd 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -114,7 +114,8 @@ final class StageType { const STATS = 9; const DEPENDENCY_COLLECTION = 10; const COLUMNSTATS = 11; - const REPLDUMP = 12; + const REPL_DUMP = 12; + const REPL_BOOTSTRAP_LOAD = 13; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -128,7 +129,8 @@ final class StageType { 9 => 'STATS', 10 => 'DEPENDENCY_COLLECTION', 11 => 'COLUMNSTATS', - 12 => 'REPLDUMP', + 12 => 'REPL_DUMP', + 13 => 'REPL_BOOTSTRAP_LOAD', ); } 
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-py/queryplan/ttypes.py ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 9e29129..6bf65af 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -160,7 +160,8 @@ class StageType: STATS = 9 DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 - REPLDUMP = 12 + REPL_DUMP = 12 + REPL_BOOTSTRAP_LOAD = 13 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -175,7 +176,8 @@ class StageType: 9: "STATS", 10: "DEPENDENCY_COLLECTION", 11: "COLUMNSTATS", - 12: "REPLDUMP", + 12: "REPL_DUMP", + 13: "REPL_BOOTSTRAP_LOAD", } _NAMES_TO_VALUES = { @@ -191,7 +193,8 @@ class StageType: "STATS": 9, "DEPENDENCY_COLLECTION": 10, "COLUMNSTATS": 11, - "REPLDUMP": 12, + "REPL_DUMP": 12, + "REPL_BOOTSTRAP_LOAD": 13, } http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-rb/queryplan_types.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 1433d4a..2730dde 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -72,9 +72,10 @@ module StageType STATS = 9 DEPENDENCY_COLLECTION = 10 COLUMNSTATS = 11 - REPLDUMP = 12 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze + REPL_DUMP = 12 + REPL_BOOTSTRAP_LOAD = 13 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => 
"FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD]).freeze end class Adjacency http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index fdcf052..9183edf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -561,7 +561,7 @@ public class Context { private static final String MR_PREFIX = "-mr-"; - private static final String EXT_PREFIX = "-ext-"; + public static final String EXT_PREFIX = "-ext-"; private static final String LOCAL_PREFIX = "-local-"; /** http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 94d6c5a..91ac4bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import 
org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; @@ -111,6 +113,7 @@ public final class TaskFactory { taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class)); taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); + taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); } private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() { @@ -149,6 +152,7 @@ public final class TaskFactory { throw new RuntimeException("No task for work class " + workClass.getName()); } + @SafeVarargs public static <T extends Serializable> Task<T> get(T work, HiveConf conf, Task<? extends Serializable>... tasklist) { Task<T> ret = get((Class<T>) work.getClass(), conf); http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 7501ed7..34b6737 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -61,7 +61,7 @@ import java.util.List; public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private static final String FUNCTION_METADATA_DIR_NAME = "_metadata"; + private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); @@ -86,6 +86,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { 
prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema); } catch (Exception e) { LOG.error("failed", e); + setException(e); return 1; } return 0; @@ -262,10 +263,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // make it easy to write .q unit tests, instead of unique id generation. // however, this does mean that in writing tests, we have to be aware that // repl dump will clash with prior dumps, and thus have to clean up properly. - if (work.testInjectDumpDir == null) { + if (ReplDumpWork.testInjectDumpDir == null) { return "next"; } else { - return work.testInjectDumpDir; + return ReplDumpWork.testInjectDumpDir; } } else { return String.valueOf(System.currentTimeMillis()); @@ -284,9 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { continue; } Path functionRoot = new Path(functionsRoot, functionName); - Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME); + Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME); try (JsonWriter jsonWriter = - new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) { + new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) { FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf); serializer.writeTo(jsonWriter, tuple.replicationSpec); } @@ -315,6 +316,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { @Override public StageType getType() { - return StageType.REPLDUMP; + return StageType.REPL_DUMP; } } http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java new file mode 100644 index 0000000..6ea1754 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -0,0 +1,264 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.api.StageType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;

/**
 * Execution-phase task that performs a bootstrap REPL LOAD. It walks the dump
 * directory via a {@link BootstrapEventsIterator} and converts each bootstrap
 * event (database / table / partition / function) into concrete load tasks,
 * stopping once an approximate maximum number of tasks has been generated.
 * When work remains, it chains another ReplLoadTask after the generated tasks
 * so the next run resumes from the recorded replication state.
 */
public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
  // Sentinel capacity for trackers that start out empty; keeps tracker
  // compositions inert until a real tracker replaces them.
  private final static int ZERO_TASKS = 0;

  @Override
  public String getName() {
    return "REPL_BOOTSTRAP_LOAD";
  }

  /**
   * Provides the root Tasks created as a result of this loadTask run which will be executed
   * by the driver. It does not track details across multiple runs of LoadTask.
   */
  private static class Scope {
    // Flags record which level (db/table/partition) contributed the root tasks
    // of this run, so lower levels know whether to add themselves as roots.
    boolean database = false, table = false, partition = false;
    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
  }

  /**
   * Generates up to {@code REPL_APPROX_MAX_LOAD_TASKS} load tasks from the
   * bootstrap event stream and hands them to the driver as child tasks.
   *
   * @return 0 on success, 1 on any failure (the exception is recorded via
   *         {@link #setException(Throwable)}).
   */
  @Override
  protected int execute(DriverContext driverContext) {
    try {
      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
      Context context = new Context(conf, getHive());
      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
      /*
      for now for simplicity we are doing just one directory ( one database ), come back to use
      of multiple databases once we have the basic flow to chain creating of tasks in place for
      a database ( directory )
      */
      BootstrapEventsIterator iterator = work.iterator();
      /*
      This is used to get hold of a reference during the current creation of tasks and is initialized
      with "0" tasks such that it will be non consequential in any operations done with task tracker
      compositions.
      */
      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
      Scope scope = new Scope();
      while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) {
        BootstrapEvent next = iterator.next();
        switch (next.eventType()) {
        case Database:
          DatabaseEvent dbEvent = (DatabaseEvent) next;
          dbTracker =
              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
                  .tasks();
          loadTaskTracker.update(dbTracker);
          if (work.hasDbState()) {
            // A previous database was in flight; stamp its last repl id
            // before switching to this new database.
            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
          }
          work.updateDbEventState(dbEvent.toState());
          scope.database = true;
          scope.rootTasks.addAll(dbTracker.tasks());
          dbTracker.debugLog("database");
          break;
        case Table: {
          /*
          Implicit assumption here is that database level is processed first before table level,
          which will depend on the iterator used since it should provide the higher level directory
          listing before providing the lower level listing. This is also required such that
          the dbTracker / tableTracker are setup correctly always.
          */
          TableContext tableContext =
              new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
          TableEvent tableEvent = (TableEvent) next;
          LoadTable loadTable = new LoadTable(tableEvent, context, tableContext, loadTaskTracker);
          tableTracker = loadTable.tasks();
          if (!scope.database) {
            scope.rootTasks.addAll(tableTracker.tasks());
            scope.table = true;
          }
          setUpDependencies(dbTracker, tableTracker);
          /*
          for table replication if we reach the max number of tasks then for the next run we will
          try to reload the same table again, this is mainly for ease of understanding the code
          as then we can avoid handling == > loading partitions for the table given that
          the creation of table lead to reaching max tasks vs, loading next table since current
          one does not have partitions.
          */

          // for a table we explicitly try to load partitions as there is no separate partitions events.
          LoadPartitions loadPartitions =
              new LoadPartitions(context, loadTaskTracker, tableEvent, work.dbNameToLoadIn,
                  tableContext);
          TaskTracker partitionsTracker = loadPartitions.tasks();
          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
              partitionsTracker);
          tableTracker.debugLog("table");
          partitionsTracker.debugLog("partitions for table");
          break;
        }
        case Partition: {
          /*
          This will happen only when loading tables and we reach the limit of number of tasks we can create;
          hence we know here that the table should exist and there should be a lastPartitionName
          */
          PartitionEvent event = (PartitionEvent) next;
          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
              work.tableNameToLoadIn);
          LoadPartitions loadPartitions =
              new LoadPartitions(context, tableContext, loadTaskTracker, event.asTableEvent(),
                  work.dbNameToLoadIn,
                  event.lastPartitionReplicated());
          /*
          the tableTracker here should be a new instance and not an existing one as this can
          only happen when we break in between loading partitions.
          */
          TaskTracker partitionsTracker = loadPartitions.tasks();
          partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
              partitionsTracker);
          partitionsTracker.debugLog("partitions");
          break;
        }
        case Function: {
          LoadFunction loadFunction =
              new LoadFunction(context, (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
          TaskTracker functionsTracker = loadFunction.tasks();
          if (!scope.database) {
            scope.rootTasks.addAll(functionsTracker.tasks());
          } else {
            setUpDependencies(dbTracker, functionsTracker);
          }
          loadTaskTracker.update(functionsTracker);
          functionsTracker.debugLog("functions");
          break;
        }
        }
      }
      boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState();
      createBuilderTask(scope.rootTasks, addAnotherLoadTask);
      // BUGFIX: only stamp the last repl id when a database event was actually
      // processed in this or a previous run; in table-only replication
      // work.databaseEvent() would otherwise dereference a null state.
      if (!iterator.hasNext() && work.hasDbState()) {
        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
      }
      this.childTasks = scope.rootTasks;
      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
    } catch (Exception e) {
      LOG.error("failed replication", e);
      setException(e);
      return 1;
    }
    LOG.info("completed load task run : {}", work.executedLoadTask());
    return 0;
  }

  /**
   * There was a database update done before and we want to make sure we update the last repl
   * id on this database as we are now going to switch to processing a new database.
   */
  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
      throws SemanticException {
    /*
    we don't want to put any limits on this task as this is essential before we start
    processing new database events.
    */
    TaskTracker taskTracker =
        new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
            new TaskTracker(maxTasks)).tasks();
    scope.rootTasks.addAll(taskTracker.tasks());
    return taskTracker;
  }

  /**
   * Common bookkeeping after partition tasks are created: wire partition tasks
   * after table tasks, register them as roots when neither db nor table level
   * produced roots, and persist any partial-partition replication state on the
   * iterator so the next ReplLoadTask run resumes correctly.
   */
  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
      TaskTracker partitionsTracker) throws SemanticException {
    setUpDependencies(tableTracker, partitionsTracker);
    if (!scope.database && !scope.table) {
      scope.rootTasks.addAll(partitionsTracker.tasks());
      scope.partition = true;
    }
    loadTaskTracker.update(tableTracker);
    loadTaskTracker.update(partitionsTracker);
    if (partitionsTracker.hasReplicationState()) {
      iterator.setReplicationState(partitionsTracker.replicationState());
    }
  }

  /*
  This sets up dependencies such that a child task is dependant on the parent to be complete.
  */
  private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
    for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
        parentTask.addDependentTask(childTask);
      }
    }
  }

  /**
   * When more bootstrap work remains after this run, chain a fresh
   * ReplLoadTask behind every leaf of the generated task tree so it runs once
   * all tasks of this run have completed.
   */
  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
      boolean shouldCreateAnotherLoadTask) {
    /*
    use loadTask as dependencyCollection
    */
    if (shouldCreateAnotherLoadTask) {
      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
      dependency(rootTasks, loadTask);
    }
  }

  /**
   * add the dependency to the leaf node
   *
   * @return true when {@code tasks} is empty, i.e. the caller is a leaf and
   *         should attach {@code loadTask}; false otherwise.
   */
  private boolean dependency(List<Task<? extends Serializable>> tasks,
      Task<ReplLoadWork> loadTask) {
    if (tasks == null || tasks.isEmpty()) {
      return true;
    }
    for (Task<? extends Serializable> task : tasks) {
      boolean dependency = dependency(task.getChildTasks(), loadTask);
      if (dependency) {
        task.addDependentTask(loadTask);
      }
    }
    // BUGFIX: previously this returned true unconditionally, which made every
    // ancestor (not just the leaves) add loadTask as a dependent, contrary to
    // the documented "leaf node" contract. A non-empty task list is not a
    // leaf, so report false to the caller.
    return false;
  }

  @Override
  public StageType getType() {
    return StageType.REPL_BOOTSTRAP_LOAD;
  }
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
import org.apache.hadoop.hive.ql.plan.Explain;

import java.io.IOException;
import java.io.Serializable;

/**
 * Work object shared across consecutive runs of {@link ReplLoadTask}. It holds
 * the bootstrap event iterator positioned over the dump directory, the
 * (optional) target db/table name overrides, and the in-flight database state
 * used to stamp the last replication id once a database is fully processed.
 */
@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
    Explain.Level.DEFAULT,
    Explain.Level.EXTENDED })
public class ReplLoadWork implements Serializable {
  // Target database name override from REPL LOAD; package-private for ReplLoadTask.
  final String dbNameToLoadIn;
  // Target table name for table-level replication; null for db-level load.
  final String tableNameToLoadIn;
  private final BootstrapEventsIterator iterator;
  // Number of completed ReplLoadTask runs over this work (for logging).
  private int loadTaskRunCount = 0;
  // Serialized state of the database event currently being processed, if any.
  private DatabaseEvent.State state = null;

  /**
   * @param dumpDirectory root of the bootstrap dump to load from
   * @param dbNameToLoadIn database name to load into (overrides the dumped name)
   * @param tableNameToLoadIn table to load, or null for the whole database
   * @throws IOException if the dump directory cannot be listed
   */
  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
      String tableNameToLoadIn) throws IOException {
    this.tableNameToLoadIn = tableNameToLoadIn;
    this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
    this.dbNameToLoadIn = dbNameToLoadIn;
  }

  /** Convenience constructor for db-level load (no table restriction). */
  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern)
      throws IOException {
    this(hiveConf, dumpDirectory, dbNameOrPattern, null);
  }

  public BootstrapEventsIterator iterator() {
    return iterator;
  }

  /** Increments and returns the run counter; called at the end of each task run. */
  int executedLoadTask() {
    return ++loadTaskRunCount;
  }

  void updateDbEventState(DatabaseEvent.State state) {
    this.state = state;
  }

  /**
   * Rehydrates the stored database event and clears the stored state.
   * NOTE(review): throws NPE if called when {@link #hasDbState()} is false —
   * callers must check first.
   */
  DatabaseEvent databaseEvent(HiveConf hiveConf) {
    DatabaseEvent databaseEvent = state.toEvent(hiveConf);
    state = null;
    return databaseEvent;
  }

  boolean hasDbState() {
    return state != null;
  }
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;

/**
 * A single unit of replication bootstrap work produced by an event iterator.
 * Consumers switch on {@link #eventType()} and downcast to the corresponding
 * sub-interface (DatabaseEvent, TableEvent, PartitionEvent, FunctionEvent).
 */
public interface BootstrapEvent {

  /** The kind of object this event describes; determines the concrete subtype. */
  EventType eventType();

  enum EventType {
    Database, Table, Function, Partition
  }

}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.parse.SemanticException;

import java.io.Serializable;

/**
 * Bootstrap event describing a database to be created/updated on the replica.
 */
public interface DatabaseEvent extends BootstrapEvent {
  /**
   * The database object read from the dump metadata.
   *
   * @param dbNameToOverride replaces the dumped database name when non-blank
   * @throws SemanticException if the metadata cannot be read or is invalid
   */
  Database dbInMetadata(String dbNameToOverride) throws SemanticException;

  /** Serializable snapshot of this event, used to re-create it in a later task run. */
  State toState();

  /** Serializable handle that can rehydrate the originating event. */
  interface State extends Serializable {
    DatabaseEvent toEvent(HiveConf hiveConf);
  }
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;

import org.apache.hadoop.fs.Path;

/**
 * Bootstrap event for a function definition in the dump.
 * <p>
 * NOTE: exposing a filesystem {@link Path} here leaks the FileSystem
 * implementation outside this abstraction, which it should NOT do.
 * Since bootstrap and incremental replication handle functions similarly,
 * there is additional work pending to pass a proper event object from both
 * places.
 *
 * @see org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.FunctionDescBuilder
 * would be merged here mostly.
 */
public interface FunctionEvent extends BootstrapEvent {
  /** Root directory of this function's dump (contains its _metadata). */
  Path rootDir();
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;

import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;

/**
 * Bootstrap event emitted when a previous load run stopped mid-way through a
 * table's partitions; carries the resume point so the next run can continue.
 */
public interface PartitionEvent extends TableEvent {
  /** The last partition that was successfully replicated — resume after this one. */
  AddPartitionDesc lastPartitionReplicated();

  /** The underlying table event this partition load belongs to. */
  TableEvent asTableEvent();
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;

import java.util.List;

/**
 * Bootstrap event describing a table (and, implicitly, its partitions — table
 * and partition metadata live in the same dump file).
 */
public interface TableEvent extends BootstrapEvent {
  /**
   * Import descriptor for this table, targeted at the given database.
   *
   * @throws SemanticException if the table metadata cannot be read
   */
  ImportTableDesc tableDesc(String dbName) throws SemanticException;

  /** Partition descriptors associated with this table's dump metadata. */
  List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
      throws SemanticException;

  /** The replication spec (ids/flags) attached to this table's dump. */
  ReplicationSpec replicationSpec();

  /** True when this table must be skipped during load. */
  boolean shouldNotReplicate();

  /**
   * Exposing the FileSystem implementation outside which is what it should NOT do.
   */
  Path metadataPath();
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.parse.EximUtil;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Replication layout is from the root directory of replication Dump is
 * db
 *    table1
 *        _metadata
 *        data
 *          _files
 *    table2
 *        _metadata
 *        data
 *          _files
 * _functions
 *    functionName1
 *      _metadata
 *    functionName2
 *      _metadata
 * this class understands this layout and hence will help in identifying for subsequent bootstrap tasks
 * as to where the last set of tasks left execution and from where this task should pick up replication.
 * Since for replication we have the need for hierarchy of tasks we need to make sure that db level are
 * processed first before table, table level are processed first before partitions etc.
 *
 * Based on how the metadata is being exported on the file we have to currently take care of the following:
 * 1. Make sure db level are processed first as this will be required before table / functions processing.
 * 2. Table before partition is not explicitly required as table and partition metadata are in the same file.
 *
 *
 * For future integrations other sources of events like kafka, would require to implement an Iterator&lt;BootstrapEvent&gt;
 *
 */
public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
  // Iterator over the database currently being drained; null means "advance
  // to the next database directory on the next hasNext() call".
  private DatabaseEventsIterator currentDatabaseIterator = null;
  /*
  This denotes listing of any directories where during replication we want to take care of
  db level operations first, namely in our case its only during db creation on the replica
  warehouse.
  */
  private Iterator<DatabaseEventsIterator> dbEventsIterator;

  /**
   * Lists the dump root and builds one {@link DatabaseEventsIterator} per
   * directory that contains a db-level _metadata file.
   *
   * @throws IOException if the dump root cannot be listed
   */
  public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
    Path path = new Path(dumpDirectory);
    FileSystem fileSystem = path.getFileSystem(hiveConf);
    FileStatus[] fileStatuses =
        fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));

    // Only directories carrying a _metadata file are database dumps.
    List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
      Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME);
      try {
        return fileSystem.exists(metadataPath);
      } catch (IOException e) {
        // checked exceptions can't escape a stream lambda; wrap.
        throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e);
      }
    }).collect(Collectors.toList());
    dbEventsIterator = dbsToCreate.stream().map(f -> {
      try {
        return new DatabaseEventsIterator(f.getPath(), hiveConf);
      } catch (IOException e) {
        throw new RuntimeException(
            "Error while creating event iterator for db at path" + f.getPath().toString(), e);
      }
    }).collect(Collectors.toList()).iterator();

  }

  /** Advances across database iterators until one has a pending event. */
  @Override
  public boolean hasNext() {
    while (true) {
      if (currentDatabaseIterator == null) {
        if (dbEventsIterator.hasNext()) {
          currentDatabaseIterator = dbEventsIterator.next();
        } else {
          return false;
        }
      } else if (currentDatabaseIterator.hasNext()) {
        return true;
      } else {
        currentDatabaseIterator = null;
      }
    }
  }

  // NOTE(review): like most Iterator impls, next() requires a preceding
  // hasNext() == true; otherwise currentDatabaseIterator may be null here.
  @Override
  public BootstrapEvent next() {
    return currentDatabaseIterator.next();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("This operation is not supported");
  }

  @Override
  public void forEachRemaining(Consumer<? super BootstrapEvent> action) {
    throw new UnsupportedOperationException("This operation is not supported");
  }

  /**
   * Records partial-load state on the current database iterator so the next
   * load task run resumes from that point.
   * NOTE(review): assumes a database iterator is active — NPE otherwise.
   */
  public void setReplicationState(ReplicationState replicationState) {
    this.currentDatabaseIterator.replicationState = replicationState;
  }
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME;

/**
 * Walks one database's dump directory and emits bootstrap events in order:
 * the db-level event first (if a db-level _metadata exists), then one event
 * per nested _metadata file (table or function). Also resumes from a
 * previously recorded {@link ReplicationState}.
 */
class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
  private static Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class);
  // Recursive file listing under dbLevelPath; consumed lazily by hasNext().
  private RemoteIterator<LocatedFileStatus> remoteIterator;

  private final Path dbLevelPath;
  private HiveConf hiveConf;
  // Set by BootstrapEventsIterator.setReplicationState when a prior run
  // stopped mid-table/mid-partition; consumed by next().
  ReplicationState replicationState;
  // next: the path the upcoming event will be built from; previous: the path
  // of the last emitted event (needed to resume from replicationState).
  private Path next = null, previous = null;
  private boolean databaseEventProcessed = false;

  DatabaseEventsIterator(Path dbLevelPath, HiveConf hiveConf) throws IOException {
    this.dbLevelPath = dbLevelPath;
    this.hiveConf = hiveConf;
    FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf);
    // this is only there for the use case where we are doing table only replication and not database level
    if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) {
      databaseEventProcessed = true;
    }
    remoteIterator = fileSystem.listFiles(dbLevelPath, true);
  }

  /**
   * True when another event is pending. As a side effect, positions
   * {@link #next} at the directory the next event will be created from.
   */
  @Override
  public boolean hasNext() {
    try {
      if (!databaseEventProcessed) {
        // The db-level event always comes first.
        next = dbLevelPath;
        return true;
      }

      if (replicationState == null && next == null) {
        while (remoteIterator.hasNext()) {
          // Deliberately shadows the field: this is the raw listing entry.
          LocatedFileStatus next = remoteIterator.next();
          if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
            // Path depth relative to the db root distinguishes the db-level
            // _metadata (depth 1) from table/function _metadata (deeper).
            String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
            List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))
                .filter(StringUtils::isNotBlank)
                .collect(Collectors.toList());
            if (filteredNames.size() == 1) {
              // this relates to db level event tracked via databaseEventProcessed
            } else {
              this.next = next.getPath().getParent();
              return true;
            }
          }
        }
        return false;
      }
      return true;
    } catch (Exception e) {
      // may be do some retry logic here.
      throw new RuntimeException("could not traverse the file via remote iterator " + dbLevelPath,
          e);
    }
  }

  /*
  we handle three types of scenarios with special case.
  1. handling of db Level _metadata
  2. handling of subsequent loadTask which will start running from the previous replicationState
  3. other events : these can only be either table / function _metadata.
  */
  @Override
  public BootstrapEvent next() {
    if (!databaseEventProcessed) {
      FSDatabaseEvent event = new FSDatabaseEvent(hiveConf, next.toString());
      databaseEventProcessed = true;
      return postProcessing(event);
    }

    if (replicationState != null) {
      return eventForReplicationState();
    }

    String currentPath = next.toString();
    if (currentPath.contains(FUNCTIONS_ROOT_DIR_NAME)) {
      LOG.debug("functions directory: {}", next.toString());
      return postProcessing(new FSFunctionEvent(next));
    }
    return postProcessing(new FSTableEvent(hiveConf, next.toString()));
  }

  // Records the emitted path in `previous` and clears `next` so hasNext()
  // advances the listing on its next call.
  private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) {
    previous = next;
    next = null;
    LOG.debug("processing " + previous);
    return bootstrapEvent;
  }

  /**
   * Builds the resume event for a partially-loaded table: a partition event
   * when a partition was in flight, else a table event. Clears the state
   * either way so the iterator falls back to normal traversal.
   */
  private BootstrapEvent eventForReplicationState() {
    if (replicationState.partitionState != null) {
      BootstrapEvent
          bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState);
      replicationState = null;
      return bootstrapEvent;
    } else if (replicationState.lastTableReplicated != null) {
      FSTableEvent event = new FSTableEvent(hiveConf, previous.toString());
      replicationState = null;
      return event;
    }
    throw new IllegalStateException("for replicationState " + replicationState.toString());
  }
}
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;

/**
 * Filesystem-backed {@link DatabaseEvent}: reads the database object from the
 * _metadata file inside a database dump directory.
 */
public class FSDatabaseEvent implements DatabaseEvent {

  private final Path dbMetadataFile;
  private final FileSystem fileSystem;

  /**
   * @param dbDumpDirectory directory that contains the db-level _metadata file
   * @throws RuntimeException (wrapping the cause) if the filesystem for the
   *         metadata path cannot be resolved
   */
  FSDatabaseEvent(HiveConf hiveConf, String dbDumpDirectory) {
    try {
      this.dbMetadataFile = new Path(dbDumpDirectory, EximUtil.METADATA_NAME);
      this.fileSystem = dbMetadataFile.getFileSystem(hiveConf);
    } catch (Exception e) {
      String message = "Error while identifying the filesystem for db "
          + "metadata file in " + dbDumpDirectory;
      throw new RuntimeException(message, e);
    }
  }

  @Override
  public Database dbInMetadata(String dbNameToOverride) throws SemanticException {
    try {
      MetaData rv = EximUtil.readMetaData(fileSystem, dbMetadataFile);
      Database dbObj = rv.getDatabase();
      if (dbObj == null) {
        throw new IllegalArgumentException(
            "_metadata file read did not contain a db object - invalid dump.");
      }

      // override the db name if provided in repl load command
      if (StringUtils.isNotBlank(dbNameToOverride)) {
        dbObj.setName(dbNameToOverride);
      }
      return dbObj;
    } catch (Exception e) {
      throw new SemanticException(e);
    }
  }

  @Override
  public State toState() {
    // Only the dump directory is needed to rebuild this event later.
    return new FSDBState(dbMetadataFile.getParent().toString());
  }

  @Override
  public EventType eventType() {
    return EventType.Database;
  }

  /** Serializable handle: remembers the dump directory so the event can be re-created. */
  static class FSDBState implements DatabaseEvent.State {
    final String dbDumpDirectory;

    FSDBState(String dbDumpDirectory) {
      this.dbDumpDirectory = dbDumpDirectory;
    }

    @Override
    public DatabaseEvent toEvent(HiveConf hiveConf) {
      return new FSDatabaseEvent(hiveConf, dbDumpDirectory);
    }
  }
}
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; + +public class FSFunctionEvent implements FunctionEvent { + private final Path rootDir; + + FSFunctionEvent(Path rootDir) { + this.rootDir = rootDir; + } + + @Override + public Path rootDir() { + return rootDir; + } + + @Override + public EventType eventType() { + return EventType.Function; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java new file mode 100644 index 0000000..9b71a2e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java @@ -0,0 +1,84 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; + +import java.util.List; + +public class FSPartitionEvent implements PartitionEvent { + + private final ReplicationState replicationState; + private final TableEvent tableEvent; + + FSPartitionEvent(HiveConf hiveConf, String metadataDir, + ReplicationState replicationState) { + tableEvent = new FSTableEvent(hiveConf, metadataDir); + this.replicationState = replicationState; + } + + @Override + public EventType eventType() { + return EventType.Partition; + } + + @Override + public AddPartitionDesc lastPartitionReplicated() { + assert replicationState != null && replicationState.partitionState != null; + return replicationState.partitionState.lastReplicatedPartition; + } + + @Override + public TableEvent asTableEvent() { + return tableEvent; + } + + @Override + public ImportTableDesc tableDesc(String dbName) throws SemanticException { + return tableEvent.tableDesc(dbName); + } + + @Override + public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc 
tblDesc) + throws SemanticException { + return tableEvent.partitionDescriptions(tblDesc); + } + + @Override + public ReplicationSpec replicationSpec() { + return tableEvent.replicationSpec(); + } + + @Override + public boolean shouldNotReplicate() { + return tableEvent.shouldNotReplicate(); + } + + @Override + public Path metadataPath() { + return tableEvent.metadataPath(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java new file mode 100644 index 0000000..f313404 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -0,0 +1,123 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
/**
 * Filesystem-backed {@link TableEvent}: reads the {@code _metadata} file from a table's
 * dump directory and converts its contents (table object, partitions, replication spec)
 * into the DDL descriptors consumed by bootstrap repl load.
 */
public class FSTableEvent implements TableEvent {
  // Root of the table dump directory, rebuilt from the validated URI's
  // scheme/authority/path.
  private final Path fromPath;
  // Parsed _metadata contents; all accessors below read from this.
  private final MetaData metadata;

  FSTableEvent(HiveConf hiveConf, String metadataDir) {
    try {
      URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir));
      fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
      FileSystem fs = FileSystem.get(fromURI, hiveConf);
      metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
    } catch (Exception e) {
      // Construction is all-or-nothing: any failure reading/parsing the dump is fatal.
      throw new RuntimeException(e);
    }
  }

  /**
   * @return true when this dump entry must be skipped by the load: its replication spec
   * is a no-op or it is outside replication scope.
   */
  public boolean shouldNotReplicate() {
    ReplicationSpec spec = metadata.getReplicationSpec();
    return spec.isNoop() || !spec.isInReplicationScope();
  }

  @Override
  public Path metadataPath() {
    return fromPath;
  }

  /**
   * Builds the import descriptor for the dumped table, re-homed into {@code dbName}.
   *
   * @throws SemanticException wrapping any failure while materializing the table object.
   */
  @Override
  public ImportTableDesc tableDesc(String dbName) throws SemanticException {
    try {
      Table table = new Table(metadata.getTable());
      ImportTableDesc tableDesc = new ImportTableDesc(dbName, table);
      tableDesc.setReplicationSpec(metadata.getReplicationSpec());
      return tableDesc;
    } catch (Exception e) {
      throw new SemanticException(e);
    }
  }

  /**
   * Converts every dumped partition into an {@link AddPartitionDesc} against
   * {@code tblDesc}. Eager: all partitions are converted up front.
   */
  @Override
  public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
      throws SemanticException {
    List<AddPartitionDesc> descs = new ArrayList<>();
    //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose.
    for (Partition partition : metadata.getPartitions()) {
      // TODO: this should ideally not create AddPartitionDesc per partition
      AddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition);
      descs.add(partsDesc);
    }
    return descs;
  }

  // Copies storage-descriptor details from the dumped partition into a fresh
  // AddPartitionDesc, and points its location at the partition's data directory
  // inside the dump (fromPath/<partName>), not the original warehouse location.
  private AddPartitionDesc partitionDesc(Path fromPath,
      ImportTableDesc tblDesc, Partition partition) throws SemanticException {
    try {
      AddPartitionDesc partsDesc =
          new AddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(),
              EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
              partition.getSd().getLocation(), partition.getParameters());
      AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
      partDesc.setInputFormat(partition.getSd().getInputFormat());
      partDesc.setOutputFormat(partition.getSd().getOutputFormat());
      partDesc.setNumBuckets(partition.getSd().getNumBuckets());
      partDesc.setCols(partition.getSd().getCols());
      partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
      partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
      partDesc.setBucketCols(partition.getSd().getBucketCols());
      partDesc.setSortCols(partition.getSd().getSortCols());
      // Overrides the constructor-supplied location with the in-dump partition path.
      partDesc.setLocation(new Path(fromPath,
          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
      partsDesc.setReplicationSpec(metadata.getReplicationSpec());
      return partsDesc;
    } catch (Exception e) {
      throw new SemanticException(e);
    }
  }

  @Override
  public ReplicationSpec replicationSpec() {
    return metadata.getReplicationSpec();
  }

  @Override
  public EventType eventType() {
    return EventType.Table;
  }
}
+ */ +package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +public class LoadDatabase { + + final Context context; + final TaskTracker tracker; + + private final DatabaseEvent event; + private final String dbNameToLoadIn; + + public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, + TaskTracker loadTaskTracker) { + this.context = context; + this.event = event; + this.dbNameToLoadIn = dbNameToLoadIn; + this.tracker = new TaskTracker(loadTaskTracker); + } + + public TaskTracker tasks() throws SemanticException { + try { + Database dbInMetadata = readDbMetadata(); + Task<? extends Serializable> dbRootTask = existEmptyDb(dbInMetadata.getName()) + ? alterDbTask(dbInMetadata, context.hiveConf) + : createDbTask(dbInMetadata); + tracker.addTask(dbRootTask); + return tracker; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + Database readDbMetadata() throws SemanticException { + return event.dbInMetadata(dbNameToLoadIn); + } + + private Task<? 
extends Serializable> createDbTask(Database dbObj) { + CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); + createDbDesc.setName(dbObj.getName()); + createDbDesc.setComment(dbObj.getDescription()); + + /* + explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going + to run multiple times and explicit logic is in place which prevents updates to tables when db level + last repl id is set and we create a AlterDatabaseTask at the end of processing a database. + */ + Map<String, String> parameters = new HashMap<>(dbObj.getParameters()); + parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + createDbDesc.setDatabaseProperties(parameters); + // note that we do not set location - for repl load, we want that auto-created. + createDbDesc.setIfNotExists(false); + // If it exists, we want this to be an error condition. Repl Load is not intended to replace a + // db. + // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on. + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), createDbDesc); + return TaskFactory.get(work, context.hiveConf); + } + + private static Task<? extends Serializable> alterDbTask(Database dbObj, HiveConf hiveConf) { + AlterDatabaseDesc alterDbDesc = + new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null); + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); + return TaskFactory.get(work, hiveConf); + } + + private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { + Database db = context.hiveDb.getDatabase(dbName); + if (db == null) { + return false; + } + List<String> allTables = context.hiveDb.getAllTables(dbName); + List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*"); + if (allTables.isEmpty() && allFunctions.isEmpty()) { + return true; + } + throw new InvalidOperationException( + "Database " + db.getName() + " is not empty. 
One or more tables/functions exist."); + } + + public static class AlterDatabase extends LoadDatabase { + + public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, + TaskTracker loadTaskTracker) { + super(context, event, dbNameToLoadIn, loadTaskTracker); + } + + @Override + public TaskTracker tasks() throws SemanticException { + tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf)); + return tracker; + } + } +}