Repository: hive Updated Branches: refs/heads/master 649d7c12b -> 05d4e4ebc
BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewed by Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/05d4e4eb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/05d4e4eb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/05d4e4eb Branch: refs/heads/master Commit: 05d4e4ebcec1c97e3b4e86e7e1fc0717e5b13d05 Parents: 649d7c1 Author: Anishek Agarwal <anis...@gmail.com> Authored: Thu Aug 16 12:29:58 2018 +0530 Committer: Anishek Agarwal <anis...@gmail.com> Committed: Thu Aug 16 12:29:58 2018 +0530 ---------------------------------------------------------------------- .../hive/ql/hooks/HiveProtoLoggingHook.java | 51 ++++++------ .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++++++++++++++++++-- 2 files changed, 109 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/05d4e4eb/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 155b2be..45e1ab3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -118,10 +118,12 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; import org.apache.hadoop.hive.ql.plan.ExplainWork; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; 
import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter; import org.json.JSONObject; @@ -180,7 +182,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private final Clock clock; private final String logFileName; private final DatePartitionedLogger<HiveHookEventProto> logger; - private final ExecutorService eventHandler; private final ExecutorService logWriter; private int logFileCount = 0; private ProtoMessageWriter<HiveHookEventProto> writer; @@ -207,7 +208,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { } this.logger = tmpLogger; if (logger == null) { - eventHandler = null; logWriter = null; return; } @@ -216,25 +216,16 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT); ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Hive Hook Proto Event Handler %d").build(); - eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory); - - threadFactory = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Hive Hook Proto Log Writer %d").build(); logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory); } void shutdown() { - // Wait for all the events to be written off, the order of service is important - for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) { - if (service == null) { - continue; - } - service.shutdown(); + if (logWriter != null) { + logWriter.shutdown(); try { - service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn("Got interrupted 
exception while waiting for events to be flushed", e); } @@ -246,14 +237,9 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { if (logger == null) { return; } - try { - eventHandler.execute(() -> generateEvent(hookContext)); - } catch (RejectedExecutionException e) { - LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType()); - } - } - - private void generateEvent(HookContext hookContext) { + // Note: same hookContext object is used for all the events for a given query, if we try to + // do it async we have concurrency issues and when query cache is enabled, post event comes + // before we start the pre hook processing and causes inconsistent events publishing. QueryPlan plan = hookContext.getQueryPlan(); if (plan == null) { LOG.debug("Received null query plan."); @@ -343,7 +329,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { builder.setHiveQueryId(plan.getQueryId()); builder.setUser(getUser(hookContext)); builder.setRequestUser(getRequestUser(hookContext)); - builder.setQueue(conf.get("mapreduce.job.queuename")); + String queueName = getQueueName(executionMode, conf); + if (queueName != null) { + builder.setQueue(queueName); + } builder.setExecutionMode(executionMode.name()); builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs())); builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs())); @@ -385,7 +374,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { ApplicationId llapId = determineLlapId(conf, executionMode); if (llapId != null) { addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString()); - builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname)); } conf.stripHiddenConfigurations(conf); @@ -439,6 +427,21 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { return requestuser; } + private String getQueueName(ExecutionMode mode, HiveConf conf) { + switch (mode) { + case LLAP: + return 
conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname); + case MR: + return conf.get(MRJobConfig.QUEUE_NAME); + case TEZ: + return conf.get(TezConfiguration.TEZ_QUEUE_NAME); + case SPARK: + case NONE: + default: + return null; + } + } + private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) { List<String> tableNames = new ArrayList<>(); for (Entity entity : entities) { http://git-wip-us.apache.org/repos/asf/hive/blob/05d4e4eb/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java index 8124528..a5939fa 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.hooks; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -30,15 +29,22 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType; +import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto; import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; import org.apache.hadoop.hive.ql.log.PerfLogger; import 
org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; import org.apache.tez.dag.history.logging.proto.ProtoMessageReader; import org.junit.Assert; @@ -63,6 +69,9 @@ public class TestHiveProtoLoggingHook { @Before public void setup() throws Exception { conf = new HiveConf(); + conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue"); + conf.set(MRJobConfig.QUEUE_NAME, "mr_queue"); + conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue"); tmpFolder = folder.newFolder().getAbsolutePath(); conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder); QueryState state = new QueryState.Builder().withHiveConf(conf).build(); @@ -94,7 +103,8 @@ public class TestHiveProtoLoggingHook { Assert.assertEquals("test_user", event.getRequestUser()); Assert.assertEquals("test_queryId", event.getHiveQueryId()); Assert.assertEquals("test_op_id", event.getOperationId()); - Assert.assertEquals("NONE", event.getExecutionMode()); + Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode()); + Assert.assertFalse(event.hasQueue()); assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString()); assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString()); @@ -108,6 +118,69 @@ public class TestHiveProtoLoggingHook { } @Test + public void testQueueLogs() throws Exception { + context.setHookType(HookType.PRE_EXEC_HOOK); + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + + // This makes it MR task + context.getQueryPlan().getRootTasks().add(new ExecDriver()); + evtLogger.handle(context); + + // This makes it Tez task + MapWork mapWork = new MapWork(); + TezWork tezWork = new TezWork("test_queryid"); + 
tezWork.add(mapWork); + TezTask task = new TezTask(); + task.setId("id1"); + task.setWork(tezWork); + context.getQueryPlan().getRootTasks().add(task); + context.getQueryPlan().getRootTasks().add(new TezTask()); + evtLogger.handle(context); + + // This makes it llap task + mapWork.setLlapMode(true); + evtLogger.handle(context); + + evtLogger.shutdown(); + + ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); + + HiveHookEventProto event = reader.readEvent(); + Assert.assertNotNull(event); + Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode()); + Assert.assertEquals(event.getQueue(), "mr_queue"); + + event = reader.readEvent(); + Assert.assertNotNull(event); + Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode()); + Assert.assertEquals(event.getQueue(), "tez_queue"); + + event = reader.readEvent(); + Assert.assertNotNull(event); + Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode()); + Assert.assertEquals(event.getQueue(), "llap_queue"); + } + + @Test + public void testPreAndPostEventBoth() throws Exception { + context.setHookType(HookType.PRE_EXEC_HOOK); + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + evtLogger.handle(context); + context.setHookType(HookType.POST_EXEC_HOOK); + evtLogger.handle(context); + evtLogger.shutdown(); + + ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); + HiveHookEventProto event = reader.readEvent(); + Assert.assertNotNull("Pre hook event not found", event); + Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType()); + + event = reader.readEvent(); + Assert.assertNotNull("Post hook event not found", event); + Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType()); + } + + @Test public void testPostEventLog() throws Exception { context.setHookType(HookType.POST_EXEC_HOOK); context.getPerfLogger().PerfLogBegin("test", "LogTest"); @@ -151,18 +224,21 @@ public 
class TestHiveProtoLoggingHook { assertOtherInfo(event, OtherInfoType.PERF, null); } - private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) - throws IOException, FileNotFoundException { + private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder) + throws IOException { Path path = new Path(tmpFolder); FileSystem fs = path.getFileSystem(conf); FileStatus[] status = fs.listStatus(path); Assert.assertEquals(1, status.length); status = fs.listStatus(status[0].getPath()); Assert.assertEquals(1, status.length); - DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>( HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance()); - ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath()); + return logger.getReader(status[0].getPath()); + } + + private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException { + ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); HiveHookEventProto event = reader.readEvent(); Assert.assertNotNull(event); return event;