HADOOP-10758. KMS: add ACLs on per key basis. (tucu)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b02a4b40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b02a4b40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b02a4b40

Branch: refs/heads/HDFS-6584
Commit: b02a4b40610e93eef6559db09a11d287e859446d
Parents: cbfe263
Author: Alejandro Abdelnur <t...@apache.org>
Authored: Wed Sep 10 14:26:15 2014 -0700
Committer: Alejandro Abdelnur <t...@apache.org>
Committed: Wed Sep 10 14:26:15 2014 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 .../hadoop-kms/src/main/conf/kms-acls.xml       |  38 +++
 .../hadoop/crypto/key/kms/server/KMSACLs.java   |  97 ++++++-
 .../crypto/key/kms/server/KMSConfiguration.java |   9 +
 .../hadoop/crypto/key/kms/server/KMSWebApp.java |  17 +-
 .../kms/server/KeyAuthorizationKeyProvider.java | 276 +++++++++++++++++++
 .../hadoop-kms/src/site/apt/index.apt.vm        | 106 +++++++
 .../hadoop/crypto/key/kms/server/TestKMS.java   | 236 +++++++++++++++-
 .../server/TestKeyAuthorizationKeyProvider.java | 218 +++++++++++++++
 9 files changed, 986 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index b2157d6..3cea14a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -509,6 +509,8 @@ Release 2.6.0 - UNRELEASED
     HADOOP-11057. checknative command to probe for winutils.exe on windows.
     (Xiaoyu Yao via cnauroth)
 
+    HADOOP-10758. KMS: add ACLs on per key basis. (tucu)
+
   OPTIMIZATIONS
 
     HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml 
b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
index cdff629..24a46b8 100644
--- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
+++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
@@ -94,4 +94,42 @@
       ACL for decrypt EncryptedKey CryptoExtension operations
     </description>
   </property>
+
+  <property>
+    <name>default.key.acl.MANAGEMENT</name>
+    <value>*</value>
+    <description>
+      default ACL for MANAGEMENT operations for all key acls that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.GENERATE_EEK</name>
+    <value>*</value>
+    <description>
+      default ACL for GENERATE_EEK operations for all key acls that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.DECRYPT_EEK</name>
+    <value>*</value>
+    <description>
+      default ACL for DECRYPT_EEK operations for all key acls that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.READ</name>
+    <value>*</value>
+    <description>
+      default ACL for READ operations for all key acls that are not
+      explicitly defined.
+    </description>
+  </property>
+
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
index 8a10bb2..530fe11 100644
--- 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
+++ 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSACLs.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.crypto.key.kms.server;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.kms.server.KMS.KMSOp;
+import 
org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyACLs;
+import 
org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyOpType;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -32,6 +34,7 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 /**
  * Provides access to the <code>AccessControlList</code>s used by KMS,
@@ -39,7 +42,7 @@ import java.util.concurrent.TimeUnit;
  * are defined has been updated.
  */
 @InterfaceAudience.Private
