Repository: hive
Updated Branches:
  refs/heads/branch-2.0 5b7230d8e -> 91adc5287


HIVE-12931. Shuffle tokens stay around forever in LLAP. (Siddharth Seth, 
reviewed by Sergey Shelukhin)
(cherry picked from commit 1e6fa1eb3f8b7eeb551355cb6ebee483ef4e4d71)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91adc528
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91adc528
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91adc528

Branch: refs/heads/branch-2.0
Commit: 91adc5287b6677ea1ea23b97524b007ff01568fa
Parents: 5b7230d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 1 11:14:37 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 1 11:15:23 2016 -0800

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 31 +++++--------
 .../hive/llap/daemon/impl/QueryTracker.java     | 14 +++++-
 .../llap/shufflehandler/ShuffleHandler.java     | 49 +++++++++++++++-----
 3 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/91adc528/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index a2a55cc..e7c7f44 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
-import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -178,36 +177,30 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
       QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier);
 
+      Credentials credentials = new Credentials();
+      DataInputBuffer dib = new DataInputBuffer();
+      byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
+      dib.reset(tokenBytes, tokenBytes.length);
+      credentials.readTokenStorageStream(dib);
+
+      Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+
       QueryFragmentInfo fragmentInfo = queryTracker
-          .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(),
+          .registerFragment(queryIdentifier, request.getApplicationIdString(),
+              fragmentSpec.getDagName(),
               dagIdentifier,
               fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
-              fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec());
+              fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(),
+              jobToken);
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
-
       if (LOG.isDebugEnabled()) {
         LOG.debug("Dirs are: " + Arrays.toString(localDirs));
       }
       // May need to setup localDir for re-localization, which is usually setup as Environment.PWD.
       // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
 
-      Credentials credentials = new Credentials();
-      DataInputBuffer dib = new DataInputBuffer();
-      byte[] tokenBytes = request.getCredentialsBinary().toByteArray();
-      dib.reset(tokenBytes, tokenBytes.length);
-      credentials.readTokenStorageStream(dib);
-
-      Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Registering request with the ShuffleHandler");
-      }
-      ShuffleHandler.get()
-          .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
-              request.getUser(), localDirs);
-
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
           new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,

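Note on the ContainerRunnerImpl hunk above: the credential decoding now happens up front so the
job token can be handed to QueryTracker.registerFragment. As a minimal sketch, assuming the
Hadoop Credentials API and the Tez TokenCache (org.apache.tez.common.security) that this class
already uses, the decode step looks like the hypothetical helper below; the class and method
names are illustrative only and not part of the patch.

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;

public class SessionTokenDecoder {

  /**
   * Deserialize the credentials blob carried in the submit-work request
   * (request.getCredentialsBinary()) and extract the Tez session/job token,
   * mirroring the lines added to ContainerRunnerImpl above.
   */
  public static Token<JobTokenIdentifier> decodeSessionToken(byte[] tokenBytes)
      throws IOException {
    Credentials credentials = new Credentials();
    DataInputBuffer dib = new DataInputBuffer();
    // Wrap the raw bytes so Credentials can read them as a token storage stream.
    dib.reset(tokenBytes, tokenBytes.length);
    credentials.readTokenStorageStream(dib);
    // The shuffle secret travels as the session token inside the credentials.
    return TokenCache.getSessionToken(credentials);
  }
}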
http://git-wip-us.apache.org/repos/asf/hive/blob/91adc528/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 0676edd..80264a0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -19,6 +19,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.CallableWithNdc;
 
 import org.apache.hadoop.service.AbstractService;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.tez.common.security.JobTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,8 +122,8 @@ public class QueryTracker extends AbstractService {
    * @throws IOException
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName,
-      int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
-      FragmentSpecProto fragmentSpec) throws IOException {
+                                     int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user,
+                                     FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) throws IOException {
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
@@ -132,6 +134,14 @@ public class QueryTracker extends AbstractService {
               getSourceCompletionMap(queryIdentifier), localDirsBase, localFs);
           queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Registering request for {} with the ShuffleHandler", 
queryIdentifier);
+        }
+        ShuffleHandler.get()
+            .registerDag(appIdString, dagIdentifier, appToken,
+                user, queryInfo.getLocalDirs());
+
         return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec);
       } else {
         // Cleanup the dag lock here, since it may have been created after the query completed

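The QueryTracker hunk above is the heart of the fix: DAG registration with the ShuffleHandler now
happens when a query's first fragment is registered, under the query's lock, so the shuffle
token's lifetime is tied to the query instead of being left behind forever. As a simplified,
hypothetical sketch of that lifecycle (the names DagShuffleLifecycle, ShuffleRegistry,
fragmentArrived and queryComplete are stand-ins, not the real QueryTracker/ShuffleHandler API):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DagShuffleLifecycle {

  /** Stand-in for the ShuffleHandler registration calls used in the patch. */
  interface ShuffleRegistry {
    void registerDag(String appId, int dagId, String user, String[] localDirs);
    void unregisterDag(String dir, String appId, int dagId);
  }

  private final ShuffleRegistry shuffle;
  // Tracks which dag is currently registered for each application.
  private final ConcurrentMap<String, Integer> knownDags = new ConcurrentHashMap<>();

  public DagShuffleLifecycle(ShuffleRegistry shuffle) {
    this.shuffle = shuffle;
  }

  /** Called for every fragment; only the first fragment of a dag triggers registration. */
  public void fragmentArrived(String appId, int dagId, String user, String[] localDirs) {
    if (knownDags.putIfAbsent(appId, dagId) == null) {
      shuffle.registerDag(appId, dagId, user, localDirs);
    }
  }

  /** Called once when the query completes; drops the registration so the token does not linger. */
  public void queryComplete(String appId, int dagId, String[] localDirs) {
    knownDags.remove(appId, dagId);
    for (String dir : localDirs) {
      shuffle.unregisterDag(dir, appId, dagId);
    }
  }
}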
http://git-wip-us.apache.org/repos/asf/hive/blob/91adc528/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index b042455..2c51169 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -155,7 +155,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
   /* List of registered applications */
-  private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
+  private final ConcurrentMap<String, Integer> registeredApps = new ConcurrentHashMap<>();
   /* Maps application identifiers (jobIds) to the associated user for the app */
   private final ConcurrentMap<String,String> userRsrc;
   private JobTokenSecretManager secretManager;
@@ -406,6 +406,9 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 
   /**
   * Register an application and it's associated credentials and user information.
+   *
+   * This method and unregisterDag must be synchronized externally to prevent races in shuffle token registration/unregistration
+   *
    * @param applicationIdString
    * @param dagIdentifier
    * @param appToken
@@ -414,12 +417,24 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   public void registerDag(String applicationIdString, int dagIdentifier,
                           Token<JobTokenIdentifier> appToken,
                           String user, String[] appDirs) {
-    // TODO Fix this. There's a race here, where an app may think everything is registered, finish really fast, send events and the consumer will not find the registration.
-    Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));
-    if (registered == null) {
-      LOG.debug("Registering watches for AppDirs: appId=" + applicationIdString);
+    Integer registeredDagIdentifier = registeredApps.putIfAbsent(applicationIdString, dagIdentifier);
+    // App never seen, or previous dag has been unregistered.
+    if (registeredDagIdentifier == null) {
       recordJobShuffleInfo(applicationIdString, user, appToken);
+    }
+    // Register the new dag identifier, if that's not the one currently registered.
+    // Register comes in before the unregister for the previous dag
+    if (registeredDagIdentifier != null && !registeredDagIdentifier.equals(dagIdentifier)) {
+      registeredApps.put(applicationIdString, dagIdentifier);
+      // Don't need to recordShuffleInfo since the out of sync unregister will not remove the
+      // credentials
+    }
+    // First time registration, or new register comes in before the previous unregister.
+    if (registeredDagIdentifier == null || !registeredDagIdentifier.equals(dagIdentifier)) {
       if (dirWatcher != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Registering watches for AppDirs: appId={}, dagId={}", 
applicationIdString, dagIdentifier);
+        }
         for (String appDir : appDirs) {
           try {
             dirWatcher.registerDagDir(appDir, applicationIdString, dagIdentifier, user,
@@ -432,15 +447,27 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     }
   }
 
+  /**
+   * Unregister a specific dag
+   *
+   * This method and registerDag must be synchronized externally to prevent races in shuffle token registration/unregistration
+   *
+   * @param dir
+   * @param applicationIdString
+   * @param dagIdentifier
+   */
   public void unregisterDag(String dir, String applicationIdString, int dagIdentifier) {
+    Integer currentDagIdentifier = registeredApps.get(applicationIdString);
+    // Unregister may come in after the new dag has started running. The methods are expected to
+    // be synchronized, hence the following check is sufficient.
+    if (currentDagIdentifier != null && currentDagIdentifier.equals(dagIdentifier)) {
+      registeredApps.remove(applicationIdString);
+      removeJobShuffleInfo(applicationIdString);
+    }
+    // Unregister for the dirWatcher for the specific dagIdentifier in either case.
     if (dirWatcher != null) {
       dirWatcher.unregisterDagDir(dir, applicationIdString, dagIdentifier);
     }
-    // TODO Cleanup registered tokens and dag info
-  }
-
-  public void unregisterApplication(String applicationIdString) {
-    removeJobShuffleInfo(applicationIdString);
   }
 
 
@@ -468,7 +495,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     // This is in place to be compatible with the MR ShuffleHandler. Requests from ShuffleInputs
     // arrive with a job_ prefix.
     // arrive with a job_ prefix.
     String jobIdString = appIdString.replace("application", "job");
-    userRsrc.put(jobIdString, user);
+    userRsrc.putIfAbsent(jobIdString, user);
     secretManager.addTokenForJob(jobIdString, jobToken);
     LOG.info("Added token for " + jobIdString);
   }

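Both registerDag and unregisterDag above state that the two calls must be synchronized externally;
the dagIdentifier stored in registeredApps only makes the checks safe if a register and an
unregister for the same application never interleave. One way a caller could satisfy that contract
is sketched below; the wrapper class and its method names are hypothetical, and only
ShuffleHandler.get(), registerDag and unregisterDag come from the patch.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;

public class SerializedShuffleRegistration {

  // One monitor per application; entries are never removed in this sketch, for brevity.
  private final ConcurrentMap<String, Object> appLocks = new ConcurrentHashMap<>();

  private Object lockFor(String appId) {
    Object lock = appLocks.get(appId);
    if (lock == null) {
      Object candidate = new Object();
      Object existing = appLocks.putIfAbsent(appId, candidate);
      lock = (existing == null) ? candidate : existing;
    }
    return lock;
  }

  public void register(String appId, int dagId, Token<JobTokenIdentifier> token,
                       String user, String[] dirs) {
    // Serialize with unregister() for the same application.
    synchronized (lockFor(appId)) {
      ShuffleHandler.get().registerDag(appId, dagId, token, user, dirs);
    }
  }

  public void unregister(String dir, String appId, int dagId) {
    synchronized (lockFor(appId)) {
      ShuffleHandler.get().unregisterDag(dir, appId, dagId);
    }
  }
}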