Author: shv Date: Tue Jun 5 04:20:53 2012 New Revision: 1346235 URL: http://svn.apache.org/viewvc?rev=1346235&view=rev Log: MAPREDUCE-2452. Moves the cancellation of delegation tokens to a separate thread. Contributed by Devaraj Das and Benoy Antony.
Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java hadoop/common/branches/branch-0.22/mapreduce/src/test/findbugsExcludeFile.xml Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1346235&r1=1346234&r2=1346235&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Tue Jun 5 04:20:53 2012 @@ -44,6 +44,10 @@ Release 0.22.1 - Unreleased MAPREDUCE-2420. JobTracker should be able to renew delegation token over HTTP. (Boris Shkolnik and Benoy Antony via shv) + MAPREDUCE-2452. Moves the cancellation of delegation tokens to a separate + thread. (Devaraj Das and Benoy Antony via shv) + + Release 0.22.0 - 2011-11-29 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1346235&r1=1346234&r2=1346235&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Tue Jun 5 04:20:53 2012 @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +50,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Private @@ -107,10 +109,87 @@ public class DelegationTokenRenewal { // global single timer (daemon) private static Timer renewalTimer = new Timer(true); + //delegation token canceler thread + private static DelegationTokenCancelThread dtCancelThread = + new DelegationTokenCancelThread(); + static { + dtCancelThread.start(); + } + + //managing the list of tokens using Map // jobId=>List<tokens> private static Set<DelegationTokenToRenew> delegationTokens = Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()); + + private static class DelegationTokenCancelThread extends Thread { + private static class TokenWithConf { + Token<DelegationTokenIdentifier> token; + Configuration conf; + TokenWithConf(Token<DelegationTokenIdentifier> token, + Configuration conf) { + this.token = token; + this.conf = conf; + } + } + private LinkedBlockingQueue<TokenWithConf> queue = + new LinkedBlockingQueue<TokenWithConf>(); + + public DelegationTokenCancelThread() { + super("Delegation Token Canceler"); + setDaemon(true); + } + public void cancelToken(Token<DelegationTokenIdentifier> token, + Configuration conf) { + TokenWithConf tokenWithConf = new TokenWithConf(token, conf); + while (!queue.offer(tokenWithConf)) { + LOG.warn("Unable to add token " + token + " for cancellation. " + + "Will retry.."); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public void run() { + while (true) { + TokenWithConf tokenWithConf = null; + try { + tokenWithConf = queue.take(); + DistributedFileSystem dfs = null; + try { + // do it over rpc. For that we need DFS object + dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf); + } catch (Exception e) { + LOG.info("couldn't get DFS to cancel. Will retry over HTTPS"); + dfs = null; + } + + if(dfs != null) { + dfs.cancelDelegationToken(tokenWithConf.token); + } else { + cancelDelegationTokenOverHttps(tokenWithConf.token, + tokenWithConf.conf); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Canceling token " + tokenWithConf.token.getService() + + " for dfs=" + dfs); + } + } catch (IOException e) { + LOG.warn("Failed to cancel token " + tokenWithConf.token + " " + + StringUtils.stringifyException(e)); + } catch (InterruptedException ie) { + return; + } catch (Throwable t) { + LOG.warn("Got exception " + StringUtils.stringifyException(t) + + ". Exiting.."); + System.exit(-1); + } + } + } + } //adding token private static void addTokenToList(DelegationTokenToRenew t) { delegationTokens.add(t); @@ -337,24 +416,7 @@ public class DelegationTokenRenewal { Configuration conf = t.conf; if(token.getKind().equals(kindHdfs)) { - DistributedFileSystem dfs = null; - try { - // do it over rpc. For that we need DFS object - dfs = getDFSForToken(token, conf); - } catch (Exception e) { - LOG.info("couldn't get DFS to cancel. Will retry over HTTPS"); - dfs = null; - } - - try { - if(dfs != null) { - dfs.cancelDelegationToken(token); - } else { - cancelDelegationTokenOverHttps(token,conf); - } - } catch (Exception e) { - LOG.warn("Failed to cancel " + token, e); - } + dtCancelThread.cancelToken(token, conf); } } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/findbugsExcludeFile.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/findbugsExcludeFile.xml?rev=1346235&r1=1346234&r2=1346235&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/test/findbugsExcludeFile.xml (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/test/findbugsExcludeFile.xml Tue Jun 5 04:20:53 2012 @@ -388,4 +388,9 @@ <Field name="started" /> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + <Match> + <Class name="org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal$DelegationTokenCancelThread" /> + <Method name="run" /> + <Bug pattern="DM_EXIT" /> + </Match> </FindBugsFilter>