-public class KMSACLs implements Runnable {
+public class KMSACLs implements Runnable, KeyACLs {
   private static final Logger LOG = LoggerFactory.getLogger(KMSACLs.class);
 
   private static final String UNAUTHORIZED_MSG_WITH_KEY =
@@ -67,6 +70,9 @@ public class KMSACLs implements Runnable {
 
   private volatile Map<Type, AccessControlList> acls;
   private volatile Map<Type, AccessControlList> blacklistedAcls;
+  private volatile Map<String, HashMap<KeyOpType, AccessControlList>> keyAcls;
+  private final Map<KeyOpType, AccessControlList> defaultKeyAcls =
+      new HashMap<KeyOpType, AccessControlList>();
   private ScheduledExecutorService executorService;
   private long lastReload;
 
@@ -74,14 +80,15 @@ public class KMSACLs implements Runnable {
     if (conf == null) {
       conf = loadACLs();
     }
-    setACLs(conf);
+    setKMSACLs(conf);
+    setKeyACLs(conf);
   }
 
   public KMSACLs() {
     this(null);
   }
 
-  private void setACLs(Configuration conf) {
+  private void setKMSACLs(Configuration conf) {
     Map<Type, AccessControlList> tempAcls = new HashMap<Type, 
AccessControlList>();
     Map<Type, AccessControlList> tempBlacklist = new HashMap<Type, 
AccessControlList>();
     for (Type aclType : Type.values()) {
@@ -99,14 +106,69 @@ public class KMSACLs implements Runnable {
     blacklistedAcls = tempBlacklist;
   }
 
+  private void setKeyACLs(Configuration conf) {
+    Map<String, HashMap<KeyOpType, AccessControlList>> tempKeyAcls =
+        new HashMap<String, HashMap<KeyOpType,AccessControlList>>();
+    Map<String, String> allKeyACLS =
+        conf.getValByRegex(Pattern.quote(KMSConfiguration.KEY_ACL_PREFIX));
+    for (Map.Entry<String, String> keyAcl : allKeyACLS.entrySet()) {
+      String k = keyAcl.getKey();
+      // this should be of type "key.acl.<KEY_NAME>.<OP_TYPE>"
+      int keyNameStarts = KMSConfiguration.KEY_ACL_PREFIX.length();
+      int keyNameEnds = k.lastIndexOf(".");
+      if (keyNameStarts >= keyNameEnds) {
+        LOG.warn("Invalid key name '{}'", k);
+      } else {
+        String aclStr = keyAcl.getValue();
+        String keyName = k.substring(keyNameStarts, keyNameEnds);
+        String keyOp = k.substring(keyNameEnds + 1);
+        KeyOpType aclType = null;
+        try {
+          aclType = KeyOpType.valueOf(keyOp);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Invalid key Operation '{}'", keyOp);
+        }
+        if (aclType != null) {
+          // On the assumption this will be single threaded.. else we need to
+          // ConcurrentHashMap
+          HashMap<KeyOpType,AccessControlList> aclMap =
+              tempKeyAcls.get(keyName);
+          if (aclMap == null) {
+            aclMap = new HashMap<KeyOpType, AccessControlList>();
+            tempKeyAcls.put(keyName, aclMap);
+          }
+          aclMap.put(aclType, new AccessControlList(aclStr));
+          LOG.info("KEY_NAME '{}' KEY_OP '{}' ACL '{}'",
+              keyName, aclType, aclStr);
+        }
+      }
+    }
+
+    keyAcls = tempKeyAcls;
+    for (KeyOpType keyOp : KeyOpType.values()) {
+      if (!defaultKeyAcls.containsKey(keyOp)) {
+        String confKey = KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + keyOp;
+        String aclStr = conf.get(confKey);
+        if (aclStr != null) {
+          if (aclStr.equals("*")) {
+            LOG.info("Default Key ACL for  KEY_OP '{}' is set to '*'", keyOp);
+          }
+          defaultKeyAcls.put(keyOp, new AccessControlList(aclStr));
+        }
+      }
+    }
+  }
+
   @Override
   public void run() {
     try {
       if (KMSConfiguration.isACLsFileNewer(lastReload)) {
-        setACLs(loadACLs());
+        setKMSACLs(loadACLs());
+        setKeyACLs(loadACLs());
       }
     } catch (Exception ex) {
-      LOG.warn("Could not reload ACLs file: " + ex.toString(), ex);
+      LOG.warn(
+          String.format("Could not reload ACLs file: '%s'", ex.toString()), 
ex);
     }
   }
 
@@ -164,4 +226,29 @@ public class KMSACLs implements Runnable {
     }
   }
 
+  @Override
+  public boolean hasAccessToKey(String keyName, UserGroupInformation ugi,
+      KeyOpType opType) {
+    Map<KeyOpType, AccessControlList> keyAcl = keyAcls.get(keyName);
+    if (keyAcl == null) {
+      // Get KeyAcl map of DEFAULT KEY.
+      keyAcl = defaultKeyAcls;
+    }
+    // If No key acl defined for this key, check to see if
+    // there are key defaults configured for this operation
+    AccessControlList acl = keyAcl.get(opType);
+    if (acl == null) {
+      // If no acl is specified for this operation,
+      // deny access
+      return false;
+    } else {
+      return acl.isUserAllowed(ugi);
+    }
+  }
+
+  @Override
+  public boolean isACLPresent(String keyName, KeyOpType opType) {
+    return (keyAcls.containsKey(keyName) || 
defaultKeyAcls.containsKey(opType));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index 35dccfc..a7daa24 100644
--- 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -36,6 +36,9 @@ public class KMSConfiguration {
 
   public static final String CONFIG_PREFIX = "hadoop.kms.";
 
+  public static final String KEY_ACL_PREFIX = "key.acl.";
+  public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
+
   // Property to Enable/Disable Caching
   public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
       "cache.enable";
@@ -57,6 +60,12 @@ public class KMSConfiguration {
   // 10 secs
   public static final long KMS_AUDIT_AGGREGATION_DELAY_DEFAULT = 10000;
 
+  // Property to Enable/Disable per Key authorization
+  public static final String KEY_AUTHORIZATION_ENABLE = CONFIG_PREFIX +
+      "key.authorization.enable"; 
+
+  public static final boolean KEY_AUTHORIZATION_ENABLE_DEFAULT = true;
+
   static Configuration getConfiguration(boolean loadHadoopDefaults,
       String ... resources) {
     Configuration conf = new Configuration(loadHadoopDefaults);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index aaf90e8..0827b78 100644
--- 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -68,7 +68,7 @@ public class KMSWebApp implements ServletContextListener {
 
   private JmxReporter jmxReporter;
   private static Configuration kmsConf;
-  private static KMSACLs acls;
+  private static KMSACLs kmsAcls;
   private static Meter adminCallsMeter;
   private static Meter keyCallsMeter;
   private static Meter unauthorizedCallsMeter;
@@ -126,8 +126,8 @@ public class KMSWebApp implements ServletContextListener {
       LOG.info("  KMS Hadoop Version: " + VersionInfo.getVersion());
       
LOG.info("-------------------------------------------------------------");
 
-      acls = new KMSACLs();
-      acls.startReloader();
+      kmsAcls = new KMSACLs();
+      kmsAcls.startReloader();
 
       metricRegistry = new MetricRegistry();
       jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
@@ -188,6 +188,13 @@ public class KMSWebApp implements ServletContextListener {
       keyProviderCryptoExtension = 
           new EagerKeyGeneratorKeyProviderCryptoExtension(kmsConf, 
               keyProviderCryptoExtension);
+      if (kmsConf.getBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE,
+          KMSConfiguration.KEY_AUTHORIZATION_ENABLE_DEFAULT)) {
+        keyProviderCryptoExtension =
+            new KeyAuthorizationKeyProvider(
+                keyProviderCryptoExtension, kmsAcls);
+      }
+        
       LOG.info("Initialized KeyProviderCryptoExtension "
           + keyProviderCryptoExtension);
       final int defaultBitlength = kmsConf
@@ -213,7 +220,7 @@ public class KMSWebApp implements ServletContextListener {
   @Override
   public void contextDestroyed(ServletContextEvent sce) {
     kmsAudit.shutdown();
-    acls.stopReloader();
+    kmsAcls.stopReloader();
     jmxReporter.stop();
     jmxReporter.close();
     metricRegistry = null;
@@ -225,7 +232,7 @@ public class KMSWebApp implements ServletContextListener {
   }
 
   public static KMSACLs getACLs() {
-    return acls;
+    return kmsAcls;
   }
 
   public static Meter getAdminCallsMeter() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
new file mode 100644
index 0000000..fe908e3
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KeyAuthorizationKeyProvider.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto.key.kms.server;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * A {@link KeyProvider} proxy that checks whether the current user derived via
+ * {@link UserGroupInformation}, is authorized to perform the following
+ * type of operations on a Key :
+ * <ol>
+ * <li>MANAGEMENT operations : createKey, rollNewVersion, deleteKey</li>
+ * <li>GENERATE_EEK operations : generateEncryptedKey, warmUpEncryptedKeys</li>
+ * <li>DECRYPT_EEK operation : decryptEncryptedKey</li>
+ * <li>READ operations : getKeyVersion, getKeyVersions, getMetadata,
+ * getKeysMetadata, getCurrentKey</li>
+ * </ol>
+ * The read operations (getCurrentKeyVersion / getMetadata) etc are not 
checked.
+ */
+public class KeyAuthorizationKeyProvider extends KeyProviderCryptoExtension {
+
+  public static final String KEY_ACL = "key.acl.";
+  private static final String KEY_ACL_NAME = KEY_ACL + "name";
+
+  public enum KeyOpType {
+    ALL, READ, MANAGEMENT, GENERATE_EEK, DECRYPT_EEK;
+  }
+
+  /**
+   * Interface that needs to be implemented by a client of the
+   * <code>KeyAuthorizationKeyProvider</code>.
+   */
+  public static interface KeyACLs {
+    
+    /**
+     * This is called by the KeyProvider to check if the given user is
+     * authorized to perform the specified operation on the given acl name.
+     * @param aclName name of the key ACL
+     * @param ugi User's UserGroupInformation
+     * @param opType Operation Type 
+     * @return true if user has access to the aclName and opType else false
+     */
+    public boolean hasAccessToKey(String aclName, UserGroupInformation ugi,
+        KeyOpType opType);
+
+    /**
+     * 
+     * @param aclName ACL name
+     * @param opType Operation Type
+     * @return true if AclName exists else false 
+     */
+    public boolean isACLPresent(String aclName, KeyOpType opType);
+  }
+
+  private final KeyProviderCryptoExtension provider;
+  private final KeyACLs acls;
+
+  /**
+   * The constructor takes a {@link KeyProviderCryptoExtension} and an
+   * implementation of <code>KeyACLs</code>. All calls are delegated to the
+   * provider keyProvider after authorization check (if required)
+   * @param keyProvider 
+   * @param acls
+   */
+  public KeyAuthorizationKeyProvider(KeyProviderCryptoExtension keyProvider,
+      KeyACLs acls) {
+    super(keyProvider, null);
+    this.provider = keyProvider;
+    this.acls = acls;
+  }
+
+  // This method first checks if "key.acl.name" attribute is present as an
+  // attribute in the provider Options. If yes, use the aclName for any
+  // subsequent access checks, else use the keyName as the aclName and set it
+  // as the value of the "key.acl.name" in the key's metadata.
+  private void authorizeCreateKey(String keyName, Options options,
+      UserGroupInformation ugi) throws IOException{
+    Preconditions.checkNotNull(ugi, "UserGroupInformation cannot be null");
+    Map<String, String> attributes = options.getAttributes();
+    String aclName = attributes.get(KEY_ACL_NAME);
+    boolean success = false;
+    if (Strings.isNullOrEmpty(aclName)) {
+      if (acls.isACLPresent(keyName, KeyOpType.MANAGEMENT)) {
+        options.setAttributes(ImmutableMap.<String, String> builder()
+            .putAll(attributes).put(KEY_ACL_NAME, keyName).build());
+        success =
+            acls.hasAccessToKey(keyName, ugi, KeyOpType.MANAGEMENT)
+                || acls.hasAccessToKey(keyName, ugi, KeyOpType.ALL);
+      } else {
+        success = false;
+      }
+    } else {
+      success = acls.isACLPresent(aclName, KeyOpType.MANAGEMENT) &&
+          (acls.hasAccessToKey(aclName, ugi, KeyOpType.MANAGEMENT)
+          || acls.hasAccessToKey(aclName, ugi, KeyOpType.ALL));
+    }
+    if (!success)
+      throw new AuthorizationException(String.format("User [%s] is not"
+          + " authorized to create key !!", ugi.getShortUserName()));
+  }
+
+  private void checkAccess(String aclName, UserGroupInformation ugi,
+      KeyOpType opType) throws AuthorizationException {
+    Preconditions.checkNotNull(aclName, "Key ACL name cannot be null");
+    Preconditions.checkNotNull(ugi, "UserGroupInformation cannot be null");
+    if (acls.isACLPresent(aclName, KeyOpType.MANAGEMENT) &&
+        (acls.hasAccessToKey(aclName, ugi, opType)
+            || acls.hasAccessToKey(aclName, ugi, KeyOpType.ALL))) {
+      return;
+    } else {
+      throw new AuthorizationException(String.format("User [%s] is not"
+          + " authorized to perform [%s] on key with ACL name [%s]!!",
+          ugi.getShortUserName(), opType, aclName));
+    }
+  }
+
+  @Override
+  public KeyVersion createKey(String name, Options options)
+      throws NoSuchAlgorithmException, IOException {
+    authorizeCreateKey(name, options, getUser());
+    return provider.createKey(name, options);
+  }
+
+  @Override
+  public KeyVersion createKey(String name, byte[] material, Options options)
+      throws IOException {
+    authorizeCreateKey(name, options, getUser());
+    return provider.createKey(name, material, options);
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name)
+      throws NoSuchAlgorithmException, IOException {
+    doAccessCheck(name, KeyOpType.MANAGEMENT);
+    return provider.rollNewVersion(name);
+  }
+
+  @Override
+  public void deleteKey(String name) throws IOException {
+    doAccessCheck(name, KeyOpType.MANAGEMENT);
+    provider.deleteKey(name);
+  }
+
+  @Override
+  public KeyVersion rollNewVersion(String name, byte[] material)
+      throws IOException {
+    doAccessCheck(name, KeyOpType.MANAGEMENT);
+    return provider.rollNewVersion(name, material);
+  }
+
+  @Override
+  public void warmUpEncryptedKeys(String... names) throws IOException {
+    for (String name : names) {
+      doAccessCheck(name, KeyOpType.GENERATE_EEK);
+    }
+    provider.warmUpEncryptedKeys(names);
+  }
+
+  @Override
+  public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
+      throws IOException, GeneralSecurityException {
+    doAccessCheck(encryptionKeyName, KeyOpType.GENERATE_EEK);
+    return provider.generateEncryptedKey(encryptionKeyName);
+  }
+
+  @Override
+  public KeyVersion decryptEncryptedKey(EncryptedKeyVersion 
encryptedKeyVersion)
+          throws IOException, GeneralSecurityException {
+    doAccessCheck(
+        encryptedKeyVersion.getEncryptionKeyName(), KeyOpType.DECRYPT_EEK);
+    return provider.decryptEncryptedKey(encryptedKeyVersion);
+  }
+
+  @Override
+  public KeyVersion getKeyVersion(String versionName) throws IOException {
+    KeyVersion keyVersion = provider.getKeyVersion(versionName);
+    if (keyVersion != null) {
+      doAccessCheck(keyVersion.getName(), KeyOpType.READ);
+    }
+    return keyVersion;
+  }
+
+  @Override
+  public List<String> getKeys() throws IOException {
+    return provider.getKeys();
+  }
+
+  @Override
+  public List<KeyVersion> getKeyVersions(String name) throws IOException {
+    doAccessCheck(name, KeyOpType.READ);
+    return provider.getKeyVersions(name);
+  }
+
+  @Override
+  public Metadata getMetadata(String name) throws IOException {
+    doAccessCheck(name, KeyOpType.READ);
+    return provider.getMetadata(name);
+  }
+
+  @Override
+  public Metadata[] getKeysMetadata(String... names) throws IOException {
+    for (String name : names) {
+      doAccessCheck(name, KeyOpType.READ);
+    }
+    return provider.getKeysMetadata(names);
+  }
+
+  @Override
+  public KeyVersion getCurrentKey(String name) throws IOException {
+    doAccessCheck(name, KeyOpType.READ);
+    return provider.getCurrentKey(name);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    provider.flush();
+  }
+
+  @Override
+  public boolean isTransient() {
+    return provider.isTransient();
+  }
+
+  private void doAccessCheck(String keyName, KeyOpType opType) throws
+      IOException {
+    Metadata metadata = provider.getMetadata(keyName);
+    if (metadata != null) {
+      String aclName = metadata.getAttributes().get(KEY_ACL_NAME);
+      checkAccess((aclName == null) ? keyName : aclName, getUser(), opType);
+    }
+  }
+
+  private UserGroupInformation getUser() throws IOException {
+    return UserGroupInformation.getCurrentUser();
+  }
+
+  @Override
+  protected KeyProvider getKeyProvider() {
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return provider.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm 
b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
index e947c9b..c76ca3b 100644
--- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
@@ -443,6 +443,112 @@ $ keytool -genkey -alias tomcat -keyalg RSA
 
 +---+
 
+*** Key Access Control
+
+  KMS supports access control for all non-read operations at the Key level.
+  All Key Access operations are classified as :
+
+  * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
+
+  * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
+
+  * DECRYPT_EEK - decryptEncryptedKey;
+
+  * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
+           getCurrentKey;
+
+  * ALL - all of the above;
+
+  These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows
+
+  For all keys for which a key access has not been explicitly configured, It
+  is possible to configure a default key access control for a subset of the
+  operation types.
+
+  If no ACL is configured for a specific key AND no default ACL is configured
+  for the requested operation, then access will be DENIED.
+  
+  <<NOTE:>> The default ACL does not support <<<ALL>>> operation qualifier.
+  
++---+
+  <property>
+    <name>key.acl.testKey1.MANAGEMENT</name>
+    <value>*</value>
+    <description>
+      ACL for create-key, deleteKey and rolloverNewVersion operations.
+    </description>
+  </property>
+
+  <property>
+    <name>key.acl.testKey2.GENERATE_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for generateEncryptedKey operations.
+    </description>
+  </property>
+
+  <property>
+    <name>key.acl.testKey3.DECRYPT_EEK</name>
+    <value>*</value>
+    <description>
+      ACL for decryptEncryptedKey operations.
+    </description>
+  </property>
+
+  <property>
+    <name>key.acl.testKey4.READ</name>
+    <value>*</value>
+    <description>
+      ACL for getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
+      getCurrentKey operations
+    </description>
+  </property>
+
+  <property>
+    <name>key.acl.testKey5.ALL</name>
+    <value>*</value>
+    <description>
+      ACL for ALL operations.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.MANAGEMENT</name>
+    <value>user1,user2</value>
+    <description>
+      default ACL for MANAGEMENT operations for all keys that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.GENERATE_EEK</name>
+    <value>user1,user2</value>
+    <description>
+      default ACL for GENERATE_EEK operations for all keys that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.DECRYPT_EEK</name>
+    <value>user1,user2</value>
+    <description>
+      default ACL for DECRYPT_EEK operations for all keys that are not
+      explicitly defined.
+    </description>
+  </property>
+
+  <property>
+    <name>default.key.acl.READ</name>
+    <value>user1,user2</value>
+    <description>
+      default ACL for READ operations for all keys that are not
+      explicitly defined.
+    </description>
+  </property>
++---+
+
 ** KMS Delegation Token Configuration
 
   KMS delegation token secret manager can be configured with the following

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
 
b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index 74eab5c..1ca0c0d 100644
--- 
a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ 
b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.crypto.key.kms.server;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProvider.Options;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
@@ -338,6 +340,13 @@ public class TestKMS {
     UserGroupInformation.setConfiguration(conf);
     File confDir = getTestDir();
     conf = createBaseKMSConf(confDir);
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.MANAGEMENT", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.READ", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k4.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k5.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k6.ALL", "*");
     writeConf(confDir, conf);
 
     runServer(null, null, confDir, new KMSCallable() {
@@ -492,10 +501,20 @@ public class TestKMS {
         options = new KeyProvider.Options(conf);
         options.setCipher("AES/CTR/NoPadding");
         options.setBitLength(128);
-        kp.createKey("k2", options);
+        KeyVersion kVer2 = kp.createKey("k2", options);
         KeyProvider.Metadata meta = kp.getMetadata("k2");
         Assert.assertNull(meta.getDescription());
-        Assert.assertTrue(meta.getAttributes().isEmpty());
+        Assert.assertEquals("k2", meta.getAttributes().get("key.acl.name"));
+
+        // test key ACL.. k2 is granted only MANAGEMENT Op access
+        try {
+          kpExt =
+              KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+          kpExt.generateEncryptedKey(kVer2.getName());
+          Assert.fail("User should not be allowed to encrypt !!");
+        } catch (Exception ex) {
+          // 
+        }
 
         // createKey() description, no tags
         options = new KeyProvider.Options(conf);
@@ -505,7 +524,7 @@ public class TestKMS {
         kp.createKey("k3", options);
         meta = kp.getMetadata("k3");
         Assert.assertEquals("d", meta.getDescription());
-        Assert.assertTrue(meta.getAttributes().isEmpty());
+        Assert.assertEquals("k3", meta.getAttributes().get("key.acl.name"));
 
         Map<String, String> attributes = new HashMap<String, String>();
         attributes.put("a", "A");
@@ -514,6 +533,7 @@ public class TestKMS {
         options = new KeyProvider.Options(conf);
         options.setCipher("AES/CTR/NoPadding");
         options.setBitLength(128);
+        attributes.put("key.acl.name", "k4");
         options.setAttributes(attributes);
         kp.createKey("k4", options);
         meta = kp.getMetadata("k4");
@@ -525,6 +545,7 @@ public class TestKMS {
         options.setCipher("AES/CTR/NoPadding");
         options.setBitLength(128);
         options.setDescription("d");
+        attributes.put("key.acl.name", "k5");
         options.setAttributes(attributes);
         kp.createKey("k5", options);
         meta = kp.getMetadata("k5");
@@ -565,6 +586,201 @@ public class TestKMS {
   }
 
   @Test
+  public void testKeyACLs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("hadoop.security.authentication", "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    final File testDir = getTestDir();
+    conf = createBaseKMSConf(testDir);
+    conf.set("hadoop.kms.authentication.type", "kerberos");
+    conf.set("hadoop.kms.authentication.kerberos.keytab",
+        keytab.getAbsolutePath());
+    conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
+    conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
+
+    for (KMSACLs.Type type : KMSACLs.Type.values()) {
+      conf.set(type.getAclConfigKey(), type.toString());
+    }
+    
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
+    
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
+    
conf.set(KMSACLs.Type.GENERATE_EEK.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
+    
conf.set(KMSACLs.Type.DECRYPT_EEK.getAclConfigKey(),"CREATE,ROLLOVER,GET,SET_KEY_MATERIAL,GENERATE_EEK");
+
+
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "test_key.MANAGEMENT", 
"CREATE");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "all_access.ALL", 
"GENERATE_EEK");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "all_access.DECRYPT_EEK", 
"ROLLOVER");
+    conf.set(KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + "MANAGEMENT", 
"ROLLOVER");
+
+    writeConf(testDir, conf);
+
+    runServer(null, null, testDir, new KMSCallable() {
+
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+        final URI uri = createKMSUri(getKMSUrl());
+
+        doAs("CREATE", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KeyProvider kp = new KMSClientProvider(uri, conf);
+            try {
+              Options options = new KeyProvider.Options(conf);
+              Map<String, String> attributes = options.getAttributes();
+              HashMap<String,String> newAttribs = new HashMap<String, 
String>(attributes);
+              newAttribs.put("key.acl.name", "test_key");
+              options.setAttributes(newAttribs);
+              KeyProvider.KeyVersion kv = kp.createKey("k0", options);
+              Assert.assertNull(kv.getMaterial());
+              KeyVersion rollVersion = kp.rollNewVersion("k0");
+              Assert.assertNull(rollVersion.getMaterial());
+              KeyProviderCryptoExtension kpce =
+                  
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+              try {
+                kpce.generateEncryptedKey("k0");
+                Assert.fail("User [CREATE] should not be allowed to 
generate_eek on k0");
+              } catch (Exception e) {
+                // Ignore
+              }
+              newAttribs = new HashMap<String, String>(attributes);
+              newAttribs.put("key.acl.name", "all_access");
+              options.setAttributes(newAttribs);
+              try {
+                kp.createKey("kx", options);
+                Assert.fail("User [CREATE] should not be allowed to create 
kx");
+              } catch (Exception e) {
+                // Ignore
+              }
+            } catch (Exception ex) {
+              Assert.fail(ex.getMessage());
+            }
+            return null;
+          }
+        });
+
+        doAs("ROLLOVER", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KeyProvider kp = new KMSClientProvider(uri, conf);
+            try {
+              Options options = new KeyProvider.Options(conf);
+              Map<String, String> attributes = options.getAttributes();
+              HashMap<String,String> newAttribs = new HashMap<String, 
String>(attributes);
+              newAttribs.put("key.acl.name", "test_key2");
+              options.setAttributes(newAttribs);
+              KeyProvider.KeyVersion kv = kp.createKey("k1", options);
+              Assert.assertNull(kv.getMaterial());
+              KeyVersion rollVersion = kp.rollNewVersion("k1");
+              Assert.assertNull(rollVersion.getMaterial());
+              try {
+                kp.rollNewVersion("k0");
+                Assert.fail("User [ROLLOVER] should not be allowed to rollover 
k0");
+              } catch (Exception e) {
+                // Ignore
+              }
+              KeyProviderCryptoExtension kpce =
+                  
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+              try {
+                kpce.generateEncryptedKey("k1");
+                Assert.fail("User [ROLLOVER] should not be allowed to 
generate_eek on k1");
+              } catch (Exception e) {
+                // Ignore
+              }
+              newAttribs = new HashMap<String, String>(attributes);
+              newAttribs.put("key.acl.name", "all_access");
+              options.setAttributes(newAttribs);
+              try {
+                kp.createKey("kx", options);
+                Assert.fail("User [ROLLOVER] should not be allowed to create 
kx");
+              } catch (Exception e) {
+                // Ignore
+              }
+            } catch (Exception ex) {
+              Assert.fail(ex.getMessage());
+            }
+            return null;
+          }
+        });
+
+        doAs("GET", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KeyProvider kp = new KMSClientProvider(uri, conf);
+            try {
+              Options options = new KeyProvider.Options(conf);
+              Map<String, String> attributes = options.getAttributes();
+              HashMap<String,String> newAttribs = new HashMap<String, 
String>(attributes);
+              newAttribs.put("key.acl.name", "test_key");
+              options.setAttributes(newAttribs);
+              try {
+                kp.createKey("k2", options);
+                Assert.fail("User [GET] should not be allowed to create 
key..");
+              } catch (Exception e) {
+                // Ignore
+              }
+              newAttribs = new HashMap<String, String>(attributes);
+              newAttribs.put("key.acl.name", "all_access");
+              options.setAttributes(newAttribs);
+              try {
+                kp.createKey("kx", options);
+                Assert.fail("User [GET] should not be allowed to create kx");
+              } catch (Exception e) {
+                // Ignore
+              }
+            } catch (Exception ex) {
+              Assert.fail(ex.getMessage());
+            }
+            return null;
+          }
+        });
+
+        final EncryptedKeyVersion ekv = doAs("GENERATE_EEK", new 
PrivilegedExceptionAction<EncryptedKeyVersion>() {
+          @Override
+          public EncryptedKeyVersion run() throws Exception {
+            KeyProvider kp = new KMSClientProvider(uri, conf);
+            try {
+              Options options = new KeyProvider.Options(conf);
+              Map<String, String> attributes = options.getAttributes();
+              HashMap<String,String> newAttribs = new HashMap<String, 
String>(attributes);
+              newAttribs.put("key.acl.name", "all_access");
+              options.setAttributes(newAttribs);
+              kp.createKey("kx", options);
+              KeyProviderCryptoExtension kpce =
+                  
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+              try {
+                return kpce.generateEncryptedKey("kx");
+              } catch (Exception e) {
+                Assert.fail("User [GENERATE_EEK] should be allowed to 
generate_eek on kx");
+              }
+            } catch (Exception ex) {
+              Assert.fail(ex.getMessage());
+            }
+            return null;
+          }
+        });
+
+        doAs("ROLLOVER", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            KeyProvider kp = new KMSClientProvider(uri, conf);
+            try {
+              KeyProviderCryptoExtension kpce =
+                  
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp);
+              kpce.decryptEncryptedKey(ekv);
+            } catch (Exception ex) {
+              Assert.fail(ex.getMessage());
+            }
+            return null;
+          }
+        });
+        return null;
+      }
+    });
+  }
+
+  @Test
   public void testACLs() throws Exception {
     Configuration conf = new Configuration();
     conf.set("hadoop.security.authentication", "kerberos");
@@ -586,6 +802,9 @@ public class TestKMS {
     conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
         KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
 
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
+
     writeConf(testDir, conf);
 
     runServer(null, null, testDir, new KMSCallable() {
@@ -891,6 +1110,9 @@ public class TestKMS {
     conf.set(KMSACLs.Type.DECRYPT_EEK.getAclConfigKey(), 
"client,hdfs,otheradmin");
     conf.set(KMSACLs.Type.DECRYPT_EEK.getBlacklistConfigKey(), 
"hdfs,otheradmin");
 
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "ck0.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "ck1.ALL", "*");
+
     writeConf(testDir, conf);
 
     runServer(null, null, testDir, new KMSCallable() {
@@ -973,6 +1195,7 @@ public class TestKMS {
       conf.set(type.getAclConfigKey(), " ");
     }
     conf.set(KMSACLs.Type.CREATE.getAclConfigKey(), "client");
+    conf.set(KMSConfiguration.DEFAULT_KEY_ACL_PREFIX + "MANAGEMENT", 
"client,client/host");
 
     writeConf(testDir, conf);
 
@@ -1096,6 +1319,9 @@ public class TestKMS {
     conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
     conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
 
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kA.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kD.ALL", "*");
+
     writeConf(testDir, conf);
 
     runServer(null, null, testDir, new KMSCallable() {
@@ -1164,6 +1390,10 @@ public class TestKMS {
     conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
     conf.set("hadoop.kms.proxyuser.client.users", "foo");
     conf.set("hadoop.kms.proxyuser.client.hosts", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kAA.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kBB.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "kCC.ALL", "*");
+
     writeConf(testDir, conf);
 
     runServer(null, null, testDir, new KMSCallable() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b02a4b40/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKeyAuthorizationKeyProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKeyAuthorizationKeyProvider.java
 
b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKeyAuthorizationKeyProvider.java
new file mode 100644
index 0000000..a79926a
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKeyAuthorizationKeyProvider.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms.server;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProvider.Options;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.crypto.key.UserProvider;
+import 
org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyACLs;
+import 
org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider.KeyOpType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestKeyAuthorizationKeyProvider {
+
+  private static final String CIPHER = "AES";
+
+  @Test
+  public void testCreateKey() throws Exception {
+    final Configuration conf = new Configuration();
+    KeyProvider kp = 
+        new UserProvider.Factory().createProvider(new URI("user:///"), conf);
+    KeyACLs mock = mock(KeyACLs.class);
+    when(mock.isACLPresent("foo", KeyOpType.MANAGEMENT)).thenReturn(true);
+    UserGroupInformation u1 = UserGroupInformation.createRemoteUser("u1");
+    when(mock.hasAccessToKey("foo", u1, 
KeyOpType.MANAGEMENT)).thenReturn(true);
+    final KeyProviderCryptoExtension kpExt =
+        new KeyAuthorizationKeyProvider(
+            KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp),
+            mock);
+
+    u1.doAs(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              kpExt.createKey("foo", SecureRandom.getSeed(16),
+                  newOptions(conf));
+            } catch (IOException ioe) {
+              Assert.fail("User should be Authorized !!");
+            }
+
+            // "bar" key not configured
+            try {
+              kpExt.createKey("bar", SecureRandom.getSeed(16),
+                  newOptions(conf));
+              Assert.fail("User should NOT be Authorized !!");
+            } catch (IOException ioe) {
+              // Ignore
+            }
+            return null;
+          }
+        }
+        );
+
+    // Unauthorized User
+    UserGroupInformation.createRemoteUser("badGuy").doAs(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            try {
+              kpExt.createKey("foo", SecureRandom.getSeed(16),
+                  newOptions(conf));
+              Assert.fail("User should NOT be Authorized !!");
+            } catch (IOException ioe) {
+              // Ignore
+            }
+            return null;
+          }
+        }
+        );
+  }
+
+  @Test
+  public void testOpsWhenACLAttributeExists() throws Exception {
+    final Configuration conf = new Configuration();
+    KeyProvider kp = 
+        new UserProvider.Factory().createProvider(new URI("user:///"), conf);
+    KeyACLs mock = mock(KeyACLs.class);
+    when(mock.isACLPresent("testKey", KeyOpType.MANAGEMENT)).thenReturn(true);
+    when(mock.isACLPresent("testKey", 
KeyOpType.GENERATE_EEK)).thenReturn(true);
+    when(mock.isACLPresent("testKey", KeyOpType.DECRYPT_EEK)).thenReturn(true);
+    when(mock.isACLPresent("testKey", KeyOpType.ALL)).thenReturn(true);
+    UserGroupInformation u1 = UserGroupInformation.createRemoteUser("u1");
+    UserGroupInformation u2 = UserGroupInformation.createRemoteUser("u2");
+    UserGroupInformation u3 = UserGroupInformation.createRemoteUser("u3");
+    UserGroupInformation sudo = UserGroupInformation.createRemoteUser("sudo");
+    when(mock.hasAccessToKey("testKey", u1, 
KeyOpType.MANAGEMENT)).thenReturn(true);
+    when(mock.hasAccessToKey("testKey", u2, 
KeyOpType.GENERATE_EEK)).thenReturn(true);
+    when(mock.hasAccessToKey("testKey", u3, 
KeyOpType.DECRYPT_EEK)).thenReturn(true);
+    when(mock.hasAccessToKey("testKey", sudo, KeyOpType.ALL)).thenReturn(true);
+    final KeyProviderCryptoExtension kpExt =
+        new KeyAuthorizationKeyProvider(
+            KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp),
+            mock);
+
+    final KeyVersion barKv = u1.doAs(
+        new PrivilegedExceptionAction<KeyVersion>() {
+          @Override
+          public KeyVersion run() throws Exception {
+            Options opt = newOptions(conf);
+            Map<String, String> m = new HashMap<String, String>();
+            m.put("key.acl.name", "testKey");
+            opt.setAttributes(m);
+            try {
+              KeyVersion kv = 
+                  kpExt.createKey("foo", SecureRandom.getSeed(16), opt);
+              kpExt.rollNewVersion(kv.getName());
+              kpExt.rollNewVersion(kv.getName(), SecureRandom.getSeed(16));
+              kpExt.deleteKey(kv.getName());
+            } catch (IOException ioe) {
+              Assert.fail("User should be Authorized !!");
+            }
+
+            KeyVersion retkv = null;
+            try {
+              retkv = kpExt.createKey("bar", SecureRandom.getSeed(16), opt);
+              kpExt.generateEncryptedKey(retkv.getName());
+              Assert.fail("User should NOT be Authorized to generate EEK !!");
+            } catch (IOException ioe) {
+            }
+            Assert.assertNotNull(retkv);
+            return retkv;
+          }
+        }
+        );
+
+    final EncryptedKeyVersion barEKv =
+        u2.doAs(
+            new PrivilegedExceptionAction<EncryptedKeyVersion>() {
+              @Override
+              public EncryptedKeyVersion run() throws Exception {
+                try {
+                  kpExt.deleteKey(barKv.getName());
+                  Assert.fail("User should NOT be Authorized to "
+                      + "perform any other operation !!");
+                } catch (IOException ioe) {
+                }
+                return kpExt.generateEncryptedKey(barKv.getName());
+              }
+            });
+
+    u3.doAs(
+        new PrivilegedExceptionAction<KeyVersion>() {
+          @Override
+          public KeyVersion run() throws Exception {
+            try {
+              kpExt.deleteKey(barKv.getName());
+              Assert.fail("User should NOT be Authorized to "
+                  + "perform any other operation !!");
+            } catch (IOException ioe) {
+            }
+            return kpExt.decryptEncryptedKey(barEKv);
+          }
+        });
+
+    sudo.doAs(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            Options opt = newOptions(conf);
+            Map<String, String> m = new HashMap<String, String>();
+            m.put("key.acl.name", "testKey");
+            opt.setAttributes(m);
+            try {
+              KeyVersion kv = 
+                  kpExt.createKey("foo", SecureRandom.getSeed(16), opt);
+              kpExt.rollNewVersion(kv.getName());
+              kpExt.rollNewVersion(kv.getName(), SecureRandom.getSeed(16));
+              EncryptedKeyVersion ekv = 
kpExt.generateEncryptedKey(kv.getName());
+              kpExt.decryptEncryptedKey(ekv);
+              kpExt.deleteKey(kv.getName());
+            } catch (IOException ioe) {
+              Assert.fail("User should be Allowed to do everything !!");
+            }
+            return null;
+          }
+        }
+        );
+  }
+
+  private static KeyProvider.Options newOptions(Configuration conf) {
+    KeyProvider.Options options = new KeyProvider.Options(conf);
+    options.setCipher(CIPHER);
+    options.setBitLength(128);
+    return options;
+  }
+
+}

Reply via email to