This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new c7340c6  HIVE-21956 : Add the list of table selected by dump in the dump folder. (Mahesh Kumar Behera reviewed by Sankar Hariappan)
c7340c6 is described below

commit c7340c6f6e765ef6e499f7a3c399beab843cb6b0
Author: Mahesh Kumar Behera <mah...@apache.org>
AuthorDate: Tue Jul 16 12:04:08 2019 +0530

    HIVE-21956 : Add the list of table selected by dump in the dump folder. (Mahesh Kumar Behera reviewed by Sankar Hariappan)
---
 .../parse/TestTableLevelReplicationScenarios.java  | 107 ++++++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 102 +++++++++++++++++---
 .../events/filesystem/BootstrapEventsIterator.java |   2 +-
 .../events/filesystem/DatabaseEventsIterator.java  |   6 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  18 +++-
 5 files changed, 218 insertions(+), 17 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 09db38d..270e61a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -36,6 +37,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
@@ -153,6 +159,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
       verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
     }
 
+    // If the policy contains '.', it is a table-level replication policy.
+    verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ?
+            expectedTables : null);
+
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
@@ -194,6 +203,36 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     }
   }
 
+  private void verifyTableListForPolicy(String dumpLocation, String[] tableList) throws Throwable {
+    FileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
+    Path tableListFile = new Path(dumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+    tableListFile = new Path(tableListFile, primaryDbName.toLowerCase());
+
+    if (tableList == null) {
+      Assert.assertFalse(fileSystem.exists(tableListFile));
+      return;
+    } else {
+      Assert.assertTrue(fileSystem.exists(tableListFile));
+    }
+
+    BufferedReader reader = null;
+    try {
+      InputStream inputStream = fileSystem.open(tableListFile);
+      reader = new BufferedReader(new InputStreamReader(inputStream));
+      Set<String> tableNames = new HashSet<>(Arrays.asList(tableList));
+      int numTable = 0;
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        numTable++;
+        Assert.assertTrue(tableNames.contains(line));
+      }
+      Assert.assertEquals(numTable, tableList.length);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
   @Test
   public void testBasicBootstrapWithIncludeList() throws Throwable {
     String[] originalNonAcidTables = new String[] {"t1", "t2"};
@@ -660,7 +699,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
             .run("alter table out100 rename to in100")  // this will add the bootstrap
             .run("drop table in100");  // table in100 is dropped, so no bootstrap should happen.
 
-    replicatedTables = new String[] {"in200", "in12", "in12", "in14"};
+    replicatedTables = new String[] {"in200", "in12", "in11", "in14"};
     bootstrapTables = new String[] {"in14", "in200"};
     replicateAndVerify(replPolicy, null, lastReplId, null,
             null, bootstrapTables, replicatedTables);
@@ -907,4 +946,70 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     replicateAndVerify(newPolicy, replPolicy, lastReplId, null, null,
             bootstrapTables, replicatedTables, new String[] {"1", "2"});
   }
+
+  @Test
+  public void testRenameTableScenariosUpgrade() throws Throwable {
+    // Policy with no table-level filter; ACID and external tables are excluded from the dump.
+    String replPolicy = primaryDbName;
+    List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
+    List<String> dumpWithClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'"
+    );
+
+    String[] originalNonAcidTables = new String[] {"in1", "out4"};
+    String[] originalExternalTables = new String[] {"in2", "out5"};
+    String[] originalAcidTables = new String[] {"in3", "out6"};
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+    createTables(originalAcidTables, CreateTableType.FULL_ACID);
+
+    // Only NON_ACID tables are replicated.
+    String[] replicatedTables = new String[] {"in1", "out4"};
+    String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause,
+            null, new String[] {}, replicatedTables);
+
+    originalNonAcidTables = new String[] {"in7", "out10"};
+    originalExternalTables = new String[] {"in8", "out11"};
+    originalAcidTables = new String[] {"in9", "out12"};
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+    createTables(originalAcidTables, CreateTableType.MM_ACID);
+
+    primary.run("use " + primaryDbName)
+            .run("alter table out4 rename to in4")
+            .run("alter table out5 rename to in5")
+            .run("alter table out6 rename to in6");
+
+    // Table-level replication with ACID and external tables included.
+    String newReplPolicy = primaryDbName + ".'in[0-9]+'.'in8'";
+    dumpWithClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+    );
+
+    replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5", "in6", "in7", "in9"};
+    String[] bootstrapTables = new String[] {"in2", "in3", "in4", "in5", "in6", "in9"};
+    lastReplId = replicateAndVerify(newReplPolicy, replPolicy, lastReplId, dumpWithClause,
+            loadWithClause, bootstrapTables, replicatedTables);
+
+    primary.run("use " + primaryDbName)
+            .run("alter table in4 rename to out4")
+            .run("alter table in5 rename to out5")
+            .run("alter table in6 rename to out6");
+
+    dumpWithClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+    );
+
+    // Database-level replication with ACID and external tables included.
+    replicatedTables = new String[] {"in1", "in2", "in3", "out4", "out5", "out6", "in7", "in8",
+            "in9", "out10", "out11", "out12"};
+    bootstrapTables = new String[] {"out4", "out5", "out6", "in8", "out10", "out11", "out12"};
+    replicateAndVerify(replPolicy, newReplPolicy, lastReplId, dumpWithClause,
+            loadWithClause, bootstrapTables, replicatedTables);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 9b80408..4cd60cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -69,7 +71,10 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -219,6 +224,24 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, table.getTableName());
   }
 
+  private boolean isTableSatifiesConfig(Table table) {
+    if (table == null) {
+      return false;
+    }
+
+    if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+            && !conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)) {
+      return false;
+    }
+
+    if (AcidUtils.isTransactionalTable(table)
+            && !ReplUtils.includeAcidTableInDump(conf)) {
+      return false;
+    }
+
+    return true;
+  }
+
   private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     Long lastReplId;// get list of events matching dbPattern & tblPattern
     // go through each event, and dump out each event to a event-level dump dir inside dumproot
@@ -226,6 +249,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
 
+    List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
+
     // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
     // bootstrap (See bootstrapDump() for more details. Only difference here is instead of
     // waiting for the concurrent transactions to finish, we start dumping the incremental events
@@ -292,7 +317,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     dmd.write();
 
     // Examine all the tables if required.
-    if (shouldExamineTablesToDump()) {
+    if (shouldExamineTablesToDump() || (tableList != null)) {
      // If required wait more for any transactions open at the time of starting the ACID bootstrap.
      if (needBootstrapAcidTablesDuringIncrementalDump()) {
        assert (waitUntilTime > 0);
@@ -318,6 +343,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
            dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
                tableTuple);
          }
+          if (tableList != null && isTableSatifiesConfig(table)) {
+            tableList.add(tableName);
+          }
        } catch (InvalidTableException te) {
          // Repl dump shouldn't fail if the table is dropped/renamed while dumping it.
          // Just log a debug message and skip it.
@@ -325,6 +353,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
        }
      }
      dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
    }
 
    return lastReplId;
@@ -376,13 +405,61 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
    return rspec;
  }
 
-  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
+  private void dumpTableListToDumpLocation(List<String> tableList, Path dbRoot, String dbName,
+                                           HiveConf hiveConf) throws IOException, LoginException {
+    // An empty list creates an empty file, which distinguishes table-level replication from
+    // db-level replication: no file means db-level replication; an empty file means table-level
+    // replication where no table satisfies the policy.
+    if (tableList == null) {
+      LOG.debug("Table list file is not created for db level replication.");
+      return;
+    }
+
+    // The table list is dumped in the _tables/<dbname> file.
+    Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+    tableListFile = new Path(tableListFile, dbName.toLowerCase());
+
+    int count = 0;
+    while (count < FileUtils.MAX_IO_ERROR_RETRY) {
+      try (FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile)) {
+        for (String tableName : tableList) {
+          String line = tableName.toLowerCase().concat("\n");
+          writer.write(line.getBytes(StandardCharsets.UTF_8));
+        }
+        // Close is called explicitly because close also performs the actual file system write,
+        // so close itself may throw an I/O exception.
+        writer.close();
+        break;
+      } catch (IOException e) {
+        LOG.info("File operation failed", e);
+        if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) {
+          // No need to wait in the last iteration.
+          LOG.error("File " + tableListFile.toUri() + " creation failed even after " +
+                  FileUtils.MAX_IO_ERROR_RETRY + " attempts.");
+          throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
+        }
+        int sleepTime = FileUtils.getSleepTime(count);
+        LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count + 1));
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException timerEx) {
+          LOG.info("Sleep interrupted", timerEx.getMessage());
+        }
+        FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI());
+      }
+      count++;
+    }
+    LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
+  }
+
+  Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
+      throws Exception {
    // bootstrap case
    // Last repl id would've been captured during compile phase in queryState configs before opening txn.
    // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
    // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
    Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
    assert (bootDumpBeginReplId >= 0L);
+    List<String> tableList;
 
    LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
    long timeoutInMs = HiveConf.getTimeVar(conf,
@@ -391,7 +468,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
    String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
    for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
-      LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+      LOG.debug("Dumping db: " + dbName);
+
+      // TODO : Currently we don't support separate table list for each database.
+      tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
 
      Database db = hiveDb.getDatabase(dbName);
      if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) {
        // For replicated (target) database, until after first successful incremental load, the database will not be
@@ -413,18 +493,12 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
          && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
      try (Writer writer = new Writer(dbRoot, conf)) {
        for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-          LOG.debug(
-              "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+          LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
+          Table table = null;
+
          try {
            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
-            Table table = tableTuple != null ? tableTuple.object : null;
-            if (table != null && ReplUtils.isFirstIncPending(table.getParameters())) {
-              // For replicated (target) table, until after first successful incremental load, the table will not be
-              // in a consistent state. Avoid allowing replicating this table to a new target.
-              throw new HiveException("Replication dump not allowed for replicated table" +
-                  " with first incremental dump pending : " + tblName);
-            }
+            table = tableTuple != null ? tableTuple.object : null;
            if (shouldWriteExternalTableLocationInfo
                && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
              LOG.debug("Adding table {} to external tables list", tblName);
@@ -438,7 +512,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
            LOG.debug(te.getMessage());
          }
          dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
+          if (tableList != null && isTableSatifiesConfig(table)) {
+            tableList.add(tblName);
+          }
        }
+        dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
      } catch (Exception e) {
        caught = e;
      } finally {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 5735854..ab6e09f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -83,7 +83,7 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
    Path path = new Path(dumpDirectory);
    FileSystem fileSystem = path.getFileSystem(hiveConf);
    FileStatus[] fileStatuses =
-        fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));
+        fileSystem.listStatus(new Path(dumpDirectory), ReplUtils.getBootstrapDirectoryFilter(fileSystem));
    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
      throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
    }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index ae2e1db..5665bda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.slf4j.Logger;
@@ -122,8 +123,9 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
      while (remoteIterator.hasNext()) {
        LocatedFileStatus next = remoteIterator.next();
        // we want to skip this file, this also means there cant be a table with name represented
-        // by constant ReplExternalTables.FILE_NAME
-        if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) {
+        // by constant ReplExternalTables.FILE_NAME or ReplUtils.REPL_TABLE_LIST_DIR_NAME (_tables)
+        if(next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME) ||
+            next.getPath().toString().endsWith(ReplUtils.REPL_TABLE_LIST_DIR_NAME)) {
          continue;
        }
        if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 1142595..1df5077 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.repl.ReplScope;
@@ -80,6 +81,10 @@ public class ReplUtils {
  // Root directory for dumping bootstrapped tables along with incremental events dump.
  public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap";
 
+  // Name of the directory which stores the list of tables included in the policy in case of table level replication.
+  // One file per database, named after the db name. The directory is not created for db level replication.
+  public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
+
  // Migrating to transactional tables in bootstrap load phase.
  // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
  public static final Long REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID = 1L;
@@ -232,7 +237,18 @@ public class ReplUtils {
  public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
    return p -> {
      try {
-        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
+            && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) {
+    return p -> {
+      try {
+        return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
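
For reference, with a table-level policy of the form dbname.'<include-regex>'.'<exclude-regex>' (the
shape the new test builds, e.g. primaryDbName + ".'in[0-9]+'.'in8'"), the dump this change produces is
laid out roughly as below. This is a sketch assuming a hypothetical database named "repldb"; the
per-table content under the db directory is unchanged by this commit and is abbreviated:

    <dumpRoot>/
        _tables/repldb        <- one lowercase table name per line; an empty file means table-level
                                 replication where no table satisfied the policy and config filters
        repldb/               <- table dump dirs and _metadata, as before

If _tables/<dbname> is absent, the dump was taken with a db-level policy, which is exactly what
verifyTableListForPolicy() asserts in the test changes above.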
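A minimal sketch of how a consumer of the dump could read that file back, assuming only the layout
above (TableListReader and readDumpedTableList are illustrative names, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public final class TableListReader {
      // Mirrors ReplUtils.REPL_TABLE_LIST_DIR_NAME introduced by this commit.
      private static final String TABLE_LIST_DIR = "_tables";

      // Returns null when the file is absent (db-level replication), an empty list when the
      // file exists but no table satisfied the policy, otherwise the dumped table names.
      public static List<String> readDumpedTableList(Path dumpRoot, String dbName,
          Configuration conf) throws IOException {
        Path tableListFile = new Path(new Path(dumpRoot, TABLE_LIST_DIR), dbName.toLowerCase());
        FileSystem fs = tableListFile.getFileSystem(conf);
        if (!fs.exists(tableListFile)) {
          return null;
        }
        List<String> tables = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(tableListFile), StandardCharsets.UTF_8))) {
          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            tables.add(line); // one lowercase table name per line
          }
        }
        return tables;
      }
    }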