http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
index 3c9ad24..f1fc285 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java
@@ -27,6 +27,7 @@ public class HistoryLogger {
   private static final String HISTORY_START_TIME = "StartTime";
   private static final String HISTORY_END_TIME = "EndTime";
   private static final String HISTORY_DAG_NAME = "DagName";
+  private static final String HISTORY_DAG_ID = "DagId";
   private static final String HISTORY_VERTEX_NAME = "VertexName";
   private static final String HISTORY_TASK_ID = "TaskId";
   private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId";
@@ -41,29 +42,30 @@ public class HistoryLogger {

   public static void logFragmentStart(String applicationIdStr, String containerIdStr,
                                       String hostname,
-                                      String dagName, String vertexName, int taskId,
+                                      String dagName, int dagIdentifier, String vertexName, int taskId,
                                       int attemptId) {
     HISTORY_LOGGER.info(
-        constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName,
+        constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, dagIdentifier,
             vertexName, taskId, attemptId));
   }

   public static void logFragmentEnd(String applicationIdStr, String containerIdStr,
                                     String hostname,
-                                    String dagName, String vertexName, int taskId, int attemptId,
+                                    String dagName, int dagIdentifier, String vertexName, int taskId, int attemptId,
                                     String threadName, long startTime, boolean failed) {
     HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname,
-        dagName, vertexName, taskId, attemptId, threadName, startTime, failed));
+        dagName, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed));
   }

   private static String constructFragmentStartString(String applicationIdStr, String containerIdStr,
-                                                     String hostname, String dagName,
+                                                     String hostname, String dagName, int dagIdentifier,
                                                      String vertexName, int taskId, int attemptId) {
     HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START);
     lb.addHostName(hostname);
     lb.addAppid(applicationIdStr);
     lb.addContainerId(containerIdStr);
     lb.addDagName(dagName);
+    lb.addDagId(dagIdentifier);
     lb.addVertexName(vertexName);
     lb.addTaskId(taskId);
     lb.addTaskAttemptId(attemptId);
@@ -72,7 +74,7 @@ public class HistoryLogger {
   }

   private static String constructFragmentEndString(String applicationIdStr, String containerIdStr,
-                                                   String hostname, String dagName,
+                                                   String hostname, String dagName, int dagIdentifier,
                                                    String vertexName, int taskId, int attemptId,
                                                    String threadName, long startTime, boolean succeeded) {
     HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END);
@@ -80,6 +82,7 @@ public class HistoryLogger {
     lb.addAppid(applicationIdStr);
     lb.addContainerId(containerIdStr);
     lb.addDagName(dagName);
+    lb.addDagId(dagIdentifier);
     lb.addVertexName(vertexName);
     lb.addTaskId(taskId);
     lb.addTaskAttemptId(attemptId);
@@ -113,6 +116,10 @@ public class HistoryLogger {
       return setKeyValue(HISTORY_DAG_NAME, dagName);
     }

+    HistoryLineBuilder addDagId(int dagId) {
+      return setKeyValue(HISTORY_DAG_ID, String.valueOf(dagId));
+    }
+
    HistoryLineBuilder addVertexName(String vertexName) {
      return setKeyValue(HISTORY_VERTEX_NAME, vertexName);
    }
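The net effect in HistoryLogger is that callers now thread the numeric DAG id through to every fragment event. A sketch of the new call shape, using the signature from this patch (all argument values below are invented for illustration):

    // New signature: dagIdentifier follows dagName.
    HistoryLogger.logFragmentStart("application_1452_0007", "container_e01_000001",
        "node-17.example.com",
        "hive-dag-1", /* dagIdentifier, new in this patch */ 3,
        "Map 1", /* taskId */ 12, /* attemptId */ 0);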
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
index 7cb433b..e2caec2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -14,6 +14,7 @@

 package org.apache.hadoop.hive.llap.daemon;

+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -24,6 +25,6 @@ public interface KilledTaskHandler {
   // inferred from this.
   // Passing in parameters until there's some dag information stored and tracked in the daemon.
   void taskKilled(String amLocation, int port, String user,
-                  Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+                  Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
                   TezTaskAttemptID taskAttemptId);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
index 4e62a68..7f9553d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
@@ -14,7 +14,9 @@

 package org.apache.hadoop.hive.llap.daemon;

+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+
 public interface QueryFailedHandler {
-  public void queryFailed(String queryId, String dagName);
+  public void queryFailed(QueryIdentifier queryIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index f6711d8..d1ec715 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -172,9 +172,9 @@ public class AMReporter extends AbstractService {
   }

   public void registerTask(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, String queryId, String dagName) {
+                           Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName);
+      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
     }
     AMNodeInfo amNodeInfo;
     synchronized (knownAppMasters) {
@@ -182,7 +182,7 @@ public class AMReporter extends AbstractService {
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
         amNodeInfo =
-            new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+            new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
                 conf);
         knownAppMasters.put(amNodeId, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
@@ -190,7 +190,7 @@ public class AMReporter extends AbstractService {
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
       }
-      amNodeInfo.setCurrentDagName(dagName);
+      amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
       amNodeInfo.incrementAndGetTaskCount();
     }
   }
@@ -214,12 +214,12 @@ public class AMReporter extends AbstractService {
   }

   public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
-                         final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) {
+                         final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
     // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
     // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory,
+        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
            conf);

     // Even if the service hasn't started up. It's OK to make this invocation since this will
@@ -251,8 +251,8 @@ public class AMReporter extends AbstractService {
           synchronized (knownAppMasters) {
             if (LOG.isDebugEnabled()) {
               LOG.debug(
-                  "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}",
-                  amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(),
+                  "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}",
+                  amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(),
                   amNodeInfo.hasAmFailed(), amNodeInfo);
             }
             knownAppMasters.remove(amNodeInfo.amNodeId);
@@ -272,11 +272,11 @@ public class AMReporter extends AbstractService {

           @Override
           public void onFailure(Throwable t) {
-            String currentDagName = amNodeInfo.getCurrentDagName();
+            QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
             amNodeInfo.setAmFailed(true);
             LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
-                amNodeInfo.amNodeId, currentDagName, t);
-            queryFailedHandler.queryFailed(null, currentDagName);
+                amNodeInfo.amNodeId, currentQueryIdentifier, t);
+            queryFailedHandler.queryFailed(currentQueryIdentifier);
           }
         });
   }
@@ -339,11 +339,11 @@ public class AMReporter extends AbstractService {
           amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), nodeId.getPort());
         } catch (IOException e) {
-          String currentDagName = amNodeInfo.getCurrentDagName();
+          QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
           amNodeInfo.setAmFailed(true);
           LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
-              amNodeInfo.amNodeId, currentDagName, e);
-          queryFailedHandler.queryFailed(null, currentDagName);
+              amNodeInfo.amNodeId, currentQueryIdentifier, e);
+          queryFailedHandler.queryFailed(currentQueryIdentifier);
         } catch (InterruptedException e) {
           if (!isShutdown.get()) {
             LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
@@ -370,21 +370,21 @@ public class AMReporter extends AbstractService {
     private final long timeout;
     private final SocketFactory socketFactory;
     private final AtomicBoolean amFailed = new AtomicBoolean(false);
-    private String currentDagName;
+    private QueryIdentifier currentQueryIdentifier;
     private LlapTaskUmbilicalProtocol umbilical;
     private long nextHeartbeatTime;

     public AMNodeInfo(LlapNodeId amNodeId, String user,
                       Token<JobTokenIdentifier> jobToken,
-                      String currentDagName,
+                      QueryIdentifier currentQueryIdentifier,
                       RetryPolicy retryPolicy,
                       long timeout,
                       SocketFactory socketFactory,
                       Configuration conf) {
       this.user = user;
       this.jobToken = jobToken;
-      this.currentDagName = currentDagName;
+      this.currentQueryIdentifier = currentQueryIdentifier;
       this.retryPolicy = retryPolicy;
       this.timeout = timeout;
       this.socketFactory = socketFactory;
@@ -439,12 +439,12 @@ public class AMReporter extends AbstractService {
       return taskCount.get();
     }

-    public synchronized String getCurrentDagName() {
-      return currentDagName;
+    public synchronized QueryIdentifier getCurrentQueryIdentifier() {
+      return currentQueryIdentifier;
     }

-    public synchronized void setCurrentDagName(String currentDagName) {
-      this.currentDagName = currentDagName;
+    public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) {
+      this.currentQueryIdentifier = queryIdentifier;
     }

     synchronized void setNextHeartbeatTime(long nextTime) {
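With a QueryIdentifier registered per task, the heartbeat-failure path can name the query it is killing instead of passing (null, dagName). A rough sketch of the flow using the signatures from this patch; host, port, and id values are made up, and amReporter, jobToken, and queryFailedHandler stand for the objects the daemon wires together:

    QueryIdentifier qid = new QueryIdentifier("application_1452_0007", 3);
    amReporter.registerTask("am-host.example.com", 4242, "hiveuser", jobToken, qid);
    // If a heartbeat to that AM later fails, AMReporter marks the AM failed and
    // reports the same identifier, so every fragment of that query gets killed:
    queryFailedHandler.queryFailed(qid);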
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 0d85671..b0bf844 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -149,7 +149,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   @Override
   public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
     HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
-        localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
+        localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(),
         request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
         request.getFragmentSpec().getAttemptNumber());
     if (LOG.isInfoEnabled()) {
@@ -172,8 +172,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           fragmentSpec.getFragmentIdentifierString());
       int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();

+      QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
+
       QueryFragmentInfo fragmentInfo = queryTracker
-          .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
+          .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(),
               dagIdentifier, fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
               fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
@@ -239,28 +241,37 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu

     @Override
     public void initializeHook(TezProcessor source) {
-      queryTracker.registerDagQueryId(source.getContext().getDAGName(),
+      queryTracker.registerDagQueryId(
+          new QueryIdentifier(source.getContext().getApplicationId().toString(),
+              source.getContext().getDagIdentifier()),
           HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));
     }
   }

   @Override
-  public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+  public SourceStateUpdatedResponseProto sourceStateUpdated(
+      SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
-    queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
+    queryTracker.registerSourceStateChange(
+        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+            request.getQueryIdentifier().getDagIdentifier()), request.getSrcName(),
         request.getState());
     return SourceStateUpdatedResponseProto.getDefaultInstance();
   }

   @Override
   public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
-    LOG.info("Processing queryComplete notification for {}", request.getDagName());
+    QueryIdentifier queryIdentifier =
+        new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+            request.getQueryIdentifier().getDagIdentifier());
+    LOG.info("Processing queryComplete notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments =
-        queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(),
+        queryTracker
+            .queryComplete(queryIdentifier, request.getDeleteDelay());
+    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(),
+      LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
          fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }
@@ -276,7 +287,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu

   private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
-    sb.append("dagName=").append(request.getDagName())
+    QueryIdentifier queryIdentifier = new QueryIdentifier(request.getQueryIdentifier().getAppIdentifier(),
+        request.getQueryIdentifier().getDagIdentifier());
+    sb.append("queryIdentifier=").append(queryIdentifier)
         .append(", ").append("sourceName=").append(request.getSrcName())
         .append(", ").append("state=").append(request.getState());
     return sb.toString();
@@ -342,14 +355,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     }

     @Override
-    public void queryFailed(String queryId, String dagName) {
-      LOG.info("Processing query failed notification for {}", dagName);
+    public void queryFailed(QueryIdentifier queryIdentifier) {
+      LOG.info("Processing query failed notification for {}", queryIdentifier);
       List<QueryFragmentInfo> knownFragments =
-          queryTracker.queryComplete(queryId, dagName, -1);
-      LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
+          queryTracker.queryComplete(queryIdentifier, -1);
+      LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
          knownFragments.size());
       for (QueryFragmentInfo fragmentInfo : knownFragments) {
-        LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName,
+        LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier,
            fragmentInfo.getFragmentIdentifierString());
         executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
       }
@@ -359,9 +372,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu

     @Override
     public void taskKilled(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, String queryId, String dagName,
+                           Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier,
                            TezTaskAttemptID taskAttemptId) {
-      amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId);
+      amReporter.taskKilled(amLocation, port, user, jobToken, queryIdentifier, taskAttemptId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 4f1299d..98951e1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -435,8 +435,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private class QueryFailedHandlerProxy implements QueryFailedHandler {

     @Override
-    public void queryFailed(String queryId, String dagName) {
-      containerRunner.queryFailed(queryId, dagName);
+    public void queryFailed(QueryIdentifier queryIdentifier) {
+      containerRunner.queryFailed(queryIdentifier);
     }
   }
 }
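The proto-to-identifier translation in ContainerRunnerImpl is repeated in sourceStateUpdated(), queryComplete(), and stringifySourceStateUpdateRequest(). A small helper along these lines (hypothetical, not part of this patch) would centralize it:

    private static QueryIdentifier fromProto(QueryIdentifierProto proto) {
      // QueryIdentifierProto carries app_identifier + dag_identifier;
      // the daemon-side QueryIdentifier mirrors the same two fields.
      return new QueryIdentifier(proto.getAppIdentifier(), proto.getDagIdentifier());
    }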
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
new file mode 100644
index 0000000..96e77e4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+/**
+ * An identifier for a query, which is unique.
+ */
+public final class QueryIdentifier {
+
+  private final String appIdentifier;
+  private final int dagIdentifier;
+
+
+  public QueryIdentifier(String appIdentifier, int dagIdentifier) {
+    this.appIdentifier = appIdentifier;
+    this.dagIdentifier = dagIdentifier;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !getClass().isAssignableFrom(o.getClass())) {
+      return false;
+    }
+
+    QueryIdentifier that = (QueryIdentifier) o;
+
+    if (dagIdentifier != that.dagIdentifier) {
+      return false;
+    }
+    return appIdentifier.equals(that.appIdentifier);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = appIdentifier.hashCode();
+    result = 31 * result + dagIdentifier;
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "QueryIdentifier{" +
+        "appIdentifier='" + appIdentifier + '\'' +
+        ", dagIdentifier=" + dagIdentifier +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 27f2d4c..8bec95f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;

 public class QueryInfo {
-  private final String queryId;
+  private final QueryIdentifier queryIdentifier;
   private final String appIdString;
   private final String dagName;
   private final int dagIdentifier;
@@ -54,10 +54,10 @@ public class QueryInfo {

   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();

-  public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier,
       String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
       String[] localDirsBase, FileSystem localFs) {
-    this.queryId = queryId;
+    this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagName = dagName;
     this.dagIdentifier = dagIdentifier;
@@ -67,18 +67,14 @@ public class QueryInfo {
     this.localFs = localFs;
   }

-  public String getQueryId() {
-    return queryId;
+  public QueryIdentifier getQueryIdentifier() {
+    return queryIdentifier;
   }

   public String getAppIdString() {
     return appIdString;
   }

-  public String getDagName() {
-    return dagName;
-  }
-
   public int getDagIdentifier() {
     return dagIdentifier;
   }
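The value semantics of the new class matter because QueryIdentifier becomes the key of several concurrent maps in QueryTracker. A quick illustration (the application id string is made up; run with assertions enabled):

    ConcurrentMap<QueryIdentifier, String> map = new ConcurrentHashMap<>();
    map.put(new QueryIdentifier("application_1452_0007", 2), "queryInfo");
    // A separately constructed identifier for the same app/dag pair must
    // locate the same entry - that is what equals()/hashCode() guarantee.
    assert "queryInfo".equals(map.get(new QueryIdentifier("application_1452_0007", 2)));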
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 6deaefc..0676edd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -56,8 +56,7 @@ public class QueryTracker extends AbstractService {

   private final ScheduledExecutorService executorService;

-  // TODO Make use if the query id for cachin when this is available.
-  private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();

   private final String[] localDirsBase;
   private final FileSystem localFs;
@@ -70,22 +69,25 @@ public class QueryTracker extends AbstractService {
   // Alternately - send in an explicit dag start message before any other message is processed.
   // Multiple threads communicating from a single AM gets in the way of this.

-  // Keeps track of completed dags. Assumes dag names are unique across AMs.
-  private final Set<String> completedDagMap = Collections.newSetFromMap(
-      new ConcurrentHashMap<String, Boolean>());
+  // Keeps track of completed DAGS. QueryIdentifiers need to be unique across applications.
+  private final Set<QueryIdentifier> completedDagMap =
+      Collections.newSetFromMap(new ConcurrentHashMap<QueryIdentifier, Boolean>());

   private final Lock lock = new ReentrantLock();
-  private final ConcurrentMap<String, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();

   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
-  private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>>
-      sourceCompletionMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ConcurrentMap<String, SourceStateProto>>
+      sourceCompletionMap = new ConcurrentHashMap<>();

-  // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor,
+  // Tracks HiveQueryId by QueryIdentifier. This can only be set when config is parsed in TezProcessor.
   // all the other existing code passes queryId equal to 0 everywhere.
-  private final ConcurrentHashMap<String, String> dagNameToQueryId = new ConcurrentHashMap<>();
+  // If we switch the runtime and move to parsing the payload in the AM - the actual hive queryId could
+  // be sent over the wire from the AM, and will take the place of AppId+dagId in QueryIdentifier.
+  private final ConcurrentHashMap<QueryIdentifier, String> queryIdentifierToHiveQueryId =
+      new ConcurrentHashMap<>();

   public QueryTracker(Configuration conf, String[] localDirsBase) {
     super("QueryTracker");
@@ -107,7 +109,7 @@ public class QueryTracker extends AbstractService {

   /**
    * Register a new fragment for a specific query
    * @param queryIdentifier
    * @param appIdString
    * @param dagName
    * @param dagIdentifier
    * @param vertexName
    * @param fragmentNumber
    * @param attemptNumber
    * @param user
    * @throws IOException
    */
-  QueryFragmentInfo registerFragment(String queryId, String appIdString, String dagName,
+  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
       int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
       FragmentSpecProto fragmentSpec) throws IOException {
-    ReadWriteLock dagLock = getDagLock(dagName);
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
-      if (!completedDagMap.contains(dagName)) {
-        QueryInfo queryInfo = queryInfoMap.get(dagName);
+      if (!completedDagMap.contains(queryIdentifier)) {
+        QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
         if (queryInfo == null) {
-          queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user,
-              getSourceCompletionMap(dagName), localDirsBase, localFs);
-          queryInfoMap.putIfAbsent(dagName, queryInfo);
+          queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
+              getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
+          queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         }
         return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
       } else {
         // Cleanup the dag lock here, since it may have been created after the query completed
-        dagSpecificLocks.remove(dagName);
+        dagSpecificLocks.remove(queryIdentifier);
         throw new RuntimeException(
             "Dag " + dagName + " already complete. Rejecting fragment ["
                 + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]");
@@ -148,12 +150,12 @@ public class QueryTracker extends AbstractService {
    * @param fragmentInfo
    */
   void fragmentComplete(QueryFragmentInfo fragmentInfo) {
-    String dagName = fragmentInfo.getQueryInfo().getDagName();
-    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    QueryIdentifier qId = fragmentInfo.getQueryInfo().getQueryIdentifier();
+    QueryInfo queryInfo = queryInfoMap.get(qId);
     if (queryInfo == null) {
       // Possible because a queryComplete message from the AM can come in first - KILL / SUCCESSFUL,
       // before the fragmentComplete is reported
-      LOG.info("Ignoring fragmentComplete message for unknown query");
+      LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
     } else {
       queryInfo.unregisterFragment(fragmentInfo);
     }
@@ -161,42 +163,40 @@ public class QueryTracker extends AbstractService {

   /**
    * Register completion for a query
-   * @param queryId
-   * @param dagName
+   * @param queryIdentifier
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long deleteDelay) {
+  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay) {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
-    ReadWriteLock dagLock = getDagLock(dagName);
+    ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.writeLock().lock();
     try {
-      rememberCompletedDag(dagName);
-      LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds",
-          dagName, deleteDelay);
-      QueryInfo queryInfo = queryInfoMap.remove(dagName);
+      rememberCompletedDag(queryIdentifier);
+      LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
+          deleteDelay);
+      QueryInfo queryInfo = queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
-        LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+        LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
         for (String localDir : localDirs) {
           cleanupDir(localDir, deleteDelay);
-          ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier());
+          ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
         }
       }
       // Clearing this before sending a kill is OK, since canFinish will change to false.
       // Ideally this should be a state machine where kills are issued to the executor,
       // and the structures are cleaned up once all tasks complete. New requests, however,
       // should not be allowed after a query complete is received.
-      sourceCompletionMap.remove(dagName);
-      String savedQueryId = dagNameToQueryId.remove(dagName);
-      queryId = queryId == null ? savedQueryId : queryId;
-      dagSpecificLocks.remove(dagName);
-      if (queryId != null) {
-        ObjectCacheFactory.removeLlapQueryCache(queryId);
+      sourceCompletionMap.remove(queryIdentifier);
+      String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
+      dagSpecificLocks.remove(queryIdentifier);
+      if (savedQueryId != null) {
+        ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
       return queryInfo.getRegisteredFragments();
     } finally {
@@ -206,24 +206,24 @@ public class QueryTracker extends AbstractService {

-  public void rememberCompletedDag(String dagName) {
-    if (completedDagMap.add(dagName)) {
+  public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
+    if (completedDagMap.add(queryIdentifier)) {
       // We will remember completed DAG for an hour to avoid execution out-of-order fragments.
-      executorService.schedule(new DagMapCleanerCallable(dagName), 1, TimeUnit.HOURS);
+      executorService.schedule(new DagMapCleanerCallable(queryIdentifier), 1, TimeUnit.HOURS);
     } else {
-      LOG.warn("Couldn't add {} to completed dag set", dagName);
+      LOG.warn("Couldn't add {} to completed dag set", queryIdentifier);
     }
   }

   /**
    * Register an update to a source within an executing dag
-   * @param dagName
+   * @param queryIdentifier
    * @param sourceName
    * @param sourceState
    */
-  void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
-    getSourceCompletionMap(dagName).put(sourceName, sourceState);
-    QueryInfo queryInfo = queryInfoMap.get(dagName);
+  void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceName, SourceStateProto sourceState) {
+    getSourceCompletionMap(queryIdentifier).put(sourceName, sourceState);
+    QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
     if (queryInfo != null) {
       queryInfo.sourceStateUpdated(sourceName);
     } else {
@@ -233,13 +233,13 @@ public class QueryTracker extends AbstractService {
   }

-  private ReadWriteLock getDagLock(String dagName) {
+  private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
     lock.lock();
     try {
-      ReadWriteLock dagLock = dagSpecificLocks.get(dagName);
+      ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
       if (dagLock == null) {
         dagLock = new ReentrantReadWriteLock();
-        dagSpecificLocks.put(dagName, dagLock);
+        dagSpecificLocks.put(queryIdentifier, dagLock);
       }
       return dagLock;
     } finally {
@@ -247,20 +247,20 @@ public class QueryTracker extends AbstractService {
     }
   }

-  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(String dagName) {
-    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(dagName);
+  private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(QueryIdentifier queryIdentifier) {
+    ConcurrentMap<String, SourceStateProto> dagMap = sourceCompletionMap.get(queryIdentifier);
     if (dagMap == null) {
       dagMap = new ConcurrentHashMap<>();
       ConcurrentMap<String, SourceStateProto> old =
-          sourceCompletionMap.putIfAbsent(dagName, dagMap);
+          sourceCompletionMap.putIfAbsent(queryIdentifier, dagMap);
       dagMap = (old != null) ? old : dagMap;
     }
     return dagMap;
   }

-  public void registerDagQueryId(String dagName, String queryId) {
-    if (queryId == null) return;
-    dagNameToQueryId.putIfAbsent(dagName, queryId);
+  public void registerDagQueryId(QueryIdentifier queryIdentifier, String hiveQueryIdString) {
+    if (hiveQueryIdString == null) return;
+    queryIdentifierToHiveQueryId.putIfAbsent(queryIdentifier, hiveQueryIdString);
   }

   @Override
@@ -302,15 +302,15 @@ public class QueryTracker extends AbstractService {
   }

   private class DagMapCleanerCallable extends CallableWithNdc<Void> {
-    private final String dagName;
+    private final QueryIdentifier queryIdentifier;

-    private DagMapCleanerCallable(String dagName) {
-      this.dagName = dagName;
+    private DagMapCleanerCallable(QueryIdentifier queryIdentifier) {
+      this.queryIdentifier = queryIdentifier;
     }

     @Override
     protected Void callInternal() {
-      completedDagMap.remove(dagName);
+      completedDagMap.remove(queryIdentifier);
       return null;
     }
   }
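getSourceCompletionMap() keeps the pre-Java-8 get/putIfAbsent idiom, just rekeyed by QueryIdentifier. If the codebase moved to Java 8, the same atomic create-on-first-access would collapse to computeIfAbsent; a sketch, not part of this patch:

    private ConcurrentMap<String, SourceStateProto> getSourceCompletionMap(
        QueryIdentifier queryIdentifier) {
      // Atomically creates the per-query map on first access and returns
      // whichever instance won the race - same behavior as the code above.
      return sourceCompletionMap.computeIfAbsent(queryIdentifier,
          k -> new ConcurrentHashMap<String, SourceStateProto>());
    }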
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index f03a2ff..b60f71f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -131,7 +131,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     // Register with the AMReporter when the callable is setup. Unregister once it starts running.
     if (jobToken != null) {
       this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-          request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
+          request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
     }
     this.metrics = metrics;
     this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
@@ -297,9 +297,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
    */
   public void reportTaskKilled() {
     killedTaskHandler
-        .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null,
-            taskSpec.getDAGName(),
-            taskSpec.getTaskAttemptID());
+        .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken,
+            fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID());
   }

   public boolean canFinish() {
@@ -428,6 +427,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
               executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+              fragmentInfo.getQueryInfo().getDagIdentifier(),
               request.getFragmentSpec().getVertexName(),
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,
@@ -445,6 +445,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       HistoryLogger
           .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(),
               executionContext.getHostName(), request.getFragmentSpec().getDagName(),
+              fragmentInfo.getQueryInfo().getDagIdentifier(),
               request.getFragmentSpec().getVertexName(),
               request.getFragmentSpec().getFragmentNumber(),
               request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName,

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
index 7428a6a..f61d62f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
@@ -262,5 +262,4 @@ public class Converters {
       throw new RuntimeException("Unexpected state: " + state);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 5c370ee..ae7d291 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -85,7 +86,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private static final boolean isDebugEnabed = LOG.isDebugEnabled();

   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
-  private final ConcurrentMap<String, ByteBuffer> credentialMap;
+
+  private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;

   // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG.
   // When DAG specific cleanup happens, it'll be better to link this to a DAG though.
@@ -104,7 +106,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {

   private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();

-  private volatile String currentDagName;
+  private volatile int currentDagId;
+  private volatile QueryIdentifierProto currentQueryIdentifierProto;

   public LlapTaskCommunicator(
       TaskCommunicatorContext taskCommunicatorContext) {
@@ -226,8 +229,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       int priority)  {
     super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
         credentialsChanged, priority);
-    if (taskSpec.getDAGName() != currentDagName) {
-      resetCurrentDag(taskSpec.getDAGName());
+    int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
+    if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIdentifier())) {
+      resetCurrentDag(dagId);
     }

@@ -251,7 +255,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     nodesForQuery.add(nodeId);

     sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
-    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(),
+    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
         taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
     SubmitWorkRequestProto requestProto;
@@ -349,7 +353,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
     if (nodeId != null) {
       TerminateFragmentRequestProto request =
-          TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
+          TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifierProto)
              .setFragmentIdentifierString(taskAttemptId.toString()).build();
       communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
           new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@@ -370,12 +374,16 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }

+
+
+
   @Override
-  public void dagComplete(final String dagName) {
-    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName(
-        dagName).setDeleteDelay(deleteDelayOnDagComplete).build();
+  public void dagComplete(final int dagIdentifier) {
+    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
+        .setQueryIdentifier(constructQueryIdentifierProto(dagIdentifier))
+        .setDeleteDelay(deleteDelayOnDagComplete).build();
     for (final LlapNodeId llapNodeId : nodesForQuery) {
-      LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId);
+      LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
       communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
           new LlapDaemonProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
             @Override
@@ -384,7 +392,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {

             @Override
             public void indicateError(Throwable t) {
-              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagName, llapNodeId);
+              LOG.warn("Failed to indicate dag complete dagId={} to node {}", dagIdentifier, llapNodeId);
             }
           });
     }
@@ -495,12 +503,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }

-  private void resetCurrentDag(String newDagName) {
+  private void resetCurrentDag(int newDagId) {
     // Working on the assumption that a single DAG runs at a time per AM.
-    currentDagName = newDagName;
-    sourceStateTracker.resetState(newDagName);
+    currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
+    sourceStateTracker.resetState(newDagId);
     nodesForQuery.clear();
-    LOG.info("CurrentDag set to: " + newDagName);
+    LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName());
     // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
     // is likely already happening.
   }
@@ -518,10 +526,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // Credentials can change across DAGs. Ideally construct only once per DAG.
     taskCredentials.addAll(getContext().getCredentials());

-    ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
+    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
     if (credentialsBinary == null) {
       credentialsBinary = serializeCredentials(getContext().getCredentials());
-      credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
+      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
     } else {
       credentialsBinary = credentialsBinary.duplicate();
     }
@@ -736,4 +746,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }

+
+  private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
+    return QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(getContext().getCurrentAppIdentifier()).setDagIdentifier(dagIdentifier)
+        .build();
+  }
 }
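One detail the credentialMap change relies on: protobuf-generated messages implement value-based equals() and hashCode(), so a QueryIdentifierProto rebuilt from the same app id and dag id finds the entry cached under an earlier instance. An illustration (identifier values invented):

    QueryIdentifierProto a = QueryIdentifierProto.newBuilder()
        .setAppIdentifier("appattempt_1452_0007_000001").setDagIdentifier(4).build();
    QueryIdentifierProto b = QueryIdentifierProto.newBuilder()
        .setAppIdentifier("appattempt_1452_0007_000001").setDagIdentifier(4).build();
    // Value semantics: a lookup with b finds an entry stored under a.
    assert a.equals(b) && a.hashCode() == b.hashCode();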
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index 066fae5..628fe9c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;

 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -45,28 +47,33 @@ public class SourceStateTracker {

   private final TaskCommunicatorContext taskCommunicatorContext;
   private final LlapTaskCommunicator taskCommunicator;

+  private final QueryIdentifierProto BASE_QUERY_IDENTIFIER;
+
   // Tracks vertices for which notifications have been registered
   private final Set<String> notificationRegisteredVertices = new HashSet<>();

   private final Map<String, SourceInfo> sourceInfoMap = new HashMap<>();
   private final Map<LlapNodeId, NodeInfo> nodeInfoMap = new HashMap<>();

-  private volatile String currentDagName;
+  private volatile QueryIdentifierProto currentQueryIdentifier;

   public SourceStateTracker(TaskCommunicatorContext taskCommunicatorContext,
                             LlapTaskCommunicator taskCommunicator) {
     this.taskCommunicatorContext = taskCommunicatorContext;
     this.taskCommunicator = taskCommunicator;
+    BASE_QUERY_IDENTIFIER = QueryIdentifierProto.newBuilder()
+        .setAppIdentifier(taskCommunicatorContext.getCurrentAppIdentifier()).build();
   }

   /**
    * To be invoked after each DAG completes.
    */
-  public synchronized void resetState(String newDagName) {
+  public synchronized void resetState(int newDagId) {
     sourceInfoMap.clear();
     nodeInfoMap.clear();
     notificationRegisteredVertices.clear();
-    this.currentDagName = newDagName;
+    this.currentQueryIdentifier =
+        QueryIdentifierProto.newBuilder(BASE_QUERY_IDENTIFIER).setDagIdentifier(newDagId).build();
   }

   /**
@@ -139,16 +146,16 @@ public class SourceStateTracker {
   }

+  // Assumes serialized DAGs within an AM, and a reset of structures after each DAG completes.
   /**
    * Constructs FragmentRuntimeInfo for scheduling within LLAP daemons.
    * Also caches state based on state updates.
-   * @param dagName
    * @param vertexName
    * @param fragmentNumber
    * @param priority
    * @return
    */
-  public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String dagName, String vertexName, int fragmentNumber,
+  public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String vertexName, int fragmentNumber,
       int priority) {
     FragmentRuntimeInfo.Builder builder = FragmentRuntimeInfo.newBuilder();
     maybeRegisterForVertexUpdates(vertexName);
@@ -282,9 +289,8 @@ public class SourceStateTracker {

   void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {
     taskCommunicator.sendStateUpdate(nodeId.getHostname(), nodeId.getPort(),
-        SourceStateUpdatedRequestProto.newBuilder().setDagName(currentDagName).setSrcName(
-            sourceName)
-            .setState(Converters.fromVertexState(state)).build());
+        SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(currentQueryIdentifier)
+            .setSrcName(sourceName).setState(Converters.fromVertexState(state)).build());
   }
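BASE_QUERY_IDENTIFIER uses a handy protobuf pattern: build the app-level part of the identifier once, then stamp the per-DAG field on each reset via newBuilder(prototype), which copies all already-set fields into a fresh builder. A standalone sketch of the pattern (the app id is invented):

    // App identifier is fixed for the lifetime of the AM, so build it once...
    QueryIdentifierProto base = QueryIdentifierProto.newBuilder()
        .setAppIdentifier("appattempt_1452_0007_000001").build();
    // ...then derive a complete identifier per DAG from that prototype.
    QueryIdentifierProto forDag3 = QueryIdentifierProto.newBuilder(base)
        .setDagIdentifier(3).build();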
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index a2d944f..944c96c 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -50,6 +50,7 @@ message GroupInputSpecProto {
 message FragmentSpecProto {
   optional string fragment_identifier_string = 1;
   optional string dag_name = 2;
+  optional int32 dag_id = 11;
   optional string vertex_name = 3;
   optional EntityDescriptorProto processor_descriptor = 4;
   repeated IOSpecProto input_specs = 5;
@@ -74,6 +75,11 @@ enum SourceStateProto {
   S_RUNNING = 2;
 }

+message QueryIdentifierProto {
+  optional string app_identifier = 1;
+  optional int32 dag_identifier = 2;
+}
+
 message SubmitWorkRequestProto {
   optional string container_id_string = 1;
   optional string am_host = 2;
@@ -98,7 +104,7 @@ message SubmitWorkResponseProto {
 }

 message SourceStateUpdatedRequestProto {
-  optional string dag_name = 1;
+  optional QueryIdentifierProto query_identifier = 1;
   optional string src_name = 2;
   optional SourceStateProto state = 3;
 }
@@ -108,17 +114,16 @@ message SourceStateUpdatedResponseProto {

 message QueryCompleteRequestProto {
   optional string query_id = 1;
-  optional string dag_name = 2;
-  optional int64 delete_delay = 3 [default = 0];
+  optional QueryIdentifierProto query_identifier = 2;
+  optional int64 delete_delay = 4 [default = 0];
 }

 message QueryCompleteResponseProto {
 }

 message TerminateFragmentRequestProto {
-  optional string query_id = 1;
-  optional string dag_name = 2;
-  optional string fragment_identifier_string = 7;
+  optional QueryIdentifierProto query_identifier = 1;
+  optional string fragment_identifier_string = 2;
 }

 message TerminateFragmentResponseProto {
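On the wire format: dag_id takes the fresh tag 11, so it cannot collide with existing FragmentSpecProto fields, while SourceStateUpdatedRequestProto and TerminateFragmentRequestProto re-type and renumber fields, which is only safe if the AM and the daemons are upgraded together. Daemon-side consumers can use the proto2 presence bit to tolerate senders that predate the new field; a hedged sketch in generated-Java terms (values invented):

    FragmentSpecProto spec = FragmentSpecProto.newBuilder()
        .setDagName("MockDag").setDagId(7).setVertexName("Map 1").build();
    // hasDagId() distinguishes "never set" from a legitimate 0, useful while
    // requests from older senders that do not populate dag_id may still arrive.
    int dagId = spec.hasDagId() ? spec.getDagId() : -1;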
http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 38af07e..ef49714 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;

 import static org.mockito.Mockito.mock;

+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -25,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.security.Credentials;
@@ -48,18 +50,36 @@ public class TaskExecutorTestHelpers {
     SubmitWorkRequestProto requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
         startTime);
-    MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
+    QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(requestProto.getFragmentSpec());
+    MockRequest mockRequest = new MockRequest(requestProto, queryFragmentInfo, canFinish, workTime);
     return mockRequest;
   }

   public static TaskExecutorService.TaskWrapper createTaskWrapper(
       SubmitWorkRequestProto request, boolean canFinish, int workTime) {
-    MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+    QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(request.getFragmentSpec());
+    MockRequest mockRequest = new MockRequest(request, queryFragmentInfo, canFinish, workTime);
     TaskExecutorService.TaskWrapper taskWrapper =
         new TaskExecutorService.TaskWrapper(mockRequest, null);
     return taskWrapper;
   }

+  public static QueryFragmentInfo createQueryFragmentInfo(FragmentSpecProto fragmentSpecProto) {
+    QueryInfo queryInfo = createQueryInfo();
+    QueryFragmentInfo fragmentInfo =
+        new QueryFragmentInfo(queryInfo, "fakeVertexName", fragmentSpecProto.getFragmentNumber(), 0,
+            fragmentSpecProto);
+    return fragmentInfo;
+  }
+
+  public static QueryInfo createQueryInfo() {
+    QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
+    QueryInfo queryInfo =
+        new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
+            new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
+            new String[0], null);
+    return queryInfo;
+  }

   public static SubmitWorkRequestProto createSubmitWorkRequestProto(
       int fragmentNumber, int selfAndUpstreamParallelism,
@@ -80,7 +100,7 @@ public class TaskExecutorTestHelpers {
     return SubmitWorkRequestProto
         .newBuilder()
         .setFragmentSpec(
-            LlapDaemonProtocolProtos.FragmentSpecProto
+            FragmentSpecProto
                 .newBuilder()
                 .setAttemptNumber(0)
                 .setDagName("MockDag")
@@ -119,9 +139,9 @@ public class TaskExecutorTestHelpers {
     private boolean shouldSleep = true;
     private final Condition finishedCondition = lock.newCondition();

-    public MockRequest(SubmitWorkRequestProto requestProto,
+    public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
                        boolean canFinish, long workTime) {
-      super(requestProto, mock(QueryFragmentInfo.class), new Configuration(),
+      super(requestProto, fragmentInfo, new Configuration(),
           new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
               LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock(

http://git-wip-us.apache.org/repos/asf/hive/blob/94e8761a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
new file mode 100644
index 0000000..39a3865
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestQueryIdentifier.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+public class TestQueryIdentifier {
+
+  @Test (timeout = 5000)
+  public void testEquality() {
+
+    String appIdString1 = "app1";
+    String appIdString2 = "app2";
+
+    int dagId1 = 1;
+    int dagId2 = 2;
+
+    QueryIdentifier[] queryIdentifiers = new QueryIdentifier[4];
+
+    queryIdentifiers[0] = new QueryIdentifier(appIdString1, dagId1);
+    queryIdentifiers[1] = new QueryIdentifier(appIdString1, dagId2);
+    queryIdentifiers[2] = new QueryIdentifier(appIdString2, dagId1);
+    queryIdentifiers[3] = new QueryIdentifier(appIdString2, dagId2);
+
+    for (int i = 0 ; i < 4 ; i++) {
+      for (int j = 0 ; j < 4 ; j++) {
+        if (i == j) {
+          assertEquals(queryIdentifiers[i], queryIdentifiers[j]);
+        } else {
+          assertNotEquals(queryIdentifiers[i], queryIdentifiers[j]);
+        }
+      }
+    }
+
+    QueryIdentifier q11 = new QueryIdentifier(appIdString1, dagId1);
+    QueryIdentifier q12 = new QueryIdentifier(appIdString1, dagId2);
+    QueryIdentifier q21 = new QueryIdentifier(appIdString2, dagId1);
+    QueryIdentifier q22 = new QueryIdentifier(appIdString2, dagId2);
+
+    assertEquals(queryIdentifiers[0], q11);
+    assertEquals(queryIdentifiers[1], q12);
+    assertEquals(queryIdentifiers[2], q21);
+    assertEquals(queryIdentifiers[3], q22);
+
+  }
+}
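TestQueryIdentifier pins down equals(); a companion check of the hashCode contract, which the daemon's hash maps depend on just as heavily, could look like this (not part of the patch):

    @Test(timeout = 5000)
    public void testHashCodeConsistency() {
      QueryIdentifier a = new QueryIdentifier("app1", 1);
      QueryIdentifier b = new QueryIdentifier("app1", 1);
      assertEquals(a, b);
      // Equal identifiers must hash identically, or ConcurrentHashMap lookups
      // in QueryTracker would miss existing entries.
      assertEquals(a.hashCode(), b.hashCode());
    }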