This is an automated email from the ASF dual-hosted git repository. mahesh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit 7facd2c6afd35311b7dbaa37da6fa00bd0a2e0be Author: Mahesh Kumar Behera <mbeh...@hortonworks.com> AuthorDate: Wed Feb 20 14:31:02 2019 +0530 HIVE-21260 : Hive replication to a target with hive.strict.managed.tables enabled is failing when used HMS on postgres. (Mahesh Kumar Behera, reviewed by Sankar Hariappan) --- .../hadoop/hive/ql/parse/WarehouseInstance.java | 26 +++++++++++++++++++ .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 1 + .../org/apache/hadoop/hive/ql/metadata/Hive.java | 2 ++ .../hadoop/hive/ql/parse/ReplicationSpec.java | 7 +++++ .../hadoop/hive/metastore/txn/TxnHandler.java | 30 +++++++++++----------- 5 files changed, 51 insertions(+), 15 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index bd3a557..c0d416c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -155,6 +155,32 @@ public class WarehouseInstance implements Closeable { MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true); + // Add the below mentioned dependency in metastore/pom.xml file. For postgres need to copy postgresql-42.2.1.jar to + // .m2//repository/postgresql/postgresql/9.3-1102.jdbc41/postgresql-9.3-1102.jdbc41.jar. + /* + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>8.0.15</version> + </dependency> + + <dependency> + <groupId>postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>9.3-1102.jdbc41</version> + </dependency> + */ + + /*hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost:3306/APP"); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREPWD, "hivepassword"); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hiveuser");*/ + + /*hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,"jdbc:postgresql://localhost/app"); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "org.postgresql.Driver"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREPWD, "password"); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "postgres");*/ + driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); client = new HiveMetaStoreClient(hiveConf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index cb7fdf7..b02cdf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4712,6 +4712,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { if (existingTable != null){ if (crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())){ crtTbl.setReplaceMode(true); // we replace existing table. + ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters()); } else { LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update", crtTbl.getTableName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5cfd0a8..7343eed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -132,6 +132,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; @@ -2972,6 +2973,7 @@ private void constructOneLBLocationMap(FileStatus fSta, org.apache.hadoop.hive.metastore.api.Partition ptn = getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues()); if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){ + ReplicationSpec.copyLastReplId(ptn.getParameters(), p.getParameters()); partsToAlter.add(p); } // else ptn already exists, but we do nothing with it. } catch (NoSuchObjectException nsoe){ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index b087831..d55ee20 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -419,4 +419,11 @@ public class ReplicationSpec { public void setMigratingToExternalTable() { isMigratingToExternalTable = true; } + + public static void copyLastReplId(Map<String, String> srcParameter, Map<String, String> destParameter) { + String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (lastReplId != null) { + destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6df7680..fd85af9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -897,13 +897,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (needUpdateDBReplId) { // not used select for update as it will be updated by single thread only from repl load - rs = stmt.executeQuery("select PARAM_VALUE from DATABASE_PARAMS where PARAM_KEY = " + - "'repl.last.id' and DB_ID = " + dbId); + rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"DATABASE_PARAMS\" where \"PARAM_KEY\" = " + + "'repl.last.id' and \"DB_ID\" = " + dbId); if (!rs.next()) { - query = "insert into DATABASE_PARAMS values ( " + dbId + " , 'repl.last.id' , ? )"; + query = "insert into \"DATABASE_PARAMS\" values ( " + dbId + " , 'repl.last.id' , ? )"; } else { - query = "update DATABASE_PARAMS set PARAM_VALUE = ? where DB_ID = " + dbId + - " and PARAM_KEY = 'repl.last.id'"; + query = "update \"DATABASE_PARAMS\" set \"PARAM_VALUE\" = ? where \"DB_ID\" = " + dbId + + " and \"PARAM_KEY\" = 'repl.last.id'"; } close(rs); params = Arrays.asList(lastReplId); @@ -935,13 +935,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { pst.close(); // select for update is not required as only one task will update this during repl load. - rs = stmt.executeQuery("select PARAM_VALUE from TABLE_PARAMS where PARAM_KEY = " + - "'repl.last.id' and TBL_ID = " + tblId); + rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"TABLE_PARAMS\" where \"PARAM_KEY\" = " + + "'repl.last.id' and \"TBL_ID\" = " + tblId); if (!rs.next()) { - query = "insert into TABLE_PARAMS values ( " + tblId + " , 'repl.last.id' , ? )"; + query = "insert into \"TABLE_PARAMS\" values ( " + tblId + " , 'repl.last.id' , ? )"; } else { - query = "update TABLE_PARAMS set PARAM_VALUE = ? where TBL_ID = " + tblId + - " and PARAM_KEY = 'repl.last.id'"; + query = "update \"TABLE_PARAMS\" set \"PARAM_VALUE\" = ? where \"TBL_ID\" = " + tblId + + " and \"PARAM_KEY\" = 'repl.last.id'"; } rs.close(); @@ -988,13 +988,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { prs = pst.executeQuery(); while (prs.next()) { long partId = prs.getLong(1); - rs = stmt.executeQuery("select PARAM_VALUE from PARTITION_PARAMS where PARAM_KEY " + - " = 'repl.last.id' and PART_ID = " + partId); + rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"PARTITION_PARAMS\" where \"PARAM_KEY\" " + + " = 'repl.last.id' and \"PART_ID\" = " + partId); if (!rs.next()) { - query = "insert into PARTITION_PARAMS values ( " + partId + " , 'repl.last.id' , ? )"; + query = "insert into \"PARTITION_PARAMS\" values ( " + partId + " , 'repl.last.id' , ? )"; } else { - query = "update PARTITION_PARAMS set PARAM_VALUE = ? " + - " where PART_ID = " + partId + " and PARAM_KEY = 'repl.last.id'"; + query = "update \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? " + + " where \"PART_ID\" = " + partId + " and \"PARAM_KEY\" = 'repl.last.id'"; } rs.close();