Author: shv
Date: Thu Feb 25 21:43:20 2010
New Revision: 916468

URL: http://svn.apache.org/viewvc?rev=916468&view=rev
Log:
HADOOP-6573. Support for persistent delegation tokens. Contributed by Jitendra 
Pandey.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Feb 25 21:43:20 2010
@@ -170,6 +170,9 @@
     HADOOP-6596. Add a version field to the AbstractDelegationTokenIdentifier's
     serialized value. (omalley)
 
+    HADOOP-6573. Support for persistent delegation tokens.
+    (Jitendra Pandey via shv)
+
   OPTIMIZATIONS
 
     HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..).

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java Thu Feb 25 21:43:20 2010
@@ -52,23 +52,30 @@
 
   /** 
    * Cache of currently valid tokens, mapping from DelegationTokenIdentifier 
-   * to DelegationTokenInformation. Protected by its own lock.
+   * to DelegationTokenInformation. Protected by this object lock.
    */
-  private final Map<TokenIdent, DelegationTokenInformation> currentTokens 
+  protected final Map<TokenIdent, DelegationTokenInformation> currentTokens 
       = new HashMap<TokenIdent, DelegationTokenInformation>();
   
   /**
-   * Sequence number to create DelegationTokenIdentifier
+   * Sequence number to create DelegationTokenIdentifier.
+   * Protected by this object lock.
    */
-  private int delegationTokenSequenceNumber = 0;
+  protected int delegationTokenSequenceNumber = 0;
   
-  private final Map<Integer, DelegationKey> allKeys 
+  /**
+   * Access to allKeys is protected by this object lock
+   */
+  protected final Map<Integer, DelegationKey> allKeys 
       = new HashMap<Integer, DelegationKey>();
   
   /**
-   * Access to currentId and currentKey is protected by this object lock.
+   * Access to currentId is protected by this object lock.
+   */
+  protected int currentId = 0;
+  /**
+   * Access to currentKey is protected by this object lock
    */
-  private int currentId = 0;
   private DelegationKey currentKey;
   
   private long keyUpdateInterval;
@@ -76,7 +83,7 @@
   private long tokenRemoverScanInterval;
   private long tokenRenewInterval;
   private Thread tokenRemoverThread;
-  private volatile boolean running;
+  protected volatile boolean running;
 
   public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
@@ -112,27 +119,50 @@
     return allKeys.values().toArray(new DelegationKey[0]);
   }
   
