This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 56ecf46  HADOOP-17346. Fair call queue is defeated by abusive service 
principals. Contributed by Ahmed Hussein (ahussein).
56ecf46 is described below

commit 56ecf469f82feead169a77e1f3b6f3eabae1d509
Author: Eric Payne <epa...@apache.org>
AuthorDate: Mon Nov 23 20:48:07 2020 +0000

    HADOOP-17346. Fair call queue is defeated by abusive service principals. 
Contributed by Ahmed Hussein (ahussein).
---
 .../org/apache/hadoop/ipc/CallQueueManager.java    | 14 +++++
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   | 59 +++++++++++++++++++---
 .../src/main/java/org/apache/hadoop/ipc/RPC.java   | 14 ++++-
 .../main/java/org/apache/hadoop/ipc/Server.java    | 17 ++++++-
 .../apache/hadoop/ipc/UserIdentityProvider.java    |  2 +-
 .../org/apache/hadoop/security/SecurityUtil.java   | 20 +++++++-
 .../authorize/ServiceAuthorizationManager.java     | 29 ++++++-----
 .../apache/hadoop/ipc/TestDecayRpcScheduler.java   |  3 +-
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 37 ++++++++++++++
 .../java/org/apache/hadoop/ipc/TestRpcBase.java    |  5 +-
 10 files changed, 171 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 0287656..692f1a8 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -202,6 +203,19 @@ public class CallQueueManager<E extends Schedulable>
     return scheduler.getPriorityLevel(e);
   }
 
+  int getPriorityLevel(UserGroupInformation user) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      return ((DecayRpcScheduler)scheduler).getPriorityLevel(user);
+    }
+    return 0;
+  }
+
+  void setPriorityLevel(UserGroupInformation user, int priority) {
+    if (scheduler instanceof DecayRpcScheduler) {
+      ((DecayRpcScheduler)scheduler).setPriorityLevel(user, priority);
+    }
+  }
+
   void setClientBackoffEnabled(boolean value) {
     clientBackOffEnabled = value;
   }
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index c1ca419..d1108a9 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -173,6 +175,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
 
