http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index 1517b04..c171143 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -31,26 +32,35 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
 import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
+import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
 import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
 import org.apache.hadoop.crypto.key.kms.ValueQueue;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.KMSUtil;
+import org.apache.hadoop.util.KMSUtilFaultInjector;
 import org.apache.hadoop.util.Time;
 import org.apache.http.client.utils.URIBuilder;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -71,7 +81,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Writer;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -96,6 +105,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -113,6 +126,20 @@ public class TestKMS {
 
   private SSLFactory sslFactory;
 
+  private final KMSUtilFaultInjector oldInjector =
+      KMSUtilFaultInjector.get();
+
+  // Injector to create providers with different ports. Can only happen in tests
+  private final KMSUtilFaultInjector testInjector =
+      new KMSUtilFaultInjector() {
+        @Override
+        public KeyProvider createKeyProviderForTests(String value,
+            Configuration conf) throws IOException {
+          return TestLoadBalancingKMSClientProvider
+              .createKeyProviderForTests(value, conf);
+        }
+      };
+
   // Keep track of all key providers created during a test case, so they can be
   // closed at test tearDown.
   private List<KeyProvider> providersCreated = new LinkedList<>();
@@ -122,7 +149,12 @@ public class TestKMS {
 
   @Before
   public void setUp() throws Exception {
-    setUpMiniKdc();
+    GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+    GenericTestUtils
+        .setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
+    GenericTestUtils
+        .setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
     // resetting kerberos security
     Configuration conf = new Configuration();
     UserGroupInformation.setConfiguration(conf);
@@ -141,24 +173,78 @@ public class TestKMS {
   }
 
   public static abstract class KMSCallable<T> implements Callable<T> {
-    private URL kmsUrl;
+    private List<URL> kmsUrl;
 
     protected URL getKMSUrl() {
-      return kmsUrl;
+      return kmsUrl.get(0);
+    }
+
+    protected URL[] getKMSHAUrl() {
+      URL[] urls = new URL[kmsUrl.size()];
+      return kmsUrl.toArray(urls);
+    }
+
+    protected void addKMSUrl(URL url) {
+      if (kmsUrl == null) {
+        kmsUrl = new ArrayList<URL>();
+      }
+      kmsUrl.add(url);
+    }
+
+    /*
+     * Builds a comma-separated list with one provider URI per KMS URL, e.g.
+     * kms://http@host1:port1/path1,kms://http@host2:port2/path2
+     * Returns null when no KMS URL has been added.
+     */
+    protected String generateLoadBalancingKeyProviderUriString() {
+      if (kmsUrl == null || kmsUrl.isEmpty()) {
+        return null;
+      }
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < kmsUrl.size(); i++) {
+        // Take the protocol from this URL, not always from the first one.
+        URL url = kmsUrl.get(i);
+        sb.append(KMSClientProvider.SCHEME_NAME).append("://");
+        sb.append(url.getProtocol()).append("@").append(url.getAuthority());
+        if (url.getPath() != null) {
+          sb.append(url.getPath());
+        }
+        if (i < kmsUrl.size() - 1) {
+          sb.append(",");
+        }
+      }
+      return sb.toString();
     }
   }
 
   protected KeyProvider createProvider(URI uri, Configuration conf)
       throws IOException {
     final KeyProvider ret = new LoadBalancingKMSClientProvider(
-        new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
+        new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
     providersCreated.add(ret);
     return ret;
   }
 
+  /**
+   * create a LoadBalancingKMSClientProvider from an array of URIs.
+   * @param uris an array of KMS URIs
+   * @param conf configuration object
+   * @return a LoadBalancingKMSClientProvider object
+   * @throws IOException
+   */
+  protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
+      Configuration conf, String originalUri) throws IOException {
+    KMSClientProvider[] providers = new KMSClientProvider[uris.length];
+    for (int i = 0; i < providers.length; i++) {
+      providers[i] =
+          new KMSClientProvider(uris[i], conf, URI.create(originalUri));
+    }
+    return new LoadBalancingKMSClientProvider(providers, conf);
+  }
+
   private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
       throws IOException {
-    final KMSClientProvider ret = new KMSClientProvider(uri, conf);
+    final KMSClientProvider ret = new KMSClientProvider(uri, conf, uri);
     providersCreated.add(ret);
     return ret;
   }
@@ -170,22 +256,33 @@ public class TestKMS {
 
   protected <T> T runServer(int port, String keystore, String password, File confDir,
       KMSCallable<T> callable) throws Exception {
+    return runServer(new int[] {port}, keystore, password, confDir, callable);
+  }
+
+  protected <T> T runServer(int[] ports, String keystore, String password,
+      File confDir, KMSCallable<T> callable) throws Exception {
     MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
         .setLog4jConfFile("log4j.properties");
     if (keystore != null) {
       miniKMSBuilder.setSslConf(new File(keystore), password);
     }
-    if (port > 0) {
-      miniKMSBuilder.setPort(port);
-    }
-    MiniKMS miniKMS = miniKMSBuilder.build();
-    miniKMS.start();
-    try {
-      System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
-      callable.kmsUrl = miniKMS.getKMSUrl();
+    final List<MiniKMS> kmsList = new ArrayList<>();
+    try { // start inside try so a failed start still stops earlier instances
+      for (int i = 0; i < ports.length; i++) {
+        if (ports[i] > 0) {
+          miniKMSBuilder.setPort(ports[i]);
+        }
+        MiniKMS miniKMS = miniKMSBuilder.build();
+        miniKMS.start();
+        kmsList.add(miniKMS);
+        LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
+        callable.addKMSUrl(miniKMS.getKMSUrl());
+      }
       return callable.call();
     } finally {
-      miniKMS.stop();
+      for (MiniKMS miniKMS: kmsList) {
+        miniKMS.stop();
+      }
     }
   }
 
@@ -240,6 +337,13 @@ public class TestKMS {
     return new URI("kms://" + str);
   }
 
+  public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
+    URI[] uris = new URI[kmsUrls.length];
+    for (int i = 0; i < kmsUrls.length; i++) {
+      uris[i] = createKMSUri(kmsUrls[i]);
+    }
+    return uris;
+  }
 
   private static class KerberosConfiguration
       extends javax.security.auth.login.Configuration {
@@ -315,19 +419,17 @@ public class TestKMS {
         principals.toArray(new String[principals.size()]));
   }
 
-  private void setUpMiniKdc() throws Exception {
+  @BeforeClass
+  public static void setUpMiniKdc() throws Exception {
     Properties kdcConf = MiniKdc.createConf();
     setUpMiniKdc(kdcConf);
   }
 
   @After
   public void tearDown() throws Exception {
-    if (kdc != null) {
-      kdc.stop();
-      kdc = null;
-    }
     UserGroupInformation.setShouldRenewImmediatelyForTests(false);
     UserGroupInformation.reset();
+    KMSUtilFaultInjector.set(oldInjector);
     if (!providersCreated.isEmpty()) {
       final MultipleIOException.Builder b = new MultipleIOException.Builder();
       for (KeyProvider kp : providersCreated) {
@@ -345,6 +447,14 @@ public class TestKMS {
     }
   }
 
+  @AfterClass
+  public static void shutdownMiniKdc() {
+    if (kdc != null) {
+      kdc.stop();
+      kdc = null;
+    }
+  }
+
   private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
       throws Exception {
     UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath());
@@ -501,8 +611,10 @@ public class TestKMS {
                 Token<?>[] tokens =
                     ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
                     .addDelegationTokens("myuser", new Credentials());
-                Assert.assertEquals(1, tokens.length);
-                Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
+                assertEquals(2, tokens.length);
+                assertEquals(KMSDelegationToken.TOKEN_KIND,
+                    tokens[0].getKind());
+                kp.close();
                 return null;
               }
             });
@@ -518,8 +630,9 @@ public class TestKMS {
           Token<?>[] tokens =
               ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
               .addDelegationTokens("myuser", new Credentials());
-          Assert.assertEquals(1, tokens.length);
-          Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
+          assertEquals(2, tokens.length);
+          assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
+          kp.close();
         }
         return null;
       }
@@ -2011,7 +2124,6 @@ public class TestKMS {
             return null;
           }
         });
-
         nonKerberosUgi.addCredentials(credentials);
 
         try {
@@ -2067,6 +2179,17 @@ public class TestKMS {
     testDelegationTokensOps(true, true);
   }
 
+  // Returns the token service of the sole KMSClientProvider inside provider.
+  private Text getTokenService(KeyProvider provider) {
+    assertTrue("KeyProvider should be an instance of "
+        + "LoadBalancingKMSClientProvider",
+        provider instanceof LoadBalancingKMSClientProvider);
+    KMSClientProvider[] providers =
+        ((LoadBalancingKMSClientProvider) provider).getProviders();
+    assertEquals("Num client providers should be 1", 1, providers.length);
+    return providers[0].getDelegationTokenService();
+  }
+
   private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
       throws Exception {
     final File confDir = getTestDir();
@@ -2098,11 +2221,16 @@ public class TestKMS {
         final URI uri = createKMSUri(getKMSUrl());
         clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
             createKMSUri(getKMSUrl()).toString());
+        clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
 
         doAs("client", new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
             KeyProvider kp = createProvider(uri, clientConf);
+            // Unset the conf value for key provider path just to be sure that
+            // the key provider created for renew and cancel token is from
+            // token service field.
+            clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
             // test delegation token retrieval
             KeyProviderDelegationTokenExtension kpdte =
                 KeyProviderDelegationTokenExtension.
@@ -2110,13 +2238,10 @@ public class TestKMS {
             final Credentials credentials = new Credentials();
             final Token<?>[] tokens =
                 kpdte.addDelegationTokens("client1", credentials);
-            Assert.assertEquals(1, credentials.getAllTokens().size());
-            InetSocketAddress kmsAddr =
-                new InetSocketAddress(getKMSUrl().getHost(),
-                    getKMSUrl().getPort());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            Text tokenService = getTokenService(kp);
+            assertEquals(1, credentials.getAllTokens().size());
+            assertEquals(TOKEN_KIND,
+                credentials.getToken(tokenService).getKind());
 
             // Test non-renewer user cannot renew.
             for (Token<?> token : tokens) {
@@ -2243,12 +2368,11 @@ public class TestKMS {
         final URI uri = createKMSUri(getKMSUrl());
         clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
             createKMSUri(getKMSUrl()).toString());
+        clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
         final KeyProvider kp = createProvider(uri, clientConf);
         final KeyProviderDelegationTokenExtension kpdte =
             KeyProviderDelegationTokenExtension.
                 createKeyProviderDelegationTokenExtension(kp);
-        final InetSocketAddress kmsAddr =
-            new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
 
         // Job 1 (e.g. YARN log aggregation job), with user DT.
         final Collection<Token<?>> job1Token = new HashSet<>();
@@ -2258,16 +2382,17 @@ public class TestKMS {
             // Get a DT and use it.
             final Credentials credentials = new Credentials();
             kpdte.addDelegationTokens("client", credentials);
+            Text tokenService = getTokenService(kp);
             Assert.assertEquals(1, credentials.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
-                getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+
             UserGroupInformation.getCurrentUser().addCredentials(credentials);
             LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
                 getCurrentUser().getCredentials().getAllTokens());
-            Token<?> token =
+            final Token<?> token =
                 UserGroupInformation.getCurrentUser().getCredentials()
-                    .getToken(SecurityUtil.buildTokenService(kmsAddr));
-            Assert.assertNotNull(token);
+                    .getToken(tokenService);
+            assertNotNull(token);
+            assertEquals(TOKEN_KIND, token.getKind());
             job1Token.add(token);
 
             // Decode the token to get max time.
@@ -2302,17 +2427,16 @@ public class TestKMS {
             // Get a new DT, but don't use it yet.
             final Credentials newCreds = new Credentials();
             kpdte.addDelegationTokens("client", newCreds);
-            Assert.assertEquals(1, newCreds.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            assertEquals(1, newCreds.getAllTokens().size());
+            final Text tokenService = getTokenService(kp);
+            assertEquals(TOKEN_KIND,
+                newCreds.getToken(tokenService).getKind());
 
             // Using job 1's DT should fail.
             final Credentials oldCreds = new Credentials();
             for (Token<?> token : job1Token) {
-              if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
-                oldCreds
-                    .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+              if (token.getKind().equals(TOKEN_KIND)) {
+                oldCreds.addToken(tokenService, token);
               }
             }
             UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@@ -2326,12 +2450,11 @@ public class TestKMS {
             }
 
             // Using the new DT should succeed.
-            Assert.assertEquals(1, newCreds.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            assertEquals(1, newCreds.getAllTokens().size());
+            assertEquals(TOKEN_KIND,
+                newCreds.getToken(tokenService).getKind());
             UserGroupInformation.getCurrentUser().addCredentials(newCreds);
-            LOG.info("Credetials now are: {}", UserGroupInformation
+            LOG.info("Credentials now are: {}", UserGroupInformation
                 .getCurrentUser().getCredentials().getAllTokens());
             kp.getKeys();
             return null;
@@ -2357,7 +2480,13 @@ public class TestKMS {
     doKMSWithZK(true, true);
   }
 
-  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable) throws Exception {
+    return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
+  }
+
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable, int kmsSize) throws Exception {
     TestingServer zkServer = null;
     try {
       zkServer = new TestingServer();
@@ -2403,43 +2532,265 @@ public class TestKMS {
 
       writeConf(testDir, conf);
 
-      KMSCallable<KeyProvider> c =
-          new KMSCallable<KeyProvider>() {
-        @Override
-        public KeyProvider call() throws Exception {
-          final Configuration conf = new Configuration();
-          conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
-          final URI uri = createKMSUri(getKMSUrl());
-
-          final KeyProvider kp =
-              doAs("SET_KEY_MATERIAL",
-                  new PrivilegedExceptionAction<KeyProvider>() {
-                    @Override
-                    public KeyProvider run() throws Exception {
-                      KeyProvider kp = createProvider(uri, conf);
-                          kp.createKey("k1", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k2", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k3", new byte[16],
-                              new KeyProvider.Options(conf));
-                      return kp;
-                    }
-                  });
-          return kp;
-        }
-      };
-
-      runServer(null, null, testDir, c);
+      int[] ports = new int[kmsSize];
+      for (int i = 0; i < ports.length; i++) {
+        ports[i] = -1;
+      }
+      return runServer(ports, null, null, testDir, callable);
     } finally {
       if (zkServer != null) {
         zkServer.stop();
         zkServer.close();
       }
     }
+  }
+
+  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+    KMSCallable<KeyProvider> c =
+        new KMSCallable<KeyProvider>() {
+          @Override
+          public KeyProvider call() throws Exception {
+            final Configuration conf = new Configuration();
+            conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+            final URI uri = createKMSUri(getKMSUrl());
+
+            final KeyProvider kp =
+                doAs("SET_KEY_MATERIAL",
+                    new PrivilegedExceptionAction<KeyProvider>() {
+                      @Override
+                      public KeyProvider run() throws Exception {
+                        KeyProvider kp = createProvider(uri, conf);
+                        kp.createKey("k1", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k2", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k3", new byte[16],
+                            new KeyProvider.Options(conf));
+                        return kp;
+                      }
+                    });
+            return kp;
+          }
+        };
+
+    runServerWithZooKeeper(zkDTSM, zkSigner, c);
+  }
+
+  @Test
+  public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
+    KMSCallable<Void> c = new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+        final URI[] uris = createKMSHAUri(getKMSHAUrl());
+        final Credentials credentials = new Credentials();
+        final String lbUri = generateLoadBalancingKeyProviderUriString();
+        final LoadBalancingKMSClientProvider lbkp =
+            createHAProvider(uris, conf, lbUri);
+        conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+        // Login as a Kerberos user principal using keytab.
+        // Connect to KMS to create a delegation token and add it to credentials
+        final String keyName = "k0";
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
+                kpdte.createKey(keyName, new KeyProvider.Options(conf));
+                return null;
+              }
+            });
+
+        assertTokenIdentifierEquals(credentials);
+
+        final LoadBalancingKMSClientProvider lbkp1 =
+            createHAProvider(uris, conf, lbUri);
+        // verify both tokens can be used to authenticate
+        for (Token t : credentials.getAllTokens()) {
+          assertTokenAccess(lbkp1, keyName, t);
+        }
+        return null;
+      }
+    };
+    runServerWithZooKeeper(true, true, c, 2);
+  }
+
+  /**
+   * Assert that the passed in credentials have 2 tokens, of kind
+   * {@link KMSDelegationToken#TOKEN_KIND} and
+   * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
+   * the same identifier.
+   */
+  private void assertTokenIdentifierEquals(Credentials credentials)
+      throws IOException {
+    // verify the 2 tokens have the same identifier
+    assertEquals(2, credentials.getAllTokens().size());
+    Token token = null;
+    Token legacyToken = null;
+    for (Token t : credentials.getAllTokens()) {
+      if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
+        token = t;
+      } else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
+        legacyToken = t;
+      }
+    }
+    assertNotNull(token);
+    assertNotNull(legacyToken);
+    final DelegationTokenIdentifier tokenId =
+        (DelegationTokenIdentifier) token.decodeIdentifier();
+    final DelegationTokenIdentifier legacyTokenId =
+        (DelegationTokenIdentifier) legacyToken.decodeIdentifier();
+    assertEquals("KMS DT and legacy dt should have identical identifier",
+        tokenId, legacyTokenId);
+  }
 
+  /**
+   * Tests token access with each provider in the
+   * {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
+   * kinds are compatible and can both be used to authenticate.
+   */
+  private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
+      final String keyName, final Token token) throws Exception {
+    UserGroupInformation tokenUgi =
+        UserGroupInformation.createUserForTesting("test", new String[] {});
+    // Verify the tokens can authenticate to any KMS
+    tokenUgi.addToken(token);
+    tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        // Create a kms client with one provider at a time. Must use one
+        // provider so that if it fails to authenticate, it does not fall
+        // back to the next KMS instance.
+        // It should succeed because its delegation token can access any
+        // KMS instances.
+        for (KMSClientProvider provider : lbkp.getProviders()) {
+          if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
+              .equals(provider.getDelegationTokenService())) {
+            // Historically known issue: Legacy token can only work with the
+            // key provider specified in the token's Service
+            continue;
+          }
+          LOG.info("Rolling key {} via provider {} with token {}.", keyName,
+              provider, token);
+          provider.rollNewVersion(keyName);
+        }
+        return null;
+      }
+    });
   }
 
