This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 2616503  AMBARI-25443. Create principal/keytab operation with multi 
threaded (dgrinenko)
2616503 is described below

commit 26165032d50ff1059229424bbe3c16b36b77ce4e
Author: Dmitry Grinenko <m...@nxa.io>
AuthorDate: Mon Dec 9 09:18:29 2019 +0200

    AMBARI-25443. Create principal/keytab operation with multi threaded 
(dgrinenko)
---
 .../ambari/server/configuration/Configuration.java |  34 ++++
 .../server/controller/KerberosHelperImpl.java      |   3 +-
 .../HostKerberosIdentityResourceProvider.java      |   8 +-
 .../ConfigureAmbariIdentitiesServerAction.java     |  41 +++--
 .../kerberos/CreateKeytabFilesServerAction.java    | 203 +++++++++++----------
 .../kerberos/CreatePrincipalsServerAction.java     |  35 +++-
 .../kerberos/DestroyPrincipalsServerAction.java    |  98 +++++-----
 .../kerberos/FinalizeKerberosServerAction.java     |  18 +-
 .../kerberos/KerberosServerAction.java             |  81 ++++++--
 .../HostKerberosIdentityResourceProviderTest.java  |  18 +-
 .../AbstractPrepareKerberosServerActionTest.java   |  25 +--
 .../ConfigureAmbariIdentitiesServerActionTest.java |   2 +
 .../kerberos/FinalizeKerberosServerActionTest.java |   2 +
 .../server/view/HttpImpersonatorImplTest.java      |   1 -
 14 files changed, 380 insertions(+), 189 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index f553905..67c6967 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -2736,6 +2736,20 @@ public class Configuration {
   public static final ConfigurationProperty<Integer> TLS_EPHEMERAL_DH_KEY_SIZE 
= new ConfigurationProperty<>(
     "security.server.tls.ephemeral_dh_key_size", 2048);
 
+  /**
+   * The number of threads to use when executing server-side Kerberos 
commands, such as generate keytabs.
+   */
+  @Markdown(description = "The number of threads to use when executing 
server-side Kerberos commands, such as generate keytabs.")
+  public static final ConfigurationProperty<Integer> 
KERBEROS_SERVER_ACTION_THREADPOOL_SIZE = new ConfigurationProperty<>(
+    "server.kerberos.action.threadpool.size", 1);
+
+  /**
+   * The timeout, in seconds, when finalizing Kerberos 
enable/disable/regenerate commands.
+   */
+  @Markdown(description = "The timeout, in seconds, when finalizing Kerberos 
enable/disable/regenerate commands.")
+  public static final ConfigurationProperty<Integer> 
KERBEROS_SERVER_ACTION_FINALIZE_SECONDS = new ConfigurationProperty<>(
+    "server.kerberos.finalize.timeout", 600);
+
   private static final Logger LOG = LoggerFactory.getLogger(
     Configuration.class);
 
@@ -5619,6 +5633,26 @@ public class Configuration {
   }
 
   /**
+   * Gets the number of threads to use when executing server-side Kerberos
+   * commands, such as generate keytabs.
+   *
+   * @return the threadpool size, defaulting to 1
+   */
+  public int getKerberosServerActionThreadpoolSize() {
+    return 
Integer.parseInt(getProperty(KERBEROS_SERVER_ACTION_THREADPOOL_SIZE));
+  }
+
+  /**
+   * Get the timeout, in seconds, when finalizing Kerberos
+   * enable/disable/regenerate commands.
+   *
+   * @return the timeout, in seconds, defaulting to 600.
+   */
+  public int getKerberosServerActionFinalizeTimeout() {
+    return 
Integer.parseInt(getProperty(KERBEROS_SERVER_ACTION_FINALIZE_SECONDS));
+  }
+
+  /**
    * Generates a markdown table which includes:
    * <ul>
    * <li>Property key name</li>
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
index 4402d4e..0e3560b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/KerberosHelperImpl.java
@@ -3637,6 +3637,7 @@ public class KerberosHelperImpl implements KerberosHelper 
{
         commandParameters.put(KerberosServerAction.DATA_DIRECTORY, 
dataDirectory.getAbsolutePath());
       }
 
+      int timeout = configuration.getKerberosServerActionFinalizeTimeout();
       Stage stage = 
createServerActionStage(requestStageContainer.getLastStageId(),
           cluster,
           requestStageContainer.getId(),
@@ -3646,7 +3647,7 @@ public class KerberosHelperImpl implements KerberosHelper 
{
           FinalizeKerberosServerAction.class,
           event,
           commandParameters,
-          "Finalize Operations", 300);
+          "Finalize Operations", timeout);
 
       RoleGraph roleGraph = roleGraphFactory.createNew(roleCommandOrder);
       roleGraph.build(stage);
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProvider.java
index 5ed5f35..ef25b39 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProvider.java
@@ -34,6 +34,9 @@ import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalDAO;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosIdentityDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosKeytabDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosPrincipalDescriptor;
@@ -158,10 +161,13 @@ public class HostKerberosIdentityResourceProvider extends 
ReadOnlyResourceProvid
       for (Map<String, Object> propertyMap : propertyMaps) {
         String clusterName = (String) 
propertyMap.get(KERBEROS_IDENTITY_CLUSTER_NAME_PROPERTY_ID);
         String hostName = (String) 
propertyMap.get(KERBEROS_IDENTITY_HOST_NAME_PROPERTY_ID);
+        Clusters clusters = getManagementController().getClusters();
+        Cluster cluster = clusters.getCluster(clusterName);
+        KerberosDescriptor kerberosDescriptor = 
kerberosHelper.getKerberosDescriptor(cluster, false);
 
         // Retrieve the active identities for the cluster filtered and grouped 
by hostname
         Map<String, Collection<KerberosIdentityDescriptor>> hostDescriptors =
-            kerberosHelper.getActiveIdentities(clusterName, hostName, null, 
null, true);
+            kerberosHelper.getActiveIdentities(clusterName, hostName, null, 
null,true);
 
         if (hostDescriptors != null) {
           for (Map.Entry<String, Collection<KerberosIdentityDescriptor>> entry 
: hostDescriptors.entrySet()) {
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
index 2c660fd..63a1d55 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerAction.java
@@ -22,8 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
-import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
@@ -40,6 +40,9 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+
 /**
  * ConfigureAmbariIdentitiesServerAction is a ServerAction implementation that 
creates keytab files as
  * instructed.
@@ -65,6 +68,11 @@ public class ConfigureAmbariIdentitiesServerAction extends 
KerberosServerAction
   private HostDAO hostDAO;
 
   /**
+   * Used to prevent multiple threads from working with the same keytab.
+   */
+  private Striped<Lock> m_locksByKeytab = Striped.lazyWeakLock(25);
+
+  /**
    * Called to execute this action.  Upon invocation, calls
    * {@link KerberosServerAction#processIdentities(Map)} )}
    * to iterate through the Kerberos identity metadata and call
@@ -118,29 +126,34 @@ public class ConfigureAmbariIdentitiesServerAction 
extends KerberosServerAction
         LOG.error(message);
         commandReport = createCommandReport(1, HostRoleStatus.FAILED, "{}", 
actionLog.getStdOut(), actionLog.getStdErr());
       } else {
-
         String hostName = 
identityRecord.get(KerberosIdentityDataFileReader.HOSTNAME);
         if (hostName != null && 
hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
           String destKeytabFilePath = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
           File hostDirectory = new File(dataDirectory, hostName);
           File srcKeytabFile = new File(hostDirectory, 
DigestUtils.sha1Hex(destKeytabFilePath));
 
-          if (srcKeytabFile.exists()) {
-            String ownerAccess = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS);
-            boolean ownerWritable = "w".equalsIgnoreCase(ownerAccess) || 
"rw".equalsIgnoreCase(ownerAccess);
-            boolean ownerReadable = "r".equalsIgnoreCase(ownerAccess) || 
"rw".equalsIgnoreCase(ownerAccess);
-            String groupAccess = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS);
-            boolean groupWritable = "w".equalsIgnoreCase(groupAccess) || 
"rw".equalsIgnoreCase(groupAccess);
-            boolean groupReadable = "r".equalsIgnoreCase(groupAccess) || 
"rw".equalsIgnoreCase(groupAccess);
-
-            installAmbariServerIdentity(evaluatedPrincipal, 
srcKeytabFile.getAbsolutePath(), destKeytabFilePath,
+          Lock lock = m_locksByKeytab.get(destKeytabFilePath);
+          lock.lock();
+          try {
+            if (srcKeytabFile.exists()) {
+              String ownerAccess = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS);
+              boolean ownerWritable = "w".equalsIgnoreCase(ownerAccess) || 
"rw".equalsIgnoreCase(ownerAccess);
+              boolean ownerReadable = "r".equalsIgnoreCase(ownerAccess) || 
"rw".equalsIgnoreCase(ownerAccess);
+              String groupAccess = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS);
+              boolean groupWritable = "w".equalsIgnoreCase(groupAccess) || 
"rw".equalsIgnoreCase(groupAccess);
+              boolean groupReadable = "r".equalsIgnoreCase(groupAccess) || 
"rw".equalsIgnoreCase(groupAccess);
+
+              installAmbariServerIdentity(evaluatedPrincipal, 
srcKeytabFile.getAbsolutePath(), destKeytabFilePath,
                 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME), 
ownerReadable, ownerWritable,
                 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME), 
groupReadable, groupWritable, actionLog);
 
-            if 
("AMBARI_SERVER_SELF".equals(identityRecord.get(KerberosIdentityDataFileReader.COMPONENT)))
 {
-              // Create/update the JAASFile...
-              configureJAAS(evaluatedPrincipal, destKeytabFilePath, actionLog);
+              if 
("AMBARI_SERVER_SELF".equals(identityRecord.get(KerberosIdentityDataFileReader.COMPONENT)))
 {
+                // Create/update the JAASFile...
+                configureJAAS(evaluatedPrincipal, destKeytabFilePath, 
actionLog);
+              }
             }
+          } finally {
+            lock.unlock();
           }
         }
       }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreateKeytabFilesServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreateKeytabFilesServerAction.java
index d2488b6..836beaa 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreateKeytabFilesServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreateKeytabFilesServerAction.java
@@ -18,7 +18,10 @@
 
 package org.apache.ambari.server.serveraction.kerberos;
 
+import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
@@ -42,7 +45,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
 /**
  * CreateKeytabFilesServerAction is a ServerAction implementation that creates 
keytab files as
@@ -82,10 +87,15 @@ public class CreateKeytabFilesServerAction extends 
KerberosServerAction {
   private HostDAO hostDAO;
 
   /**
+   * Used to prevent multiple threads from working with the same keytab.
+   */
+  private Striped<Lock> m_locksByKeytab = Striped.lazyWeakLock(25);
+
+  /**
    * A map of data used to track what has been processed in order to optimize 
the creation of keytabs
    * such as knowing when to create a cached keytab file or use a cached 
keytab file.
    */
-  Map<String, Set<String>> visitedIdentities = new HashMap<String, 
Set<String>>();
+  Map<String, Set<String>> visitedIdentities = new ConcurrentHashMap<>();
 
   /**
    * Called to execute this action.  Upon invocation, calls
@@ -180,118 +190,125 @@ public class CreateKeytabFilesServerAction extends 
KerberosServerAction {
           String keytabFilePath = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
 
           if ((hostName != null) && !hostName.isEmpty() && (keytabFilePath != 
null) && !keytabFilePath.isEmpty()) {
-            Set<String> visitedPrincipalKeys = 
visitedIdentities.get(evaluatedPrincipal);
-            String visitationKey = String.format("%s|%s", hostName, 
keytabFilePath);
-
-            if ((visitedPrincipalKeys == null) || 
!visitedPrincipalKeys.contains(visitationKey)) {
-              // Look up the current evaluatedPrincipal's password.
-              // If found create the keytab file, else try to find it in the 
cache.
-              String password = principalPasswordMap.get(evaluatedPrincipal);
-              Integer keyNumber = 
principalKeyNumberMap.get(evaluatedPrincipal);
-
-              message = String.format("Creating keytab file for %s on host 
%s", evaluatedPrincipal, hostName);
-              LOG.info(message);
-              actionLog.writeStdOut(message);
-              
auditEventBuilder.withPrincipal(evaluatedPrincipal).withHostName(hostName).withKeyTabFilePath(keytabFilePath);
-
-              // Determine where to store the keytab file.  It should go into 
a host-specific
-              // directory under the previously determined data directory.
-              File hostDirectory = new File(dataDirectory, hostName);
-
-              // Ensure the host directory exists...
-              if (!hostDirectory.exists() && hostDirectory.mkdirs()) {
-                // Make sure only Ambari has access to this directory.
-                ensureAmbariOnlyAccess(hostDirectory);
-              }
+            Lock lock = m_locksByKeytab.get(keytabFilePath);
+            lock.lock();
+
+            try {
+              Set<String> visitedPrincipalKeys = 
visitedIdentities.get(evaluatedPrincipal);
+              String visitationKey = String.format("%s|%s", hostName, 
keytabFilePath);
+
+              if ((visitedPrincipalKeys == null) || 
!visitedPrincipalKeys.contains(visitationKey)) {
+                // Look up the current evaluatedPrincipal's password.
+                // If found create the keytab file, else try to find it in the 
cache.
+                String password = principalPasswordMap.get(evaluatedPrincipal);
+                Integer keyNumber = 
principalKeyNumberMap.get(evaluatedPrincipal);
+
+                message = String.format("Creating keytab file for %s on host 
%s", evaluatedPrincipal, hostName);
+                LOG.info(message);
+                actionLog.writeStdOut(message);
+                
auditEventBuilder.withPrincipal(evaluatedPrincipal).withHostName(hostName).withKeyTabFilePath(keytabFilePath);
+
+                // Determine where to store the keytab file.  It should go 
into a host-specific
+                // directory under the previously determined data directory.
+                File hostDirectory = new File(dataDirectory, hostName);
+
+                // Ensure the host directory exists...
+                if (!hostDirectory.exists() && hostDirectory.mkdirs()) {
+                  // Make sure only Ambari has access to this directory.
+                  ensureAmbariOnlyAccess(hostDirectory);
+                }
 
-            if (hostDirectory.exists()) {
-              File destinationKeytabFile = new File(hostDirectory, 
DigestUtils.sha1Hex(keytabFilePath));
-              HostEntity hostEntity = hostDAO.findByName(hostName);
-              // in case of ambari-server identity there's no host entity for 
ambari_server host
-              if (hostEntity == null && 
!hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
-                message = "Failed to find HostEntity for hostname = " + 
hostName;
-                actionLog.writeStdErr(message);
-                LOG.error(message);
-                commandReport = createCommandReport(1, HostRoleStatus.FAILED, 
"{}", actionLog.getStdOut(), actionLog.getStdErr());
-                return commandReport;
-              }
+                if (hostDirectory.exists()) {
+                  File destinationKeytabFile = new File(hostDirectory, 
DigestUtils.sha1Hex(keytabFilePath));
+                  HostEntity hostEntity = hostDAO.findByName(hostName);
+                  // in case of ambari-server identity there's no host entity 
for ambari_server host
+                  if (hostEntity == null && 
!hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
+                    message = "Failed to find HostEntity for hostname = " + 
hostName;
+                    actionLog.writeStdErr(message);
+                    LOG.error(message);
+                    commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
+                    return commandReport;
+                  }
 
-              boolean canCache = 
"true".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_IS_CACHABLE));
-              boolean regenerateKeytabs = 
"true".equalsIgnoreCase(getCommandParameterValue(getCommandParameters(), 
REGENERATE_ALL));
-              boolean onlyKeytabWrite = 
"true".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.ONLY_KEYTAB_WRITE));
-              boolean grabKeytabFromCache = regenerateKeytabs && 
onlyKeytabWrite;
+                  boolean canCache = 
"true".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_IS_CACHABLE));
+                  boolean regenerateKeytabs = 
"true".equalsIgnoreCase(getCommandParameterValue(getCommandParameters(), 
REGENERATE_ALL));
+                  boolean onlyKeytabWrite = 
"true".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.ONLY_KEYTAB_WRITE));
+                  boolean grabKeytabFromCache = regenerateKeytabs && 
onlyKeytabWrite;
 
 
-              if (password == null) { // if canCache=true we will try to get 
keytab from cache and send to agent.
-                if (!grabKeytabFromCache && 
(hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME) || 
kerberosPrincipalHostDAO
-                  .exists(evaluatedPrincipal, hostEntity.getHostId()))) {
-                  // There is nothing to do for this since it must already 
exist and we don't want to
-                  // regenerate the keytab
-                  message = String.format("Skipping keytab file for %s, 
missing password indicates nothing to do", evaluatedPrincipal);
-                  LOG.debug(message);
-                } else {
-                  KerberosPrincipalEntity principalEntity = 
kerberosPrincipalDAO.find(evaluatedPrincipal);
-                  String cachedKeytabPath = (principalEntity == null) ? null : 
principalEntity.getCachedKeytabPath();
-
-                    if (cachedKeytabPath == null) {
-                      message = String.format("Failed to create keytab for %s, 
missing cached file", evaluatedPrincipal);
-                      actionLog.writeStdErr(message);
-                      LOG.error(message);
-                      commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
+                  if (password == null) { // if canCache=true we will try to 
get keytab from cache and send to agent.
+                    if (!grabKeytabFromCache && 
(hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME) || 
kerberosPrincipalHostDAO
+                      .exists(evaluatedPrincipal, hostEntity.getHostId()))) {
+                      // There is nothing to do for this since it must already 
exist and we don't want to
+                      // regenerate the keytab
+                      message = String.format("Skipping keytab file for %s, 
missing password indicates nothing to do", evaluatedPrincipal);
+                      LOG.debug(message);
                     } else {
-                      try {
-                        operationHandler.createKeytabFile(new 
File(cachedKeytabPath), destinationKeytabFile);
-                      } catch (KerberosOperationException e) {
-                        message = String.format("Failed to create keytab file 
for %s - %s", evaluatedPrincipal, e.getMessage());
+                      KerberosPrincipalEntity principalEntity = 
kerberosPrincipalDAO.find(evaluatedPrincipal);
+                      String cachedKeytabPath = (principalEntity == null) ? 
null : principalEntity.getCachedKeytabPath();
+
+                      if (cachedKeytabPath == null) {
+                        message = String.format("Failed to create keytab for 
%s, missing cached file", evaluatedPrincipal);
                         actionLog.writeStdErr(message);
-                        LOG.error(message, e);
+                        LOG.error(message);
                         commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
+                      } else {
+                        try {
+                          operationHandler.createKeytabFile(new 
File(cachedKeytabPath), destinationKeytabFile);
+                        } catch (KerberosOperationException e) {
+                          message = String.format("Failed to create keytab 
file for %s - %s", evaluatedPrincipal, e.getMessage());
+                          actionLog.writeStdErr(message);
+                          LOG.error(message, e);
+                          commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
+                        }
                       }
                     }
-                  }
-                } else {
-                  Keytab keytab = createKeytab(evaluatedPrincipal, password, 
keyNumber, operationHandler, visitedPrincipalKeys != null, canCache, actionLog);
-
-                  if (keytab != null) {
-                    try {
-                      if (operationHandler.createKeytabFile(keytab, 
destinationKeytabFile)) {
-                        ensureAmbariOnlyAccess(destinationKeytabFile);
+                  } else {
+                    Keytab keytab = createKeytab(evaluatedPrincipal, password, 
keyNumber, operationHandler, visitedPrincipalKeys != null, canCache, actionLog);
 
-                        message = String.format("Successfully created keytab 
file for %s at %s", evaluatedPrincipal, 
destinationKeytabFile.getAbsolutePath());
-                        LOG.debug(message);
-                        
auditEventBuilder.withPrincipal(evaluatedPrincipal).withHostName(hostName).withKeyTabFilePath(destinationKeytabFile.getAbsolutePath());
-                      } else {
-                        message = String.format("Failed to create keytab file 
for %s at %s", evaluatedPrincipal, destinationKeytabFile.getAbsolutePath());
+                    if (keytab != null) {
+                      try {
+                        if (operationHandler.createKeytabFile(keytab, 
destinationKeytabFile)) {
+                          ensureAmbariOnlyAccess(destinationKeytabFile);
+
+                          message = String.format("Successfully created keytab 
file for %s at %s", evaluatedPrincipal, 
destinationKeytabFile.getAbsolutePath());
+                          LOG.debug(message);
+                          
auditEventBuilder.withPrincipal(evaluatedPrincipal).withHostName(hostName).withKeyTabFilePath(destinationKeytabFile.getAbsolutePath());
+                        } else {
+                          message = String.format("Failed to create keytab 
file for %s at %s", evaluatedPrincipal, 
destinationKeytabFile.getAbsolutePath());
+                          actionLog.writeStdErr(message);
+                          LOG.error(message);
+                          commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
+                        }
+                      } catch (KerberosOperationException e) {
+                        message = String.format("Failed to create keytab file 
for %s - %s", evaluatedPrincipal, e.getMessage());
                         actionLog.writeStdErr(message);
-                        LOG.error(message);
+                        LOG.error(message, e);
                         commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
                       }
-                    } catch (KerberosOperationException e) {
-                      message = String.format("Failed to create keytab file 
for %s - %s", evaluatedPrincipal, e.getMessage());
-                      actionLog.writeStdErr(message);
-                      LOG.error(message, e);
+                    } else {
                       commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
                     }
-                  } else {
-                    commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
-                  }
 
-                  if (visitedPrincipalKeys == null) {
-                    visitedPrincipalKeys = new HashSet<String>();
-                    visitedIdentities.put(evaluatedPrincipal, 
visitedPrincipalKeys);
-                  }
+                    if (visitedPrincipalKeys == null) {
+                      visitedPrincipalKeys = new HashSet<String>();
+                      visitedIdentities.put(evaluatedPrincipal, 
visitedPrincipalKeys);
+                    }
 
-                  visitedPrincipalKeys.add(visitationKey);
+                    visitedPrincipalKeys.add(visitationKey);
+                  }
+                } else {
+                  message = String.format("Failed to create keytab file for 
%s, the container directory does not exist: %s",
+                    evaluatedPrincipal, hostDirectory.getAbsolutePath());
+                  actionLog.writeStdErr(message);
+                  LOG.error(message);
+                  commandReport = createCommandReport(1, 
HostRoleStatus.FAILED, "{}", actionLog.getStdOut(), actionLog.getStdErr());
                 }
               } else {
-                message = String.format("Failed to create keytab file for %s, 
the container directory does not exist: %s",
-                  evaluatedPrincipal, hostDirectory.getAbsolutePath());
-                actionLog.writeStdErr(message);
-                LOG.error(message);
-                commandReport = createCommandReport(1, HostRoleStatus.FAILED, 
"{}", actionLog.getStdOut(), actionLog.getStdErr());
+                LOG.debug(String.format("Skipping previously processed keytab 
for %s on host %s", evaluatedPrincipal, hostName));
               }
-            } else {
-              LOG.debug(String.format("Skipping previously processed keytab 
for %s on host %s", evaluatedPrincipal, hostName));
+            } finally {
+              lock.unlock();
             }
           }
         }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreatePrincipalsServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreatePrincipalsServerAction.java
index 3825602..38e4832 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreatePrincipalsServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/CreatePrincipalsServerAction.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.serveraction.kerberos;
 
+import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
@@ -29,6 +30,7 @@ import 
org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
 import org.apache.ambari.server.security.SecurePasswordHelper;
 import org.apache.ambari.server.serveraction.ActionLog;
 import org.apache.commons.lang.StringUtils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
 /**
  * CreatePrincipalsServerAction is a ServerAction implementation that creates 
principals as instructed.
@@ -61,6 +64,11 @@ public class CreatePrincipalsServerAction extends 
KerberosServerAction {
   private KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
 
   /**
+   * Used to prevent multiple threads from working with the same principal.
+   */
+  private Striped<Lock> locksByPrincipal = Striped.lazyWeakLock(25);
+
+  /**
    * SecurePasswordHelper used to generate secure passwords for newly created 
principals
    */
   @Inject
@@ -70,7 +78,7 @@ public class CreatePrincipalsServerAction extends 
KerberosServerAction {
    * A set of visited principal names used to prevent unnecessary processing 
on already processed
    * principal names
    */
-  private Set<String> seenPrincipals = new HashSet<String>();
+  private Set<String> seenPrincipals = new ConcurrentHashSet<>();
 
   /**
    * Called to execute this action.  Upon invocation, calls
@@ -154,20 +162,27 @@ public class CreatePrincipalsServerAction extends 
KerberosServerAction {
       if (processPrincipal) {
         Map<String, String> principalPasswordMap = 
getPrincipalPasswordMap(requestSharedDataContext);
 
+        Lock lock = locksByPrincipal.get(evaluatedPrincipal);
+        lock.lock();
+
         String password = principalPasswordMap.get(evaluatedPrincipal);
 
-        if (password == null) {
-          boolean servicePrincipal = 
"service".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.PRINCIPAL_TYPE));
-          CreatePrincipalResult result = createPrincipal(evaluatedPrincipal, 
servicePrincipal, kerberosConfiguration, operationHandler, regenerateKeytabs, 
actionLog);
+        try {
+          if (password == null) {
+            boolean servicePrincipal = 
"service".equalsIgnoreCase(identityRecord.get(KerberosIdentityDataFileReader.PRINCIPAL_TYPE));
+            CreatePrincipalResult result = createPrincipal(evaluatedPrincipal, 
servicePrincipal, kerberosConfiguration, operationHandler, regenerateKeytabs, 
actionLog);
 
-          if (result == null) {
-            commandReport = createCommandReport(1, HostRoleStatus.FAILED, 
"{}", actionLog.getStdOut(), actionLog.getStdErr());
-          } else {
-            Map<String, Integer> principalKeyNumberMap = 
getPrincipalKeyNumberMap(requestSharedDataContext);
+            if (result == null) {
+              commandReport = createCommandReport(1, HostRoleStatus.FAILED, 
"{}", actionLog.getStdOut(), actionLog.getStdErr());
+            } else {
+              Map<String, Integer> principalKeyNumberMap = 
getPrincipalKeyNumberMap(requestSharedDataContext);
 
-            principalPasswordMap.put(evaluatedPrincipal, result.getPassword());
-            principalKeyNumberMap.put(evaluatedPrincipal, 
result.getKeyNumber());
+              principalPasswordMap.put(evaluatedPrincipal, 
result.getPassword());
+              principalKeyNumberMap.put(evaluatedPrincipal, 
result.getKeyNumber());
+            }
           }
+        } finally {
+          lock.unlock();
         }
       }
     }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
index a25357c..d2fa207 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/DestroyPrincipalsServerAction.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.serveraction.kerberos;
 
+import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.CommandReport;
@@ -26,15 +27,16 @@ import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalDAO;
 import org.apache.ambari.server.orm.entities.KerberosPrincipalEntity;
 import org.apache.ambari.server.utils.ShellCommandUtil;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
 
 /**
  * DestroyPrincipalsServerAction is a ServerAction implementation that 
destroys principals as instructed.
@@ -52,10 +54,16 @@ public class DestroyPrincipalsServerAction extends 
KerberosServerAction {
   private KerberosPrincipalDAO kerberosPrincipalDAO;
 
   /**
+   * Used to prevent multiple threads from working with the same keytab.
+   */
+  private Striped<Lock> m_locksByPrincipal = Striped.lazyWeakLock(25);
+
+
+  /**
    * A set of visited principal names used to prevent unnecessary processing 
on already processed
    * principal names
    */
-  private Set<String> seenPrincipals = new HashSet<String>();
+  private Set<String> seenPrincipals = new ConcurrentHashSet<>();
 
   /**
    * Called to execute this action.  Upon invocation, calls
@@ -100,66 +108,72 @@ public class DestroyPrincipalsServerAction extends 
KerberosServerAction {
 
     // Only process this principal if we haven't already processed it
     if (!seenPrincipals.contains(evaluatedPrincipal)) {
-      seenPrincipals.add(evaluatedPrincipal);
+      Lock lock = m_locksByPrincipal.get(evaluatedPrincipal);
+      lock.lock();
+      try {
+        seenPrincipals.add(evaluatedPrincipal);
 
-      String message = String.format("Destroying identity, %s", 
evaluatedPrincipal);
-      LOG.info(message);
-      actionLog.writeStdOut(message);
-      
DestroyPrincipalKerberosAuditEvent.DestroyPrincipalKerberosAuditEventBuilder 
auditEventBuilder = DestroyPrincipalKerberosAuditEvent.builder()
+        String message = String.format("Destroying identity, %s", 
evaluatedPrincipal);
+        LOG.info(message);
+        actionLog.writeStdOut(message);
+        
DestroyPrincipalKerberosAuditEvent.DestroyPrincipalKerberosAuditEventBuilder 
auditEventBuilder = DestroyPrincipalKerberosAuditEvent.builder()
           .withTimestamp(System.currentTimeMillis())
           .withRequestId(getHostRoleCommand().getRequestId())
           .withTaskId(getHostRoleCommand().getTaskId())
           .withPrincipal(evaluatedPrincipal);
 
-      try {
         try {
-          operationHandler.removePrincipal(evaluatedPrincipal);
-        } catch (KerberosOperationException e) {
-          message = String.format("Failed to remove identity for %s from the 
KDC - %s", evaluatedPrincipal, e.getMessage());
-          LOG.warn(message);
-          actionLog.writeStdErr(message);
-          auditEventBuilder.withReasonOfFailure(message);
-        }
+          try {
+            operationHandler.removePrincipal(evaluatedPrincipal);
+          } catch (KerberosOperationException e) {
+            message = String.format("Failed to remove identity for %s from the 
KDC - %s", evaluatedPrincipal, e.getMessage());
+            LOG.warn(message);
+            actionLog.writeStdErr(message);
+            auditEventBuilder.withReasonOfFailure(message);
+          }
 
-        try {
-          KerberosPrincipalEntity principalEntity = 
kerberosPrincipalDAO.find(evaluatedPrincipal);
+          try {
+            KerberosPrincipalEntity principalEntity = 
kerberosPrincipalDAO.find(evaluatedPrincipal);
 
-          if (principalEntity != null) {
-            String cachedKeytabPath = principalEntity.getCachedKeytabPath();
+            if (principalEntity != null) {
+              String cachedKeytabPath = principalEntity.getCachedKeytabPath();
 
-            kerberosPrincipalDAO.remove(principalEntity);
+              kerberosPrincipalDAO.remove(principalEntity);
 
-            // If a cached  keytabs file exists for this principal, delete it.
-            if (cachedKeytabPath != null) {
-              if (!new File(cachedKeytabPath).delete()) {
-                LOG.debug(String.format("Failed to remove cached keytab for 
%s", evaluatedPrincipal));
+              // If a cached  keytabs file exists for this principal, delete 
it.
+              if (cachedKeytabPath != null) {
+                if (!new File(cachedKeytabPath).delete()) {
+                  LOG.debug(String.format("Failed to remove cached keytab for 
%s", evaluatedPrincipal));
+                }
               }
             }
-          }
 
-          // delete Ambari server keytab
-          String hostName = 
identityRecord.get(KerberosIdentityDataFileReader.HOSTNAME);
-          if (hostName != null && 
hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
-            String keytabFilePath = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
-            if (keytabFilePath != null) {
-              try {
-                ShellCommandUtil.Result result = 
ShellCommandUtil.delete(keytabFilePath, true, true);
-                if (!result.isSuccessful()) {
-                  LOG.warn("Failed to remove ambari keytab for {} due to {}", 
evaluatedPrincipal, result.getStderr());
+            // delete Ambari server keytab
+            String hostName = 
identityRecord.get(KerberosIdentityDataFileReader.HOSTNAME);
+            if (hostName != null && 
hostName.equalsIgnoreCase(KerberosHelper.AMBARI_SERVER_HOST_NAME)) {
+              String keytabFilePath = 
identityRecord.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
+              if (keytabFilePath != null) {
+                try {
+                  ShellCommandUtil.Result result = 
ShellCommandUtil.delete(keytabFilePath, true, true);
+                  if (!result.isSuccessful()) {
+                    LOG.warn("Failed to remove ambari keytab for {} due to 
{}", evaluatedPrincipal, result.getStderr());
+                  }
+                } catch (IOException | InterruptedException e) {
+                  LOG.warn("Failed to remove ambari keytab for " + 
evaluatedPrincipal, e);
                 }
-              } catch (IOException|InterruptedException e) {
-                LOG.warn("Failed to remove ambari keytab for " + 
evaluatedPrincipal, e);
               }
             }
+          } catch (Throwable t) {
+            message = String.format("Failed to remove identity for %s from the 
Ambari database - %s", evaluatedPrincipal, t.getMessage());
+            LOG.warn(message);
+            actionLog.writeStdErr(message);
+            auditEventBuilder.withReasonOfFailure(message);
           }
-        } catch (Throwable t) {
-          message = String.format("Failed to remove identity for %s from the 
Ambari database - %s", evaluatedPrincipal, t.getMessage());
-          LOG.warn(message);
-          actionLog.writeStdErr(message);
-          auditEventBuilder.withReasonOfFailure(message);
+        } finally {
+          auditLog(auditEventBuilder.build());
         }
       } finally {
-        auditLog(auditEventBuilder.build());
+        lock.unlock();
       }
     }
 
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerAction.java
index 0b845d9..54a869f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerAction.java
@@ -40,11 +40,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.util.concurrent.Striped;
 
 public class FinalizeKerberosServerAction extends KerberosServerAction {
   private final static Logger LOG = 
LoggerFactory.getLogger(FinalizeKerberosServerAction.class);
 
   /**
+   * Used to prevent multiple threads from working with the same keytab.
+   */
+  private Striped<Lock> m_locksByKeytab = Striped.lazyWeakLock(25);
+
+  /**
    * Processes an identity as necessary.
    * <p/>
    * This implementation ensures that keytab files for the Ambari identities 
have the correct
@@ -86,7 +94,13 @@ public class FinalizeKerberosServerAction extends 
KerberosServerAction {
 
           String keytabFilePath = 
identityRecord.get(KerberosIdentityDataFile.KEYTAB_FILE_PATH);
 
-          if (!StringUtils.isEmpty(keytabFilePath)) {
+          if (StringUtils.isEmpty(keytabFilePath)) {
+            return null;
+          }
+
+          Lock lock = m_locksByKeytab.get(keytabFilePath);
+          lock.lock();
+          try {
             Set<String> visited = (Set<String>) 
requestSharedDataContext.get(this.getClass().getName() + "_visited");
 
             if (!visited.contains(keytabFilePath)) {
@@ -149,6 +163,8 @@ public class FinalizeKerberosServerAction extends 
KerberosServerAction {
 
               visited.add(keytabFilePath);
             }
+          } finally {
+            lock.unlock();
           }
         }
       }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
index 9a3c4ed..138e494 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/KerberosServerAction.java
@@ -18,11 +18,13 @@
 
 package org.apache.ambari.server.serveraction.kerberos;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.security.credential.PrincipalKeyCredential;
 import org.apache.ambari.server.serveraction.AbstractServerAction;
@@ -36,8 +38,18 @@ import static 
org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDat
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * KerberosServerAction is an abstract class to be implemented by 
Kerberos-related
@@ -165,6 +177,12 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
   @Inject
   private KerberosHelper kerberosHelper;
 
+  @Inject
+  /**
+   * Ambari configuration
+   */
+  private Configuration configuration;
+
   /**
    * Given a (command parameter) Map and a property name, attempts to safely 
retrieve the requested
    * data.
@@ -350,7 +368,7 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
    * @return a CommandReport indicating the result of this operation
    * @throws AmbariException
    */
-  protected CommandReport processIdentities(Map<String, Object> 
requestSharedDataContext)
+  protected CommandReport processIdentities(final Map<String, Object> 
requestSharedDataContext)
       throws AmbariException {
     CommandReport commandReport = null;
     Map<String, String> commandParameters = getCommandParameters();
@@ -361,7 +379,7 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
     if (commandParameters != null) {
       // Grab the relevant data from this action's command parameters map
       PrincipalKeyCredential administratorCredential = 
kerberosHelper.getKDCAdministratorCredentials(getClusterName());
-      String defaultRealm = getDefaultRealm(commandParameters);
+      final String defaultRealm = getDefaultRealm(commandParameters);
       KDCType kdcType = getKDCType(commandParameters);
       String dataDirectoryPath = getDataDirectoryPath(commandParameters);
 
@@ -390,7 +408,7 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
               throw new AmbariException(message);
             }
 
-            KerberosOperationHandler handler = 
kerberosOperationHandlerFactory.getKerberosOperationHandler(kdcType);
+            final KerberosOperationHandler handler = 
kerberosOperationHandlerFactory.getKerberosOperationHandler(kdcType);
             if (handler == null) {
               String message = String.format("Failed to process the 
identities, a KDC operation handler was not found for the KDC type of : %s",
                   kdcType.toString());
@@ -399,7 +417,7 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
               throw new AmbariException(message);
             }
 
-            Map<String, String> kerberosConfiguration = 
getConfiguration("kerberos-env");
+            final Map<String, String> kerberosConfiguration = 
getConfiguration("kerberos-env");
 
             try {
               handler.open(administratorCredential, defaultRealm, 
kerberosConfiguration);
@@ -413,16 +431,51 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
 
             // Create the data file reader to parse and iterate through the 
records
             KerberosIdentityDataFileReader reader = null;
+            ExecutorService executorService = null;
             try {
+              // create the thread factory, executor, and completion service 
for
+              // running the identity processing in parallel
+              ExecutionCommand executionCommand = getExecutionCommand();
+              int threadCount = 
configuration.getKerberosServerActionThreadpoolSize();
+              String factoryName = (executionCommand == null)
+                ? "process-identity-%d"
+                : "process-identity-task-" + executionCommand.getTaskId() + 
"-thread-%d";
+              ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(factoryName).build();
+              executorService = Executors.newFixedThreadPool(threadCount, 
threadFactory);
+              CompletionService<CommandReport> completionService = new 
ExecutorCompletionService<>(executorService);
+
+              List<Future<CommandReport>> futures = new ArrayList<>();
               reader = 
kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(identityDataFile);
-              for (Map<String, String> record : reader) {
-                // Process the current record
-                commandReport = processRecord(record, defaultRealm, handler, 
kerberosConfiguration, requestSharedDataContext);
-
-                // If the principal processor returns a CommandReport, than it 
is time to stop since
-                // an error condition has probably occurred, else all is 
assumed to be well.
-                if (commandReport != null) {
-                  break;
+              try {
+                for (final Map<String, String> record : reader) {
+                  Future<CommandReport> future = completionService.submit(new 
Callable<CommandReport>() {
+                    @Override
+                    public CommandReport call() throws Exception {
+                      return processRecord(record, defaultRealm, handler, 
kerberosConfiguration, requestSharedDataContext);
+                    }
+                  });
+                  futures.add(future);
+                }
+
+                LOG.info("Processing {} identities concurrently with {} 
thread(s)...", futures.size(), threadCount);
+                for (int i = 0; i < futures.size(); i++) {
+                  Future<CommandReport> future = completionService.take();
+
+                  // If the principal processor returns a CommandReport, than 
it is time to stop since
+                  // an error condition has probably occurred, else all is 
assumed to be well.
+                  commandReport = future.get();
+                  if (commandReport != null) {
+                    break;
+                  }
+                }
+              } catch (Exception e) {
+                LOG.error("Unable to process identities asynchronously", e);
+                return createCommandReport(0, HostRoleStatus.FAILED, "{}", 
actionLog.getStdOut(),actionLog.getStdErr());
+              } finally {
+                for (Future<CommandReport> future: futures){
+                  if (!future.isCancelled() &!future.isDone()) {
+                    future.cancel(true);
+                  }
                 }
               }
             } catch (AmbariException e) {
@@ -453,6 +506,10 @@ public abstract class KerberosServerAction extends 
AbstractServerAction {
               } catch (KerberosOperationException e) {
                 // Ignore this...
               }
+
+              if (executorService != null) {
+                executorService.shutdown();
+              }
             }
           }
         }
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProviderTest.java
index f3e1046..1be7660 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostKerberosIdentityResourceProviderTest.java
@@ -30,10 +30,14 @@ import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalDAO;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosIdentityDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosKeytabDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosPrincipalDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosPrincipalType;
+import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Assert;
 import org.junit.Test;
@@ -130,8 +134,11 @@ public class HostKerberosIdentityResourceProviderTest 
extends EasyMockSupport {
 
   @Test
   public void testGetResources() throws Exception {
+    Clusters clusters = createNiceMock(Clusters.class);
+    
expect(clusters.getCluster(EasyMock.anyString())).andReturn(createNiceMock(Cluster.class)).once();
 
     AmbariManagementController managementController = 
createMock(AmbariManagementController.class);
+    
expect(managementController.getClusters()).andReturn(clusters).atLeastOnce();
 
     KerberosPrincipalDescriptor principalDescriptor1 = 
createStrictMock(KerberosPrincipalDescriptor.class);
     
expect(principalDescriptor1.getValue()).andReturn("princip...@example.com");
@@ -209,9 +216,14 @@ public class HostKerberosIdentityResourceProviderTest 
extends EasyMockSupport {
     activeIdentities.put("Host100", identities);
 
     KerberosHelper kerberosHelper = createStrictMock(KerberosHelper.class);
-    expect(kerberosHelper.getActiveIdentities("Cluster100", "Host100", null, 
null, true))
-        .andReturn(activeIdentities)
-        .times(1);
+    KerberosDescriptor kerberosDescriptor = 
createNiceMock(KerberosDescriptor.class);
+    expect(kerberosHelper.getKerberosDescriptor(
+      EasyMock.anyObject(Cluster.class),
+      EasyMock.eq(false))).andReturn(kerberosDescriptor).atLeastOnce();
+
+    expect(kerberosHelper.getActiveIdentities("Cluster100", "Host100", 
null,null, true))
+      .andReturn(activeIdentities)
+      .once();
 
     // replay
     replayAll();
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/AbstractPrepareKerberosServerActionTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/AbstractPrepareKerberosServerActionTest.java
index 5522132..01d7a60 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/AbstractPrepareKerberosServerActionTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/AbstractPrepareKerberosServerActionTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.serveraction.kerberos;
 
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -44,6 +45,7 @@ import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.kerberos.KerberosComponentDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosDescriptor;
 import org.apache.ambari.server.state.kerberos.KerberosServiceDescriptor;
+import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,10 +67,10 @@ public class AbstractPrepareKerberosServerActionTest {
   private Injector injector;
   private final PrepareKerberosServerAction prepareKerberosServerAction = new 
PrepareKerberosServerAction();
 
-  private final AuditLogger auditLogger = 
EasyMock.createNiceMock(AuditLogger.class);
-  private final Clusters clusters = EasyMock.createNiceMock(Clusters.class);
-  private final KerberosHelper kerberosHelper = 
EasyMock.createNiceMock(KerberosHelper.class);
-  private final KerberosIdentityDataFileWriterFactory 
kerberosIdentityDataFileWriterFactory = 
EasyMock.createNiceMock(KerberosIdentityDataFileWriterFactory.class);
+  private final AuditLogger auditLogger = createNiceMock(AuditLogger.class);
+  private final Clusters clusters = createNiceMock(Clusters.class);
+  private final KerberosHelper kerberosHelper = 
createNiceMock(KerberosHelper.class);
+  private final KerberosIdentityDataFileWriterFactory 
kerberosIdentityDataFileWriterFactory = 
createNiceMock(KerberosIdentityDataFileWriterFactory.class);
 
   @Before
   public void setUp() throws Exception {
@@ -78,6 +80,7 @@ public class AbstractPrepareKerberosServerActionTest {
         bind(KerberosHelper.class).toInstance(kerberosHelper);
         
bind(KerberosIdentityDataFileWriterFactory.class).toInstance(kerberosIdentityDataFileWriterFactory);
         bind(Clusters.class).toInstance(clusters);
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
         bind(AuditLogger.class).toInstance(auditLogger);
       }
     });
@@ -94,13 +97,13 @@ public class AbstractPrepareKerberosServerActionTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testProcessServiceComponentHosts() throws Exception {
-    final Cluster cluster =  EasyMock.createNiceMock(Cluster.class);
-    final KerberosIdentityDataFileWriter kerberosIdentityDataFileWriter = 
EasyMock.createNiceMock(KerberosIdentityDataFileWriter.class);
-    final KerberosDescriptor kerberosDescriptor = 
EasyMock.createNiceMock(KerberosDescriptor.class);
-    final ServiceComponentHost serviceComponentHostHDFS = 
EasyMock.createNiceMock(ServiceComponentHost.class);
-    final ServiceComponentHost serviceComponentHostZK = 
EasyMock.createNiceMock(ServiceComponentHost.class);
-    final KerberosServiceDescriptor serviceDescriptor = 
EasyMock.createNiceMock(KerberosServiceDescriptor.class);
-    final KerberosComponentDescriptor componentDescriptor = 
EasyMock.createNiceMock(KerberosComponentDescriptor.class);
+    final Cluster cluster =  createNiceMock(Cluster.class);
+    final KerberosIdentityDataFileWriter kerberosIdentityDataFileWriter = 
createNiceMock(KerberosIdentityDataFileWriter.class);
+    final KerberosDescriptor kerberosDescriptor = 
createNiceMock(KerberosDescriptor.class);
+    final ServiceComponentHost serviceComponentHostHDFS = 
createNiceMock(ServiceComponentHost.class);
+    final ServiceComponentHost serviceComponentHostZK = 
createNiceMock(ServiceComponentHost.class);
+    final KerberosServiceDescriptor serviceDescriptor = 
createNiceMock(KerberosServiceDescriptor.class);
+    final KerberosComponentDescriptor componentDescriptor = 
createNiceMock(KerberosComponentDescriptor.class);
 
     final String hdfsService = "HDFS";
     final String zookeeperService = "ZOOKEEPER";
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerActionTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerActionTest.java
index 439bcae..e49beb9 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerActionTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/ConfigureAmbariIdentitiesServerActionTest.java
@@ -30,6 +30,7 @@ import 
org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.serveraction.ActionLog;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.stack.OsFamily;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.io.FileUtils;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,7 @@ public class ConfigureAmbariIdentitiesServerActionTest 
extends EasyMockSupport {
         bind(AuditLogger.class).toInstance(createNiceMock(AuditLogger.class));
         bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
         
bind(KerberosHelper.class).toInstance(createNiceMock(KerberosHelper.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
 
         bind(HostDAO.class).toInstance(createMock(HostDAO.class));
         
bind(KerberosPrincipalHostDAO.class).toInstance(createMock(KerberosPrincipalHostDAO.class));
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerActionTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerActionTest.java
index 9404480..3061a36 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerActionTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/serveraction/kerberos/FinalizeKerberosServerActionTest.java
@@ -46,6 +46,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.SecurityState;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.EasyMockSupport;
 import org.junit.Rule;
 import org.junit.Test;
@@ -198,6 +199,7 @@ public class FinalizeKerberosServerActionTest extends 
EasyMockSupport {
       protected void configure() {
         
bind(KerberosHelper.class).toInstance(createMock(KerberosHelper.class));
         bind(Clusters.class).toInstance(clusters);
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
         bind(AuditLogger.class).toInstance(createNiceMock(AuditLogger.class));
       }
     });
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/view/HttpImpersonatorImplTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/view/HttpImpersonatorImplTest.java
index e1df325..b0d2dba 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/view/HttpImpersonatorImplTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/view/HttpImpersonatorImplTest.java
@@ -27,7 +27,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import sun.nio.cs.StandardCharsets;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;

Reply via email to