+  private final Map<String, Integer> staticPriorities = new HashMap<>();
   /**
    * This TimerTask will call decayCurrentCosts until
    * the scheduler has been garbage collected.
@@ -476,7 +479,7 @@ public class DecayRpcScheduler implements RpcScheduler,
       AtomicLong value = entry.getValue().get(0);
 
       long snapshot = value.get();
-      int computedLevel = computePriorityLevel(snapshot);
+      int computedLevel = computePriorityLevel(snapshot, id);
 
       nextCache.put(id, computedLevel);
     }
@@ -526,7 +529,11 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @param cost the cost for an identity
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long cost) {
+  private int computePriorityLevel(long cost, Object identity) {
+    Integer staticPriority = staticPriorities.get(identity);
+    if (staticPriority != null) {
+      return staticPriority.intValue();
+    }
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -566,11 +573,20 @@ public class DecayRpcScheduler implements RpcScheduler,
     // Cache was no good, compute it
     List<AtomicLong> costList = callCosts.get(identity);
     long currentCost = costList == null ? 0 : costList.get(0).get();
-    int priority = computePriorityLevel(currentCost);
+    int priority = computePriorityLevel(currentCost, identity);
     LOG.debug("compute priority for {} priority {}", identity, priority);
     return priority;
   }
 
+  private String getIdentity(Schedulable obj) {
+    String identity = this.identityProvider.makeIdentity(obj);
+    if (identity == null) {
+      // Identity provider did not handle this
+      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
+    }
+    return identity;
+  }
+
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
    * @param obj the schedulable obj to query and remember
@@ -579,15 +595,42 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public int getPriorityLevel(Schedulable obj) {
     // First get the identity
-    String identity = this.identityProvider.makeIdentity(obj);
-    if (identity == null) {
-      // Identity provider did not handle this
-      identity = DECAYSCHEDULER_UNKNOWN_IDENTITY;
-    }
+    String identity = getIdentity(obj);
+    // highest priority users may have a negative priority but their
+    // calls will be priority 0.
+    return Math.max(0, cachedOrComputedPriorityLevel(identity));
+  }
 
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    String identity = getIdentity(newSchedulable(ugi));
+    // returns true priority of the user.
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    String identity = getIdentity(newSchedulable(ugi));
+    priority = Math.min(numLevels - 1, priority);
+    LOG.info("Setting priority for user:" + identity + "=" + priority);
+    staticPriorities.put(identity, priority);
+  }
+
+  // dummy instance to conform to identity provider api.
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable() {
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+
+      @Override
+      public int getPriorityLevel() {
+        return 0;
+      }
+    };
+  }
+
   @Override
   public boolean shouldBackOff(Schedulable obj) {
     Boolean backOff = false;
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index d892997..efb3258 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -51,6 +51,7 @@ import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -980,7 +981,18 @@ public class RPC {
            " ProtocolImpl=" + protocolImpl.getClass().getName() +
            " protocolClass=" + protocolClass.getName());
      }
-   }
+      String client = SecurityUtil.getClientPrincipal(protocolClass, 
getConf());
+      if (client != null) {
+        // notify the server's rpc scheduler that the protocol user has
+        // highest priority.  the scheduler should exempt the user from
+        // priority calculations.
+        try {
+          setPriorityLevel(UserGroupInformation.createRemoteUser(client), -1);
+        } catch (Exception ex) {
+          LOG.warn("Failed to set scheduling priority for " + client, ex);
+        }
+      }
+    }
    
    static class VerProtocolImpl {
      final long version;
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index d06736a..0178139 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -637,7 +637,22 @@ public abstract class Server {
           address.getPort(), e);
     }
   }
-  
+
+  @VisibleForTesting
+  int getPriorityLevel(Schedulable e) {
+    return callQueue.getPriorityLevel(e);
+  }
+
+  @VisibleForTesting
+  int getPriorityLevel(UserGroupInformation ugi) {
+    return callQueue.getPriorityLevel(ugi);
+  }
+
+  @VisibleForTesting
+  void setPriorityLevel(UserGroupInformation ugi, int priority) {
+    callQueue.setPriorityLevel(ugi, priority);
+  }
+
   /**
    * Returns a handle to the rpcMetrics (required in tests)
    * @return rpc metrics
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
index 763605e..91ec1a2 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java
@@ -31,6 +31,6 @@ public class UserIdentityProvider implements IdentityProvider 
{
       return null;
     }
 
-    return ugi.getUserName();
+    return ugi.getShortUserName();
   }
 }
\ No newline at end of file
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
index 2313119..dbf1328 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
@@ -379,7 +379,25 @@ public final class SecurityUtil {
     }
     return null;
   }
- 
+
+  /**
+   * Look up the client principal for a given protocol. It searches all known
+   * SecurityInfo providers.
+   * @param protocol the protocol class to get the information for
+   * @param conf configuration object
+   * @return client principal or null if it has no client principal defined.
+   */
+  public static String getClientPrincipal(Class<?> protocol,
+      Configuration conf) {
+    String user = null;
+    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
+    if (krbInfo != null) {
+      String key = krbInfo.clientPrincipal();
+      user = (key != null && !key.isEmpty()) ? conf.get(key) : null;
+    }
+    return user;
+  }
+
   /**
    * Look up the TokenInfo for a given protocol. It searches all known
    * SecurityInfo providers.
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
index 4c47348..71c8be3 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
@@ -99,22 +99,23 @@ public class ServiceAuthorizationManager {
     }
     
     // get client principal key to verify (if available)
-    KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
-    String clientPrincipal = null; 
-    if (krbInfo != null) {
-      String clientKey = krbInfo.clientPrincipal();
-      if (clientKey != null && !clientKey.isEmpty()) {
-        try {
-          clientPrincipal = SecurityUtil.getServerPrincipal(
-              conf.get(clientKey), addr);
-        } catch (IOException e) {
-          throw (AuthorizationException) new AuthorizationException(
-              "Can't figure out Kerberos principal name for connection from "
-                  + addr + " for user=" + user + " protocol=" + protocol)
-              .initCause(e);
-        }
+    String clientPrincipal = null;
+
+
+    try {
+      clientPrincipal = SecurityUtil.getClientPrincipal(protocol, conf);
+      if (clientPrincipal != null) {
+
+        clientPrincipal = SecurityUtil.getServerPrincipal(clientPrincipal,
+            addr);
       }
+    } catch (IOException e) {
+      throw (AuthorizationException) new AuthorizationException(
+          "Can't figure out Kerberos principal name for connection from "
+              + addr + " for user=" + user + " protocol=" + protocol)
+          .initCause(e);
     }
+    
     if((clientPrincipal != null && 
!clientPrincipal.equals(user.getUserName())) || 
        acls.length != 2  || !acls[0].isUserAllowed(user) || 
acls[1].isUserAllowed(user)) {
       String cause = clientPrincipal != null ?
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 7bdc6b5..ca052f0 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -42,9 +42,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
-    UserGroupInformation ugi = mock(UserGroupInformation.class);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);
 
-    when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
 
     return mockCall;
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index f0d9baf..40e7995 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1293,6 +1293,43 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test (timeout=30000)
+  public void testProtocolUserPriority() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    conf.set(CLIENT_PRINCIPAL_KEY, "clientForProtocol");
+    Server server = null;
+    try {
+      server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user");
+      // normal users start with priority 0.
+      Assert.assertEquals(0, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+
+      // protocol defined client will have top priority of -1.
+      ugi = UserGroupInformation.createRemoteUser("clientForProtocol");
+      Assert.assertEquals(-1, server.getPriorityLevel(ugi));
+      // calls for a protocol defined client will have priority of 0.
+      Assert.assertEquals(0, server.getPriorityLevel(newSchedulable(ugi)));
+    } finally {
+      stop(server, null);
+    }
+  }
+
+  private static Schedulable newSchedulable(UserGroupInformation ugi) {
+    return new Schedulable(){
+      @Override
+      public UserGroupInformation getUserGroupInformation() {
+        return ugi;
+      }
+      @Override
+      public int getPriorityLevel() {
+        return 0; // doesn't matter.
+      }
+    };
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index 2f2d36f..22ea918 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -62,6 +62,8 @@ public class TestRpcBase {
 
   protected final static String SERVER_PRINCIPAL_KEY =
       "test.ipc.server.principal";
+  protected final static String CLIENT_PRINCIPAL_KEY =
+      "test.ipc.client.principal";
   protected final static String ADDRESS = "0.0.0.0";
   protected final static int PORT = 0;
   protected static InetSocketAddress addr;
@@ -271,7 +273,8 @@ public class TestRpcBase {
     }
   }
 
-  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY)
+  @KerberosInfo(serverPrincipal = SERVER_PRINCIPAL_KEY,
+                clientPrincipal = CLIENT_PRINCIPAL_KEY)
   @TokenInfo(TestTokenSelector.class)
   @ProtocolInfo(protocolName = 
"org.apache.hadoop.ipc.TestRpcBase$TestRpcService",
       protocolVersion = 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to