Repository: hive Updated Branches: refs/heads/master 3972bf051 -> 2422e1808
HIVE-18467: support whole warehouse dump / load + create/drop database events (Anishek Agarwal, reviewed by Sankar Hariappan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2422e180 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2422e180 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2422e180 Branch: refs/heads/master Commit: 2422e1808fc58c4e81b9ab99692b99c92e0f02c9 Parents: 3972bf0 Author: Anishek Agarwal <anis...@gmail.com> Authored: Wed Feb 7 09:57:49 2018 +0530 Committer: Anishek Agarwal <anis...@gmail.com> Committed: Wed Feb 7 09:57:49 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hive/ql/parse/TestExportImport.java | 29 ++- ...TestReplicationScenariosAcrossInstances.java | 180 +++++++++++++++++-- .../hadoop/hive/ql/parse/WarehouseInstance.java | 19 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 13 +- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 3 +- .../events/filesystem/FSTableEvent.java | 4 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 4 +- .../apache/hadoop/hive/ql/parse/EximUtil.java | 3 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 23 ++- .../hadoop/hive/ql/parse/repl/DumpType.java | 14 ++ .../repl/dump/events/CreateDatabaseHandler.java | 48 +++++ .../repl/dump/events/DropConstraintHandler.java | 2 +- .../repl/dump/events/DropDatabaseHandler.java | 41 +++++ .../repl/dump/events/EventHandlerFactory.java | 2 + .../load/message/CreateDatabaseHandler.java | 84 +++++++++ .../load/message/DropConstraintHandler.java | 11 +- .../repl/load/message/DropDatabaseHandler.java | 49 +++++ .../parse/repl/load/message/TableHandler.java | 6 - .../hadoop/hive/ql/plan/CreateDatabaseDesc.java | 6 - .../hadoop/hive/ql/plan/DropDatabaseDesc.java | 16 +- .../parse/TestReplicationSemanticAnalyzer.java | 10 ++ .../messaging/CreateDatabaseMessage.java | 5 +- .../event/filters/DatabaseAndTableFilter.java | 16 +- 
.../json/JSONCreateDatabaseMessage.java | 18 +- .../messaging/json/JSONMessageFactory.java | 2 +- 25 files changed, 520 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java index e9d5458..67b74c2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExportImport.java @@ -96,13 +96,30 @@ public class TestExportImport { String exportMDPath = "'" + path + "1/'"; String exportDataPath = "'" + path + "2/'"; srcHiveWarehouse.run("create table " + dbName + ".t1 (i int)") - .run("insert into table " + dbName + ".t1 values (1),(2)") - .run("export table " + dbName + ".t1 to " + exportMDPath + " for metadata replication('1')") - .run("export table " + dbName + ".t1 to " + exportDataPath + " for replication('2')"); + .run("insert into table " + dbName + ".t1 values (1),(2)") + .run("export table " + dbName + ".t1 to " + exportMDPath + " for metadata replication('1')") + .run("export table " + dbName + ".t1 to " + exportDataPath + " for replication('2')"); destHiveWarehouse.run("import table " + replDbName + ".t1 from " + exportMDPath) - .run("import table " + replDbName + ".t1 from " + exportDataPath) - .run("select * from " + replDbName + ".t1") - .verifyResults(new String[] { "1", "2" }); + .run("import table " + replDbName + ".t1 from " + exportDataPath) + .run("select * from " + replDbName + ".t1") + .verifyResults(new String[] { "1", "2" }); + } + + @Test + public void 
databaseTheTableIsImportedIntoShouldBeParsedFromCommandLine() throws Throwable { + String path = "hdfs:///tmp/" + dbName + "/"; + String exportPath = "'" + path + "1/'"; + + srcHiveWarehouse.run("create table " + dbName + ".t1 (i int)") + .run("insert into table " + dbName + ".t1 values (1),(2)") + .run("export table " + dbName + ".t1 to " + exportPath); + + destHiveWarehouse.run("create database test1") + .run("use default") + .run("import table test1.t1 from " + exportPath) + .run("select * from test1.t1") + .verifyResults(new String[] { "1", "2" }); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 2a48527..6e8d6b6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -26,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -42,6 +44,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; @@ -58,19 +61,21 @@ public class TestReplicationScenariosAcrossInstances { public TestRule replV1BackwardCompat; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); - private static WarehouseInstance primary, replica; - private static MiniDFSCluster miniDFSCluster; + private String primaryDbName, replicatedDbName; @BeforeClass public static void classLevelSetup() throws Exception { Configuration conf = new Configuration(); conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); - miniDFSCluster = + MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - primary = new WarehouseInstance(LOG, miniDFSCluster); - replica = new WarehouseInstance(LOG, miniDFSCluster); + HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + }}; + primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); } @AfterClass @@ -79,8 +84,6 @@ public class TestReplicationScenariosAcrossInstances { replica.close(); } - private String primaryDbName, replicatedDbName; - @Before public void setup() throws Throwable { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); @@ -89,6 +92,12 @@ public class TestReplicationScenariosAcrossInstances { primary.run("create database " + primaryDbName); } + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + @Test public void testCreateFunctionIncrementalReplication() throws Throwable { WarehouseInstance.Tuple 
bootStrapDump = primary.dump(primaryDbName, null); @@ -97,7 +106,7 @@ public class TestReplicationScenariosAcrossInstances { .verifyResult(bootStrapDump.lastReplicationId); primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' " + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'"); WarehouseInstance.Tuple incrementalDump = @@ -106,41 +115,41 @@ public class TestReplicationScenariosAcrossInstances { .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verifyResult(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunctionOne"); // Test the idempotent behavior of CREATE FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName) + .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verifyResult(replicatedDbName + ".testFunction"); + .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); } @Test public void testDropFunctionIncrementalReplication() throws Throwable { primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + ".testFunctionAnother as 'hivemall.tools.string.StopwordUDF' " + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'"); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); + primary.run("Drop FUNCTION " + primaryDbName + ".testFunctionAnother "); WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, 
bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .run("SHOW FUNCTIONS LIKE '*testfunctionanother*'") .verifyResult(null); // Test the idempotent behavior of DROP FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName) + .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .run("SHOW FUNCTIONS LIKE '*testfunctionanother*'") .verifyResult(null); } @@ -254,7 +263,7 @@ public class TestReplicationScenariosAcrossInstances { } @Test - public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable { + public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create table t1 (id int)") @@ -280,6 +289,7 @@ public class TestReplicationScenariosAcrossInstances { .run("select country from t2") .verifyResults(Arrays.asList("india", "australia", "russia", "uk", "us", "france", "japan", "china")); + replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, false); } @Test @@ -376,4 +386,138 @@ public class TestReplicationScenariosAcrossInstances { "custom.value\t " }); } + + @Test + public void testBootStrapDumpOfWarehouse() throws Throwable { + String randomOne = RandomStringUtils.random(10, true, false); + String randomTwo = RandomStringUtils.random(10, true, false); + String dbOne = primaryDbName + randomOne; + String dbTwo = primaryDbName + randomTwo; + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (i int, j int)") + .run("create database " + dbOne) + .run("use " + dbOne) + .run("create table t1 (i int, j int) partitioned by (load_date date) " + + "clustered by(i) into 2 buckets stored as orc 
tblproperties ('transactional'='true') ") + .run("create database " + dbTwo) + .run("use " + dbTwo) + .run("create table t1 (i int, j int)") + .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'", + "'hive.repl.dump.include.acid.tables'='true'")); + + /* + Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM + we are not able to create multiple embedded derby instances for two different MetaStore instances. + */ + + primary.run("drop database " + primaryDbName + " cascade"); + primary.run("drop database " + dbOne + " cascade"); + primary.run("drop database " + dbTwo + " cascade"); + + /* + End of additional steps + */ + + replica.run("show databases") + .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .load("", tuple.dumpLocation) + .run("show databases") + .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) + .run("use " + primaryDbName) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbOne) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbTwo) + .run("show tables") + .verifyResults(new String[] { "t1" }); + /* + Start of cleanup + */ + + replica.run("drop database " + primaryDbName + " cascade"); + replica.run("drop database " + dbOne + " cascade"); + replica.run("drop database " + dbTwo + " cascade"); + + /* + End of cleanup + */ + } + + @Test + public void testIncrementalDumpOfWarehouse() throws Throwable { + String randomOne = RandomStringUtils.random(10, true, false); + String randomTwo = RandomStringUtils.random(10, true, false); + String dbOne = primaryDbName + randomOne; + WarehouseInstance.Tuple bootstrapTuple = primary + .run("use " + primaryDbName) + .run("create table t1 (i int, j int)") + .run("create database " + dbOne) + .run("use " + dbOne) + .run("create table t1 (i int, j int) partitioned by (load_date date) " + + "clustered by(i) into 2 buckets stored as orc tblproperties 
('transactional'='true') ") + .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'", + "'hive.repl.dump.include.acid.tables'='true'")); + + String dbTwo = primaryDbName + randomTwo; + WarehouseInstance.Tuple incrementalTuple = primary + .run("create database " + dbTwo) + .run("use " + dbTwo) + .run("create table t1 (i int, j int)") + .run("use " + dbOne) + .run("create table t2 (a int, b int)") + .dump("`*`", bootstrapTuple.lastReplicationId, + Arrays.asList("'hive.repl.dump.metadata.only'='true'", + "'hive.repl.dump.include.acid.tables'='true'")); + + /* + Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM + we are not able to create multiple embedded derby instances for two different MetaStore instances. + */ + + primary.run("drop database " + primaryDbName + " cascade"); + primary.run("drop database " + dbOne + " cascade"); + primary.run("drop database " + dbTwo + " cascade"); + + /* + End of additional steps + */ + + replica.run("show databases") + .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .load("", bootstrapTuple.dumpLocation) + .run("show databases") + .verifyResults(new String[] { "default", primaryDbName, dbOne }) + .run("use " + primaryDbName) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbOne) + .run("show tables") + .verifyResults(new String[] { "t1" }); + + replica.load("", incrementalTuple.dumpLocation) + .run("show databases") + .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) + .run("use " + dbTwo) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbOne) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }); + + /* + Start of cleanup + */ + + replica.run("drop database " + primaryDbName + " cascade"); + replica.run("drop database " + dbOne + " cascade"); + replica.run("drop database " + dbTwo + " cascade"); + + /* + End of cleanup + */ + + } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 0918d33..dd6fa42 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -48,7 +48,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -87,25 +86,13 @@ class WarehouseInstance implements Closeable { initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf); } - WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone) - throws Exception { - this(logger, cluster, new HashMap<String, String>() {{ - put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); - }}, keyNameForEncryptedZone); - } - WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf) throws Exception { this(logger, cluster, overridesForHiveConf, null); } - WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { - this(logger, cluster, (String) null); - } - private void initialize(String cmRoot, String warehouseRoot, - Map<String, String> overridesForHiveConf) - throws Exception { + Map<String, String> overridesForHiveConf) throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) { hiveConf.set(entry.getKey(), entry.getValue()); @@ -129,7 +116,6 @@ class 
WarehouseInstance implements Closeable { hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot); hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot); - System.setProperty("datanucleus.mapping.Schema", "APP"); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:memory:${test.tmp.dir}/APP;create=true"); hiveConf.setVar(HiveConf.ConfVars.REPLDIR, @@ -246,7 +232,8 @@ class WarehouseInstance implements Closeable { List<String> lowerCaseData = Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList()); assertEquals(data.length, filteredResults.size()); - assertTrue(filteredResults.containsAll(lowerCaseData)); + assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all expected" + StringUtils + .join(lowerCaseData, ","), filteredResults.containsAll(lowerCaseData)); return this; } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index d3aa571..20c2c32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4829,13 +4829,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable { throws HiveException { try { String dbName = dropDb.getDatabaseName(); + ReplicationSpec replicationSpec = dropDb.getReplicationSpec(); + if (replicationSpec.isInReplicationScope()) { + Database database = db.getDatabase(dbName); + if (database == null + || !replicationSpec.allowEventReplacementInto(database.getParameters())) { + return 0; + } + } db.dropDatabase(dbName, true, dropDb.getIfExists(), dropDb.isCasdade()); // Unregister the functions as well if (dropDb.isCasdade()) { 
FunctionRegistry.unregisterPermanentFunctions(dbName); } - } - catch (NoSuchObjectException ex) { + } catch (NoSuchObjectException ex) { throw new HiveException(ex, ErrorMsg.DATABASE_NOT_EXISTS, dropDb.getDatabaseName()); } return 0; @@ -5175,7 +5182,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return 0; } - private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws HiveException { + private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws HiveException { if (truncateTableDesc.getColumnIndexes() != null) { ColumnTruncateWork truncateWork = new ColumnTruncateWork( http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index 432e394..91ec93e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -79,8 +79,7 @@ public class ReplLoadWork implements Serializable { } DatabaseEvent databaseEvent(HiveConf hiveConf) { - DatabaseEvent databaseEvent = state.toEvent(hiveConf); - return databaseEvent; + return state.toEvent(hiveConf); } boolean hasDbState() { http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index b9f2d0a..cfd1640 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -65,7 +66,8 @@ public class FSTableEvent implements TableEvent { public ImportTableDesc tableDesc(String dbName) throws SemanticException { try { Table table = new Table(metadata.getTable()); - ImportTableDesc tableDesc = new ImportTableDesc(dbName, table); + ImportTableDesc tableDesc = + new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table); tableDesc.setReplicationSpec(metadata.getReplicationSpec()); return tableDesc; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 3eb869d..b766791 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMNullablePool; import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan; import org.apache.hadoop.hive.metastore.api.WMPool; -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -1398,7 +1397,8 @@ public 
class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { inputs.add(new ReadEntity(database)); outputs.add(new WriteEntity(database, WriteEntity.WriteType.DDL_EXCLUSIVE)); - DropDatabaseDesc dropDatabaseDesc = new DropDatabaseDesc(dbName, ifExists, ifCascade); + DropDatabaseDesc dropDatabaseDesc = new DropDatabaseDesc(dbName, ifExists, ifCascade, + new ReplicationSpec()); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropDatabaseDesc), conf)); } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 87821fd..89837be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -247,9 +247,8 @@ public class EximUtil { // Remove all the entries from the parameters which are added for bootstrap dump progress Map<String, String> parameters = dbObj.getParameters(); - Map<String, String> tmpParameters = new HashMap<>(); if (parameters != null) { - tmpParameters.putAll(parameters); + Map<String, String> tmpParameters = new HashMap<>(parameters); tmpParameters.entrySet() .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)); dbObj.setParameters(tmpParameters); http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 6c73dc5..70f5e21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.OutputFormat; +import org.datanucleus.util.StringUtils; import org.slf4j.Logger; import java.io.IOException; @@ -143,6 +144,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } } + if (StringUtils.isEmpty(parsedDbName)) { + parsedDbName = SessionState.get().getCurrentDatabase(); + } // parsing statement is now done, on to logic. tableExists = prepareImport(true, isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor, @@ -180,9 +184,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } } + /** + * The same code is used from both the "repl load" as well as "import". + * Given that "repl load" now supports two modes "repl load dbName [location]" and + * "repl load [location]" in which case the database name has to be taken from the table metadata + * by default and then over-ridden if something specified on the command line. 
+ * + * hence for import to work correctly we have to pass in the sessionState default Db via the + * parsedDbName parameter + */ public static boolean prepareImport(boolean isImportCmd, boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor, - String parsedLocation, String parsedTableName, String parsedDbName, + String parsedLocation, String parsedTableName, String overrideDBName, LinkedHashMap<String, String> parsedPartSpec, String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x, UpdatedMetaDataTracker updatedMetadata @@ -195,7 +208,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { FileSystem fs = FileSystem.get(fromURI, x.getConf()); x.getInputs().add(toReadEntity(fromPath, x.getConf())); - MetaData rv = new MetaData(); + MetaData rv; try { rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); } catch (IOException e) { @@ -219,10 +232,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { replicationSpec.setReplSpecType(ReplicationSpec.Type.IMPORT); } - String dbname = SessionState.get().getCurrentDatabase(); - if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){ + String dbname = rv.getTable().getDbName(); + if ((overrideDBName !=null) && (!overrideDBName.isEmpty())){ // If the parsed statement contained a db.tablename specification, prefer that. 
- dbname = parsedDbName; + dbname = overrideDBName; } // Create table associated with the import http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index c1c1fd3..c69ecc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -22,9 +22,11 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.AddForeignKeyHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.AddPrimaryKeyHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.AddUniqueConstraintHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.AlterDatabaseHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateDatabaseHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropConstraintHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.DropDatabaseHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler; @@ -169,6 +171,18 @@ public enum DumpType { public MessageHandler handler() { return new DefaultHandler(); } + }, + EVENT_CREATE_DATABASE("EVENT_CREATE_DATABASE") { + @Override + public MessageHandler handler() { + return new CreateDatabaseHandler(); + } + }, + EVENT_DROP_DATABASE("EVENT_DROP_DATABASE") { + @Override + public MessageHandler handler() { + return new DropDatabaseHandler(); + } }; String type = null; 
http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java new file mode 100644 index 0000000..21eb74b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +class CreateDatabaseHandler extends AbstractEventHandler { + CreateDatabaseHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), event.getMessage()); + CreateDatabaseMessage createDatabaseMsg = + deserializer.getCreateDatabaseMessage(event.getMessage()); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + FileSystem fileSystem = metaDataPath.getFileSystem(withinContext.hiveConf); + EximUtil.createDbExportDump(fileSystem, metaDataPath, createDatabaseMsg.getDatabaseObject(), + withinContext.replicationSpec); + withinContext.createDmd(this).write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_CREATE_DATABASE; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java index 6b709a6..979e9a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import 
org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class DropConstraintHandler extends AbstractEventHandler { +class DropConstraintHandler extends AbstractEventHandler { DropConstraintHandler(NotificationEvent event) { super(event); } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java new file mode 100644 index 0000000..4eae778 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class DropDatabaseHandler extends AbstractEventHandler { + DropDatabaseHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_DROP_DATABASE; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index dc19741..9955246 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -48,6 +48,8 @@ public class EventHandlerFactory { register(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class); register(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class); register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class); + register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class); + register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class); } static void register(String event, Class<? 
extends EventHandler> handlerClazz) { http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java new file mode 100644 index 0000000..f8d8d1a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java @@ -0,0 +1,84 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.PrincipalDesc; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +public class CreateDatabaseHandler extends AbstractMessageHandler { + + @Override + public List<Task<? extends Serializable>> handle(Context context) + throws SemanticException { + MetaData metaData; + try { + FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf); + + metaData = EximUtil.readMetaData(fs, new Path(context.location, EximUtil.METADATA_NAME)); + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + Database db = metaData.getDatabase(); + String destinationDBName = + context.dbName == null ? 
db.getName() : context.dbName; + + CreateDatabaseDesc createDatabaseDesc = + new CreateDatabaseDesc(destinationDBName, db.getDescription(), null, true); + createDatabaseDesc.setDatabaseProperties(db.getParameters()); + Task<DDLWork> createDBTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), createDatabaseDesc), context.hiveConf); + if (!db.getParameters().isEmpty()) { + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(destinationDBName, db.getParameters(), + context.eventOnlyReplicationSpec()); + Task<DDLWork> alterDbProperties = TaskFactory + .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), context.hiveConf); + createDBTask.addDependentTask(alterDbProperties); + } + if (StringUtils.isNotEmpty(db.getOwnerName())) { + AlterDatabaseDesc alterDbOwner = new AlterDatabaseDesc(destinationDBName, + new PrincipalDesc(db.getOwnerName(), db.getOwnerType()), + context.eventOnlyReplicationSpec()); + Task<DDLWork> alterDbTask = TaskFactory + .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbOwner), context.hiveConf); + createDBTask.addDependentTask(alterDbTask); + } + updatedMetadata + .set(context.dmd.getEventTo().toString(), destinationDBName, null, null); + return Collections.singletonList(createDBTask); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java index 459fac5..d9d185b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java @@ -17,11 +17,6 @@ */ package 
org.apache.hadoop.hive.ql.parse.repl.load.message; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -29,6 +24,10 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + public class DropConstraintHandler extends AbstractMessageHandler { @Override public List<Task<? extends Serializable>> handle(Context context) @@ -41,8 +40,6 @@ public class DropConstraintHandler extends AbstractMessageHandler { AlterTableDesc dropConstraintsDesc = new AlterTableDesc(actualDbName + "." + actualTblName, constraintName, context.eventOnlyReplicationSpec()); Task<DDLWork> dropConstraintsTask = TaskFactory.get(new DDLWork(readEntitySet, writeEntitySet, dropConstraintsDesc), context.hiveConf); - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? 
extends Serializable>>(); - tasks.add(dropConstraintsTask); context.log.debug("Added drop constrain task : {}:{}", dropConstraintsTask.getId(), actualTblName); updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null); return Collections.singletonList(dropConstraintsTask); http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java new file mode 100644 index 0000000..8b11a9e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropDatabaseDesc; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +public class DropDatabaseHandler extends AbstractMessageHandler { + @Override + public List<Task<? extends Serializable>> handle(Context context) + throws SemanticException { + DropDatabaseMessage msg = + deserializer.getDropDatabaseMessage(context.dmd.getPayload()); + String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; + DropDatabaseDesc desc = + new DropDatabaseDesc(actualDbName, true, context.eventOnlyReplicationSpec()); + Task<? extends Serializable> dropDBTask = + TaskFactory + .get(new DDLWork(new HashSet<>(), new HashSet<>(), desc), context.hiveConf); + context.log.info( + "Added drop database task : {}:{}", dropDBTask.getId(), desc.getDatabaseName()); + updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null); + return Collections.singletonList(dropDBTask); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 4ba6256..4cd75d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -29,12 +29,6 @@ import java.util.List; public class 
TableHandler extends AbstractMessageHandler { @Override public List<Task<? extends Serializable>> handle(Context context) throws SemanticException { - // Path being passed to us is a table dump location. We go ahead and load it in as needed. - // If tblName is null, then we default to the table name specified in _metadata, which is good. - // or are both specified, in which case, that's what we are intended to create the new table as. - if (context.isDbNameEmpty()) { - throw new SemanticException("Database name cannot be null for a table load"); - } try { List<Task<? extends Serializable>> importTasks = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateDatabaseDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateDatabaseDesc.java index 601015c..f2e6a77 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateDatabaseDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateDatabaseDesc.java @@ -54,12 +54,6 @@ public class CreateDatabaseDesc extends DDLDesc implements Serializable { this.dbProperties = null; } - public CreateDatabaseDesc(String databaseName, boolean ifNotExists) { - this(databaseName, null, null, ifNotExists); - } - - - @Explain(displayName="if not exists", displayOnlyOnTrue = true) public boolean getIfNotExists() { return ifNotExists; http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/java/org/apache/hadoop/hive/ql/plan/DropDatabaseDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DropDatabaseDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DropDatabaseDesc.java index 094cf3e..deaa7cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DropDatabaseDesc.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/DropDatabaseDesc.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; + +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -33,16 +35,20 @@ public class DropDatabaseDesc extends DDLDesc implements Serializable { String databaseName; boolean ifExists; boolean cascade; + ReplicationSpec replicationSpec; - public DropDatabaseDesc(String databaseName, boolean ifExists) { - this(databaseName, ifExists, false); + public DropDatabaseDesc(String databaseName, boolean ifExists, + ReplicationSpec replicationSpec) { + this(databaseName, ifExists, false, replicationSpec); } - public DropDatabaseDesc(String databaseName, boolean ifExists, boolean cascade) { + public DropDatabaseDesc(String databaseName, boolean ifExists, boolean cascade, + ReplicationSpec replicationSpec) { super(); this.databaseName = databaseName; this.ifExists = ifExists; this.cascade = cascade; + this.replicationSpec = replicationSpec; } @Explain(displayName = "database", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -70,4 +76,8 @@ public class DropDatabaseDesc extends DDLDesc implements Serializable { public void setIsCascade(boolean cascade) { this.cascade = cascade; } + + public ReplicationSpec getReplicationSpec() { + return replicationSpec; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java index 8de4844..96e3fca 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java @@ -91,6 
+91,16 @@ public class TestReplicationSemanticAnalyzer { public static class ReplDump { @Test + public void parseDbPattern() throws ParseException { + ASTNode root = parse("repl dump `*`"); + assertEquals("TOK_REPL_DUMP", root.getText()); + assertEquals(1, root.getChildCount()); + ASTNode child = (ASTNode) root.getChild(0); + assertEquals("`*`", child.getText()); + assertEquals(0, child.getChildCount()); + } + + @Test public void parseDb() throws ParseException { ASTNode root = parse("repl dump testDb"); assertDatabase(1, root); http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java index 328c118..3d64c73 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,10 +19,13 @@ package org.apache.hadoop.hive.metastore.messaging; +import org.apache.hadoop.hive.metastore.api.Database; + public abstract class CreateDatabaseMessage extends EventMessage { protected CreateDatabaseMessage() { super(EventType.CREATE_DATABASE); } + public abstract Database getDatabaseObject() throws Exception; } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java index 0852abd..50420c8 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.metastore.messaging.event.filters; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import java.util.regex.Pattern; + /** * Utility function that constructs a notification filter to match a given db name and/or table name. * If dbName == null, fetches all warehouse events. 
@@ -26,19 +28,23 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; * If dbName != null && tableName != null, fetches all events for the specified table */ public class DatabaseAndTableFilter extends BasicFilter { - private final String databaseName, tableName; + private final String tableName; + private final Pattern dbPattern; - public DatabaseAndTableFilter(final String databaseName, final String tableName) { - this.databaseName = databaseName; + public DatabaseAndTableFilter(final String databaseNameOrPattern, final String tableName) { + // we compile the databaseNameOrPattern as a case-insensitive pattern because events will have these names in lower case. + this.dbPattern = (databaseNameOrPattern == null || databaseNameOrPattern.equals("*")) + ? null + : Pattern.compile(databaseNameOrPattern, Pattern.CASE_INSENSITIVE); this.tableName = tableName; } @Override boolean shouldAccept(final NotificationEvent event) { - if (databaseName == null) { + if (dbPattern == null) { return true; // if our dbName is null, we're interested in all wh events } - if (databaseName.equalsIgnoreCase(event.getDbName())) { + if (dbPattern.matcher(event.getDbName()).matches()) { if ((tableName == null) // if our dbName is equal, but tableName is blank, we're interested in this db-level event || (tableName.equalsIgnoreCase(event.getTableName())) http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java index f442e99..371bc8c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore.messaging.json; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.thrift.TException; import org.codehaus.jackson.annotate.JsonProperty; /** @@ -28,7 +30,7 @@ import org.codehaus.jackson.annotate.JsonProperty; public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { @JsonProperty - String server, servicePrincipal, db; + String server, servicePrincipal, db, dbJson; @JsonProperty Long timestamp; @@ -38,14 +40,24 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage { */ public JSONCreateDatabaseMessage() {} - public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) { + public JSONCreateDatabaseMessage(String server, String servicePrincipal, Database db, + Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; - this.db = db; + this.db = db.getName(); this.timestamp = timestamp; + try { + this.dbJson = JSONMessageFactory.createDatabaseObjJson(db); + } catch (TException ex) { + throw new IllegalArgumentException("Could not serialize Function object", ex); + } checkValid(); } + public Database getDatabaseObject() throws Exception { + return (Database) JSONMessageFactory.getTObj(dbJson, Database.class); + } + @Override public String getDB() { return db; } http://git-wip-us.apache.org/repos/asf/hive/blob/2422e180/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index a9fe196..7f46d07 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -99,7 +99,7 @@ public class JSONMessageFactory extends MessageFactory { @Override public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { - return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now()); + return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now()); } @Override