-  /** Update the current master key */
-  private synchronized void updateCurrentKey() throws IOException {
+  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+    return;
+  }
+  
+  /** 
+   * Update the current master key 
+   * This is called once by startThreads before tokenRemoverThread is created, 
+   * and only by tokenRemoverThread afterwards.
+   */
+  private void updateCurrentKey() throws IOException {
     LOG.info("Updating the current master key for generating delegation tokens");
     /* Create a new currentKey with an estimated expiry date. */
-    currentId++;
-    currentKey = new DelegationKey(currentId, System.currentTimeMillis()
+    int newCurrentId;
+    synchronized (this) {
+      newCurrentId = currentId+1;
+    }
+    DelegationKey newKey = new DelegationKey(newCurrentId, System
+        .currentTimeMillis()
         + keyUpdateInterval + tokenMaxLifetime, generateSecret());
-    allKeys.put(currentKey.getKeyId(), currentKey);
+    //Log must be invoked outside the lock on 'this'
+    logUpdateMasterKey(newKey);
+    synchronized (this) {
+      currentId = newKey.getKeyId();
+      currentKey = newKey;
+      allKeys.put(currentKey.getKeyId(), currentKey);
+    }
   }
   
-  /** Update the current master key for generating delegation tokens */
-  public synchronized void rollMasterKey() throws IOException {
-    removeExpiredKeys();
-    /* set final expiry date for retiring currentKey */
-    currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
-    /*
-     * currentKey might have been removed by removeExpiredKeys(), if
-     * updateMasterKey() isn't called at expected interval. Add it back to
-     * allKeys just in case.
-     */
-    allKeys.put(currentKey.getKeyId(), currentKey);
+  /** 
+   * Update the current master key for generating delegation tokens 
+   * It should be called only by tokenRemoverThread.
+   */
+  void rollMasterKey() throws IOException {
+    synchronized (this) {
+      removeExpiredKeys();
+      /* set final expiry date for retiring currentKey */
+      currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
+      /*
+       * currentKey might have been removed by removeExpiredKeys(), if
+       * updateMasterKey() isn't called at expected interval. Add it back to
+       * allKeys just in case.
+       */
+      allKeys.put(currentKey.getKeyId(), currentKey);
+    }
     updateCurrentKey();
   }
 
@@ -148,35 +178,24 @@
   }
   
   @Override
-  protected byte[] createPassword(TokenIdent identifier) {
+  protected synchronized byte[] createPassword(TokenIdent identifier) {
     int sequenceNum;
-    int id;
-    DelegationKey key;
-    long now = System.currentTimeMillis();    
-    synchronized (this) {
-      id = currentId;
-      key = currentKey;
-      sequenceNum = ++delegationTokenSequenceNumber;
-    }
+    long now = System.currentTimeMillis();
+    sequenceNum = ++delegationTokenSequenceNumber;
     identifier.setIssueDate(now);
     identifier.setMaxDate(now + tokenMaxLifetime);
-    identifier.setMasterKeyId(id);
+    identifier.setMasterKeyId(currentId);
     identifier.setSequenceNumber(sequenceNum);
-    byte[] password = createPassword(identifier.getBytes(), key.getKey());
-    synchronized (currentTokens) {
-      currentTokens.put(identifier, new DelegationTokenInformation(now
-          + tokenRenewInterval, password));
-    }
+    byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+    currentTokens.put(identifier, new DelegationTokenInformation(now
+        + tokenRenewInterval, password));
     return password;
   }
 
   @Override
-  public byte[] retrievePassword(TokenIdent identifier
-                                 ) throws InvalidToken {
-    DelegationTokenInformation info = null;
-    synchronized (currentTokens) {
-      info = currentTokens.get(identifier);
-    }
+  public synchronized byte[] retrievePassword(TokenIdent identifier)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(identifier);
     if (info == null) {
       throw new InvalidToken("token is expired or doesn't exist");
     }
@@ -195,18 +214,14 @@
    * @throws InvalidToken if the token is invalid
    * @throws AccessControlException if the user can't renew token
    */
-  public long renewToken(Token<TokenIdent> token,
+  public synchronized long renewToken(Token<TokenIdent> token,
                          String renewer) throws InvalidToken, IOException {
     long now = System.currentTimeMillis();
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
     TokenIdent id = createIdentifier();
     id.readFields(in);
-    synchronized (currentTokens) {
-      if (currentTokens.get(id) == null) {
-        throw new InvalidToken("Renewal request for unknown token");
-      }
-    }
+
     if (id.getMaxDate() < now) {
       throw new InvalidToken("User " + renewer + 
                              " tried to renew an expired token");
@@ -222,36 +237,36 @@
                                        "renewer specified as " + 
                                        id.getRenewer());
     }
-    DelegationKey key = null;
-    synchronized (this) {
-      key = allKeys.get(id.getMasterKeyId());
-    }
+    DelegationKey key = allKeys.get(id.getMasterKeyId());
     if (key == null) {
-      throw new InvalidToken("Unable to find master key for keyId=" + 
-                             id.getMasterKeyId() +
-                             " from cache. Failed to renew an unexpired token"+
-                             " with sequenceNumber=" + id.getSequenceNumber());
+      throw new InvalidToken("Unable to find master key for keyId="
+          + id.getMasterKeyId()
+          + " from cache. Failed to renew an unexpired token"
+          + " with sequenceNumber=" + id.getSequenceNumber());
     }
     byte[] password = createPassword(token.getIdentifier(), key.getKey());
     if (!Arrays.equals(password, token.getPassword())) {
-      throw new AccessControlException("Client " + renewer + 
-                                       " is trying to renew a token with " +
-                                       "wrong password");
+      throw new AccessControlException("Client " + renewer
+          + " is trying to renew a token with " + "wrong password");
     }
-    DelegationTokenInformation info = new DelegationTokenInformation(
-        Math.min(id.getMaxDate(), now + tokenRenewInterval), password);
-    synchronized (currentTokens) {
-      currentTokens.put(id, info);
+    long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
+    DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
+        password);
+
+    if (currentTokens.get(id) == null) {
+      throw new InvalidToken("Renewal request for unknown token");
     }
-    return info.getRenewDate();
+    currentTokens.put(id, info);
+    return renewTime;
   }
   
   /**
    * Cancel a token by removing it from cache.
+   * @return Identifier of the canceled token
    * @throws InvalidToken for invalid token
    * @throws AccessControlException if the user isn't allowed to cancel
    */
-  public void cancelToken(Token<TokenIdent> token,
+  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -262,18 +277,17 @@
     }
     String owner = id.getUser().getUserName();
     Text renewer = id.getRenewer();
-    if (!canceller.equals(owner) && 
-        (renewer == null || !canceller.equals(renewer.toString()))) {
-      throw new AccessControlException(canceller + 
-                                      " is not authorized to cancel the token");
+    if (!canceller.equals(owner)
+        && (renewer == null || !canceller.equals(renewer.toString()))) {
+      throw new AccessControlException(canceller
+          + " is not authorized to cancel the token");
     }
     DelegationTokenInformation info = null;
-    synchronized (currentTokens) {
-      info = currentTokens.remove(id);
-    }
+    info = currentTokens.remove(id);
     if (info == null) {
       throw new InvalidToken("Token not found");
     }
+    return id;
   }
   
   /**
@@ -285,16 +299,16 @@
     return SecretManager.createSecretKey(key);
   }
 
-  /** Utility class to encapsulate a token's renew date and password. */
-  private static class DelegationTokenInformation {
+  /** Class to encapsulate a token's renew date and password. */
+  public static class DelegationTokenInformation {
     long renewDate;
     byte[] password;
-    DelegationTokenInformation(long renewDate, byte[] password) {
+    public DelegationTokenInformation(long renewDate, byte[] password) {
       this.renewDate = renewDate;
       this.password = password;
     }
     /** returns renew date */
-    long getRenewDate() {
+    public long getRenewDate() {
       return renewDate;
     }
     /** returns password */
@@ -304,15 +318,13 @@
   }
   
   /** Remove expired delegation tokens from cache */
-  private void removeExpiredToken() {
+  private synchronized void removeExpiredToken() {
     long now = System.currentTimeMillis();
-    synchronized (currentTokens) {
-      Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
-      while (i.hasNext()) {
-        long renewDate = i.next().getRenewDate();
-        if (now > renewDate) {
-          i.remove();
-        }
+    Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
+    while (i.hasNext()) {
+      long renewDate = i.next().getRenewDate();
+      if (now > renewDate) {
+        i.remove();
       }
     }
   }
@@ -321,7 +333,9 @@
     if (LOG.isDebugEnabled())
       LOG.debug("Stopping expired delegation token remover thread");
     running = false;
-    tokenRemoverThread.interrupt();
+    if (tokenRemoverThread != null) {
+      tokenRemoverThread.interrupt();
+    }
   }
   
   private class ExpiredTokenRemover extends Thread {

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java Thu Feb 25 21:43:20 2010
@@ -25,7 +25,10 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -36,8 +39,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
@@ -91,6 +97,18 @@
     protected byte[] createPassword(TestDelegationTokenIdentifier t) {
       return super.createPassword(t);
     }
+    
+    public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) {
+      return SecretManager.createPassword(t.getBytes(), key.getKey());
+    }
+    
+    public Map<TestDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
+      return currentTokens;
+    }
+    
+    public DelegationKey getKey(TestDelegationTokenIdentifier id) {
+      return allKeys.get(id.getMasterKeyId());
+    }
   }
   
   public static class TokenSelector extends 
@@ -299,4 +317,52 @@
       dtSecretManager.stopThreads();
     }
   }
+  
+  @Test
+  public void testParallelDelegationTokenCreation() throws Exception {
+    final TestDelegationTokenSecretManager dtSecretManager = 
+        new TestDelegationTokenSecretManager(2000, 24 * 60 * 60 * 1000, 
+            7 * 24 * 60 * 60 * 1000, 2000);
+    try {
+      dtSecretManager.startThreads();
+      int numThreads = 100;
+      final int numTokensPerThread = 100;
+      class tokenIssuerThread implements Runnable {
+
+        public void run() {
+          for(int i =0;i <numTokensPerThread; i++) {
+            generateDelegationToken(dtSecretManager, "auser", "arenewer");
+            try {
+              Thread.sleep(250); 
+            } catch (Exception e) {
+            }
+          }
+        }
+      }
+      Thread[] issuers = new Thread[numThreads];
+      for (int i =0; i <numThreads; i++) {
+        issuers[i] = new Daemon(new tokenIssuerThread());
+        issuers[i].start();
+      }
+      for (int i =0; i <numThreads; i++) {
+        issuers[i].join();
+      }
+      Map<TestDelegationTokenIdentifier, DelegationTokenInformation> tokenCache = dtSecretManager
+          .getAllTokens();
+      Assert.assertEquals(numTokensPerThread*numThreads, tokenCache.size());
+      Iterator<TestDelegationTokenIdentifier> iter = tokenCache.keySet().iterator();
+      while (iter.hasNext()) {
+        TestDelegationTokenIdentifier id = iter.next();
+        DelegationTokenInformation info = tokenCache.get(id);
+        Assert.assertTrue(info != null);
+        DelegationKey key = dtSecretManager.getKey(id);
+        Assert.assertTrue(key != null);
+        byte[] storedPassword = dtSecretManager.retrievePassword(id);
+        byte[] password = dtSecretManager.createPassword(id, key);
+        Assert.assertTrue(Arrays.equals(password, storedPassword));
+      }
+    } finally {
+      dtSecretManager.stopThreads();
+    }
+  }
 }


Reply via email to