This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new de69a0f  HIVE-21722: REPL:: logs are missing in HiveStatement.getQueryLog output during parallel execution mode. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
de69a0f is described below

commit de69a0f9d973822895f3ff0744ee3af768118268
Author: Mahesh Kumar Behera <mah...@apache.org>
AuthorDate: Tue May 14 14:01:32 2019 +0530

    HIVE-21722: REPL:: logs are missing in HiveStatement.getQueryLog output during parallel execution mode. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
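
For orientation, the client-side pattern this fix restores is sketched below: one thread runs a REPL command over JDBC while another polls HiveStatement.getQueryLog(), expecting the REPL:: progress markers to stream back even with hive.exec.parallel enabled. The sketch is illustrative and not part of the commit; the connection URL, credentials, and database name are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;
    import org.apache.hive.jdbc.HiveStatement;

    public class ReplLogTail {
      public static void main(String[] args) throws Exception {
        // Placeholder HS2 endpoint and credentials.
        Connection con = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", "");
        HiveStatement stmt = (HiveStatement) con.createStatement();

        Thread tail = new Thread(() -> {
          try {
            while (stmt.hasMoreLogs()) {
              // Incremental fetch: returns only log lines not yet seen.
              List<String> lines = stmt.getQueryLog(true, 1000);
              lines.forEach(System.out::println);
              Thread.sleep(500);
            }
          } catch (Exception e) {
            // Best-effort polling in this sketch.
          }
        });
        tail.start();

        // Before this fix, REPL:: markers emitted by tasks running on
        // parallel executor threads never reached the fetched log.
        stmt.execute("repl dump some_db");  // placeholder database
        tail.join();
        stmt.close();
        con.close();
      }
    }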
---
 .../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 129 ++++++++++++++++++++-
 .../org/apache/hadoop/hive/ql/exec/TaskRunner.java |   2 +
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   2 +-
 .../hadoop/hive/ql/exec/repl/ReplStateLogTask.java |   7 ++
 .../ql/exec/repl/bootstrap/load/LoadFunction.java  |   2 +-
 .../incremental/IncrementalLoadTasksBuilder.java   |   2 +-
 6 files changed, 139 insertions(+), 5 deletions(-)
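
At a glance, the fix has three parts: TaskRunner now registers the task's logging context on each parallel executor thread; the REPL state-log tasks are created with the session conf so that context actually carries the query id; and ReplStateLogTask opts out of parallel execution altogether. The TestJdbcDriver2 changes add end-to-end coverage for REPL dump and load query logs.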

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f19b7d..654bdf8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -20,6 +20,7 @@ package org.apache.hive.jdbc;
 
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.processors.DfsProcessor;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hive.common.util.HiveVersionInfo;
@@ -77,6 +79,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.junit.rules.TestName;
 
 import static org.apache.hadoop.hive.conf.SystemVariables.SET_COLUMN_NAME;
 import static org.apache.hadoop.hive.ql.exec.ExplainTask.EXPL_COLUMN_NAME;
@@ -121,6 +124,7 @@ public class TestJdbcDriver2 {
   private static final float floatCompareDelta = 0.0001f;
 
   @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public final TestName testName = new TestName();
 
   private static Connection getConnection(String postfix) throws SQLException {
     Connection con1;
@@ -2767,10 +2771,10 @@ public class TestJdbcDriver2 {
             incrementalLogs.addAll(statement.getQueryLog());
             Thread.sleep(500);
           } catch (SQLException e) {
-            LOG.error("Failed getQueryLog. Error message: " + e.getMessage());
+            LOG.info("Failed getQueryLog. Error message: " + e.getMessage());
             fail("error in getting log thread");
           } catch (InterruptedException e) {
-            LOG.error("Getting log thread is interrupted. Error message: " + e.getMessage());
+            LOG.info("Getting log thread is interrupted. Error message: " + e.getMessage());
             fail("error in getting log thread");
           }
         }
@@ -2793,6 +2797,126 @@ public class TestJdbcDriver2 {
     verifyFetchedLog(incrementalLogs, expectedLogs);
   }
 
+  private static int next = 0;
+  private synchronized void advanceDumpDir() {
+    next++;
+    ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+  }
+
+  /**
+   * Test the getQueryLog() method over JDBC for REPL commands.
+   * @throws Exception
+   */
+  @Test
+  public void testGetQueryLogForReplCommands() throws Exception {
+    // Prepare
+    String primaryDb = testName.getMethodName() + "_" + System.currentTimeMillis();
+    String replicaDb = primaryDb + "_replica";
+    String primaryTblName = primaryDb + ".t1";
+    Path replDir = new Path(conf.get("test.data.files"));
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    assertNotNull("Statement is null", stmt);
+    
+    replDir = new Path(replDir, primaryDb + "_repl");
+    FileSystem fs = FileSystem.get(replDir.toUri(), conf);
+    fs.mkdirs(replDir);
+
+    try {
+      // Prepare
+      stmt.execute("set hive.exec.parallel = true");
+      stmt.execute("set hive.server2.logging.operation.level = execution");
+      stmt.execute("set hive.metastore.transactional.event.listeners =" +
+              " org.apache.hive.hcatalog.listener.DbNotificationListener");
+      stmt.execute("set hive.metastore.dml.events = true");
+      stmt.execute("create database " + primaryDb + " with 
dbproperties('repl.source.for'='1,2,3')");
+      stmt.execute("create table " + primaryTblName + " (id int)");
+      stmt.execute("insert into " + primaryTblName + " values (1), (2)");
+      stmt.close();
+
+      // Test query logs for bootstrap dump and load
+      String[] expectedBootstrapDumpLogs = {
+        "REPL::START",
+        "REPL::TABLE_DUMP",
+        "REPL::END"
+      };
+
+      // Bootstrap dump
+      stmt = (HiveStatement) con.createStatement();
+      advanceDumpDir();
+      ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb +
+              " with ('hive.repl.rootdir' = '" + replDir + "')");
+      assertTrue(replDumpRslt.next());
+      String dumpLocation = replDumpRslt.getString(1);
+      String lastReplId = replDumpRslt.getString(2);
+      List<String> logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Bootstrap Dump");
+      verifyFetchedLog(logs, expectedBootstrapDumpLogs);
+
+      String[] expectedBootstrapLoadLogs = {
+        "REPL::START",
+        "REPL::TABLE_LOAD",
+        "REPL::END"
+      };
+
+      // Bootstrap load
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Bootstrap Load");
+      verifyFetchedLog(logs, expectedBootstrapLoadLogs);
+
+      // Perform operation for incremental replication
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("insert into " + primaryTblName + " values (3), (4)");
+      stmt.close();
+
+      // Test query logs for incremental dump and load
+      String[] expectedIncrementalDumpLogs = {
+        "REPL::START",
+        "REPL::EVENT_DUMP",
+        "REPL::END"
+      };
+
+      // Incremental dump
+      stmt = (HiveStatement) con.createStatement();
+      advanceDumpDir();
+      replDumpRslt = stmt.executeQuery("repl dump " + primaryDb + " from " + lastReplId +
+              " with ('hive.repl.rootdir' = '" + replDir + "')");
+      assertTrue(replDumpRslt.next());
+      dumpLocation = replDumpRslt.getString(1);
+      lastReplId = replDumpRslt.getString(2);
+      logs = stmt.getQueryLog(false, 10000);
+      stmt.close();
+      LOG.info("Query_Log for Incremental Dump");
+      verifyFetchedLog(logs, expectedIncrementalDumpLogs);
+
+      String[] expectedIncrementalLoadLogs = {
+        "REPL::START",
+        "REPL::EVENT_LOAD",
+        "REPL::END"
+      };
+
+      // Incremental load
+      stmt = (HiveStatement) con.createStatement();
+      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      logs = stmt.getQueryLog(false, 10000);
+      LOG.info("Query_Log for Incremental Load");
+      verifyFetchedLog(logs, expectedIncrementalLoadLogs);
+    } finally {
+      fs.delete(replDir, true);
+      // DB cleanup
+      stmt.execute("drop database if exists " + primaryDb + " cascade");
+      stmt.execute("drop database if exists " + replicaDb + " cascade");
+      stmt.execute("set hive.exec.parallel = false");
+      stmt.execute("set hive.server2.logging.operation.level = verbose");
+      stmt.execute("set hive.metastore.dml.events = false");
+      stmt.execute("set hive.metastore.transactional.event.listeners = ");
+      stmt.close();
+    }
+  }
+
   /**
   * Test getting query log when HS2 logging is disabled.
    *
@@ -2820,6 +2944,7 @@ public class TestJdbcDriver2 {
     }
     String accumulatedLogs = stringBuilder.toString();
     for (String expectedLog : expectedLogs) {
+      LOG.info("Checking match for " + expectedLog);
       assertTrue(accumulatedLogs.contains(expectedLog));
     }
   }
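
The new test drives a full bootstrap-plus-incremental REPL cycle through JDBC and asserts that the REPL::START, TABLE_DUMP/TABLE_LOAD, EVENT_DUMP/EVENT_LOAD and REPL::END markers come back through getQueryLog() while hive.exec.parallel is enabled. It can typically be run on its own from itests/hive-unit with Maven, e.g. mvn test -Dtest=TestJdbcDriver2#testGetQueryLogForReplCommands (exact flags depend on the checkout).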
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index 6f6d721..13010ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -74,6 +75,7 @@ public class TaskRunner extends Thread {
 
   @Override
   public void run() {
+    LogUtils.registerLoggingContext(tsk.getConf());
     runner = Thread.currentThread();
     try {
       SessionState.start(ss);
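
This one-line TaskRunner change is the heart of the fix: with hive.exec.parallel=true each task runs on its own thread, and HS2 routes a query's operation log through a thread-local logging context keyed on the query id, so a worker thread that never registers that context logs outside the stream getQueryLog() reads. Below is a generic illustration of the thread-local propagation pattern, using slf4j's MDC rather than Hive's LogUtils internals; class and key names are illustrative.

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class MdcPropagation {
      private static final Logger LOG = LoggerFactory.getLogger(MdcPropagation.class);

      public static void main(String[] args) throws Exception {
        MDC.put("queryId", "query_123");             // set on the session thread
        Map<String, String> ctx = MDC.getCopyOfContextMap();

        Thread worker = new Thread(() -> {
          // Without this line the worker's log events carry no queryId, and an
          // appender that routes per-query logs by queryId silently drops them.
          MDC.setContextMap(ctx);
          LOG.info("task log line");                 // now attributed to query_123
        });
        worker.start();
        worker.join();
      }
    }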
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 2309fc9..84bdcd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -380,7 +380,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       dbProps = dbInMetadata.getParameters();
     }
     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps);
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, conf);
     if (scope.rootTasks.isEmpty()) {
       scope.rootTasks.add(replLogTask);
     } else {
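
The TaskFactory.get call-site changes here and in LoadFunction and IncrementalLoadTasksBuilder below complement the TaskRunner change above: LogUtils.registerLoggingContext(tsk.getConf()) can only register a query id if the task's conf carries one, so the REPL state-log tasks are now built with the session conf explicitly. A minimal sketch of that dependency, with DemoTask and DemoRunner standing in for Hive's Task hierarchy:

    import org.apache.hadoop.hive.conf.HiveConf;

    class DemoTask {
      private final HiveConf conf;              // null when built without a conf
      DemoTask(HiveConf conf) { this.conf = conf; }
      HiveConf getConf() { return conf; }
    }

    class DemoRunner {
      // Mirrors what the patched TaskRunner.run() needs from the task.
      static void runnerThreadBody(DemoTask tsk) {
        HiveConf conf = tsk.getConf();
        String queryId = (conf == null)
            ? null                               // no conf, no query id to register
            : conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
        System.out.println("registering logging context for queryId=" + queryId);
      }
    }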
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
index 7daa850..845aad1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
@@ -48,4 +48,11 @@ public class ReplStateLogTask extends Task<ReplStateLogWork> implements Serializ
   public String getName() {
     return "REPL_STATE_LOG";
   }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    // ReplStateLogTask is executed only when all its parents are done with execution. So running it in parallel has no
+    // benefits.
+    return false;
+  }
 }
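
The new canExecuteInParallel() override keeps this small barrier task off the parallel executor path: it only runs after all its parent tasks finish, so executing it in-line costs nothing. A simplified scheduling loop showing how such a flag is typically honored (illustrative, not Hive's Driver code):

    import java.util.List;

    class MiniScheduler {
      interface Task {
        boolean canExecuteInParallel();
        void run();
      }

      // Tasks that opt out of parallel execution run on the caller's thread
      // even when parallelism is enabled.
      void launch(List<Task> ready, boolean parallelEnabled) {
        for (Task t : ready) {
          if (parallelEnabled && t.canExecuteInParallel()) {
            new Thread(t::run).start();   // parallel path
          } else {
            t.run();                      // in-line path, e.g. a state-log task
          }
        }
      }
    }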
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index a7c8ca4..7f981fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -68,7 +68,7 @@ public class LoadFunction {
   private void createFunctionReplLogTask(List<Task<? extends Serializable>> functionTasks,
                                          String functionName) {
     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName);
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
     DAGTraversal.traverse(functionTasks, new AddDependencyToLeaves(replLogTask));
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 4ed215c..13de791 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -142,7 +142,7 @@ public class IncrementalLoadTasksBuilder {
         ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
                 dir.getPath().getName(),
                 eventDmd.getDumpType().toString());
-        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
+        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
         AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
         DAGTraversal.traverse(evTasks, function);
         this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
