This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push: new de69a0f HIVE-21722 : REPL:: logs are missing in hiveStatement.getQueryLog output during parallel execution mode. (Mahesh Kumar Behera reviewed by Sankar Hariappan) de69a0f is described below commit de69a0f9d973822895f3ff0744ee3af768118268 Author: Mahesh Kumar Behera <mah...@apache.org> AuthorDate: Tue May 14 14:01:32 2019 +0530 HIVE-21722 : REPL:: logs are missing in hiveStatement.getQueryLog output during parallel execution mode. (Mahesh Kumar Behera reviewed by Sankar Hariappan) --- .../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 129 ++++++++++++++++++++- .../org/apache/hadoop/hive/ql/exec/TaskRunner.java | 2 + .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 2 +- .../hadoop/hive/ql/exec/repl/ReplStateLogTask.java | 7 ++ .../ql/exec/repl/bootstrap/load/LoadFunction.java | 2 +- .../incremental/IncrementalLoadTasksBuilder.java | 2 +- 6 files changed, 139 insertions(+), 5 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 8f19b7d..654bdf8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -20,6 +20,7 @@ package org.apache.hive.jdbc; import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.processors.DfsProcessor; import 
org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hive.common.util.HiveVersionInfo; @@ -77,6 +79,7 @@ import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.junit.rules.TestName; import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME; import static org.apache.hadoop.hive.ql.exec.ExplainTask.EXPL_COLUMN_NAME; @@ -121,6 +124,7 @@ public class TestJdbcDriver2 { private static final float floatCompareDelta = 0.0001f; @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public final TestName testName = new TestName(); private static Connection getConnection(String postfix) throws SQLException { Connection con1; @@ -2767,10 +2771,10 @@ public class TestJdbcDriver2 { incrementalLogs.addAll(statement.getQueryLog()); Thread.sleep(500); } catch (SQLException e) { - LOG.error("Failed getQueryLog. Error message: " + e.getMessage()); + LOG.info("Failed getQueryLog. Error message: " + e.getMessage()); fail("error in getting log thread"); } catch (InterruptedException e) { - LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage()); + LOG.info("Getting log thread is interrupted. 
Error message: " + e.getMessage()); fail("error in getting log thread"); } } @@ -2793,6 +2797,126 @@ public class TestJdbcDriver2 { verifyFetchedLog(incrementalLogs, expectedLogs); } + private static int next = 0; + private synchronized void advanceDumpDir() { + next++; + ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next)); + } + + /** + * Test getting query log method in Jdbc for REPL commands + * @throws Exception + */ + @Test + public void testGetQueryLogForReplCommands() throws Exception { + // Prepare + String primaryDb = testName.getMethodName() + "_" + System.currentTimeMillis(); + String replicaDb = primaryDb + "_replica"; + String primaryTblName = primaryDb + ".t1"; + Path replDir = new Path(conf.get("test.data.files")); + HiveStatement stmt = (HiveStatement) con.createStatement(); + assertNotNull("Statement is null", stmt); + + replDir = new Path(replDir, primaryDb + "_repl"); + FileSystem fs = FileSystem.get(replDir.toUri(), conf); + fs.mkdirs(replDir); + + try { + // Prepare + stmt.execute("set hive.exec.parallel = true"); + stmt.execute("set hive.server2.logging.operation.level = execution"); + stmt.execute("set hive.metastore.transactional.event.listeners =" + + " org.apache.hive.hcatalog.listener.DbNotificationListener"); + stmt.execute("set hive.metastore.dml.events = true"); + stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')"); + stmt.execute("create table " + primaryTblName + " (id int)"); + stmt.execute("insert into " + primaryTblName + " values (1), (2)"); + stmt.close(); + + // Test query logs for bootstrap dump and load + String[] expectedBootstrapDumpLogs = { + "REPL::START", + "REPL::TABLE_DUMP", + "REPL::END" + }; + + // Bootstrap dump + stmt = (HiveStatement) con.createStatement(); + advanceDumpDir(); + ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + + " with ('hive.repl.rootdir' = '" + replDir + "')"); + assertTrue(replDumpRslt.next()); + String dumpLocation = 
replDumpRslt.getString(1); + String lastReplId = replDumpRslt.getString(2); + List<String> logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Bootstrap Dump"); + verifyFetchedLog(logs, expectedBootstrapDumpLogs); + + String[] expectedBootstrapLoadLogs = { + "REPL::START", + "REPL::TABLE_LOAD", + "REPL::END" + }; + + // Bootstrap load + stmt = (HiveStatement) con.createStatement(); + stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'"); + logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Bootstrap Load"); + verifyFetchedLog(logs, expectedBootstrapLoadLogs); + + // Perform operation for incremental replication + stmt = (HiveStatement) con.createStatement(); + stmt.execute("insert into " + primaryTblName + " values (3), (4)"); + stmt.close(); + + // Test query logs for incremental dump and load + String[] expectedIncrementalDumpLogs = { + "REPL::START", + "REPL::EVENT_DUMP", + "REPL::END" + }; + + // Incremental dump + stmt = (HiveStatement) con.createStatement(); + advanceDumpDir(); + replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + " from " + lastReplId + + " with ('hive.repl.rootdir' = '" + replDir + "')"); + assertTrue(replDumpRslt.next()); + dumpLocation = replDumpRslt.getString(1); + lastReplId = replDumpRslt.getString(2); + logs = stmt.getQueryLog(false, 10000); + stmt.close(); + LOG.info("Query_Log for Incremental Dump"); + verifyFetchedLog(logs, expectedIncrementalDumpLogs); + + String[] expectedIncrementalLoadLogs = { + "REPL::START", + "REPL::EVENT_LOAD", + "REPL::END" + }; + + // Incremental load + stmt = (HiveStatement) con.createStatement(); + stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'"); + logs = stmt.getQueryLog(false, 10000); + LOG.info("Query_Log for Incremental Load"); + verifyFetchedLog(logs, expectedIncrementalLoadLogs); + } finally { + fs.delete(replDir, true); + // DB cleanup + stmt.execute("drop database if exists " + primaryDb 
+ " cascade"); + stmt.execute("drop database if exists " + replicaDb + " cascade"); + stmt.execute("set hive.exec.parallel = false"); + stmt.execute("set hive.server2.logging.operation.level = verbose"); + stmt.execute("set hive.metastore.dml.events = false"); + stmt.execute("set hive.metastore.transactional.event.listeners = "); + stmt.close(); + } + } + /** * Test getting query log when HS2 disable logging. * @@ -2820,6 +2944,7 @@ public class TestJdbcDriver2 { } String accumulatedLogs = stringBuilder.toString(); for (String expectedLog : expectedLogs) { + LOG.info("Checking match for " + expectedLog); assertTrue(accumulatedLogs.contains(expectedLog)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index 6f6d721..13010ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.session.SessionState; @@ -74,6 +75,7 @@ public class TaskRunner extends Thread { @Override public void run() { + LogUtils.registerLoggingContext(tsk.getConf()); runner = Thread.currentThread(); try { SessionState.start(ss); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 2309fc9..84bdcd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -380,7 +380,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { dbProps = dbInMetadata.getParameters(); } ReplStateLogWork replLogWork = new 
ReplStateLogWork(replLogger, dbProps); - Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, conf); if (scope.rootTasks.isEmpty()) { scope.rootTasks.add(replLogTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java index 7daa850..845aad1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java @@ -48,4 +48,11 @@ public class ReplStateLogTask extends Task<ReplStateLogWork> implements Serializ public String getName() { return "REPL_STATE_LOG"; } + + @Override + public boolean canExecuteInParallel() { + // ReplStateLogTask is executed only when all its parents are done with execution. So running it in parallel has no + // benefits. + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index a7c8ca4..7f981fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -68,7 +68,7 @@ public class LoadFunction { private void createFunctionReplLogTask(List<Task<? 
extends Serializable>> functionTasks, String functionName) { ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName); - Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); DAGTraversal.traverse(functionTasks, new AddDependencyToLeaves(replLogTask)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 4ed215c..13de791 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -142,7 +142,7 @@ public class IncrementalLoadTasksBuilder { ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dir.getPath().getName(), eventDmd.getDumpType().toString()); - Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork); + Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf); AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask); DAGTraversal.traverse(evTasks, function); this.log.debug("Updated taskChainTail from {}:{} to {}:{}",