+  @Test
+  public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
+    testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
+  }
+
+  @Test
+  public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
+    testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
+  }
+
+  /**
+   * Obtains delegation tokens from a 2-instance KMS setup, then renews and
+   * cancels the token of the given kind and checks that a cancelled token
+   * can no longer be renewed.
+   */
+  private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
+      throws Exception {
+    GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
+    assertTrue(
+        TOKEN_KIND.equals(tokenKind) || TOKEN_LEGACY_KIND.equals(tokenKind));
+    KMSCallable<Void> c = new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        final URI[] uris = createKMSHAUri(getKMSHAUrl());
+        final Credentials credentials = new Credentials();
+        // Create a UGI without Kerberos auth. It will be authenticated with
+        // delegation token.
+        final UserGroupInformation nonKerberosUgi =
+            UserGroupInformation.getCurrentUser();
+        final String lbUri = generateLoadBalancingKeyProviderUriString();
+        final LoadBalancingKMSClientProvider lbkp =
+            createHAProvider(uris, conf, lbUri);
+        conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+        // Login as a Kerberos user principal using keytab.
+        // Connect to KMS to create a delegation token and add it to credentials
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
+                return null;
+              }
+            });
+
+        // Test token renewal and cancellation
+        final Collection<Token<? extends TokenIdentifier>> tokens =
+            credentials.getAllTokens();
+        doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            Assert.assertEquals(2, tokens.size());
+            boolean tokenFound = false;
+            for (Token<?> token : tokens) {
+              if (!tokenKind.equals(token.getKind())) {
+                continue;
+              }
+              tokenFound = true;
+              KMSUtilFaultInjector.set(testInjector);
+              setupConfForToken(token.getKind(), conf, lbUri);
+
+              LOG.info("Testing token: {}", token);
+              long tokenLife = token.renew(conf);
+              LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
+              Thread.sleep(10);
+              long newTokenLife = token.renew(conf);
+              LOG.info("Renewed token {}, new lifetime:{}", token,
+                  newTokenLife);
+              assertTrue(newTokenLife > tokenLife);
+
+              // test delegation token cancellation
+              token.cancel(conf);
+              LOG.info("Cancelled token {}", token);
+              try {
+                token.renew(conf);
+                fail("should not be able to renew a canceled token " + token);
+              } catch (Exception e) {
+                LOG.info("Expected exception when renewing token", e);
+              }
+            }
+            assertTrue("Should have found token kind " + tokenKind + " from "
+                + tokens, tokenFound);
+            return null;
+          }
+        });
+        return null;
+      }
+    };
+    runServerWithZooKeeper(true, true, c, 2);
+  }
+
+  /**
+   * Set or unset the key provider configuration based on token kind.
+   */
+  private void setupConfForToken(Text tokenKind, Configuration conf,
+      String lbUri) {
+    if (tokenKind.equals(TOKEN_KIND)) {
+      conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    } else {
+      // conf is only required for legacy tokens to create provider,
+      // new tokens create provider by parsing its own Service field
+      assertEquals(TOKEN_LEGACY_KIND, tokenKind);
+      conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
+    }
+  }
 
   @Test
   public void testProxyUserKerb() throws Exception {
@@ -2558,6 +2909,16 @@ public class TestKMS {
 
   @Test
   public void testTGTRenewal() throws Exception {
+    shutdownMiniKdc();
+    try {
+      testTgtRenewalInt();
+    } finally {
+      shutdownMiniKdc();
+      setUpMiniKdc();
+    }
+  }
+
+  private void testTgtRenewalInt() throws Exception {
     Properties kdcConf = MiniKdc.createConf();
     kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3");
     kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to