Repository: impala
Updated Branches:
  refs/heads/master 8dcf54aee -> 1c6058fa1


IMPALA-7510. Support principals/privileges with LocalCatalog

This enables support for Sentry authorization when LocalCatalog is
enabled. The design is detailed in a change to the comment on
CatalogdMetaProvider, but to recap it briefly here:

At a high level, this patch takes the approach of duplicating the "v1"
catalog flow for PRINCIPAL and PRIVILEGE catalog objects. Namely, the catalog
daemon publishes complete objects into the statestore topic, and the
impalad fully replicates them locally.
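
As a rough illustration of the replicate-everything approach, here is a minimal sketch. It is not Impala code: objects are reduced to a name plus a catalog version, and VersionedReplica, Update and applyUpdate are hypothetical names. The sketch combines three ideas from this patch. First, a drop only removes a copy older than the drop (like removePrincipalIfLowerVersion). Second, an add is skipped if a later drop has already been seen (the job CatalogDeltaLog does). Third, isReady() turns true only once a top-level catalog version has been applied. Garbage collection of the removal log is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a coordinator-side replica of versioned catalog objects. */
public class VersionedReplica {
  /** One published object: its name, catalog version, and whether it is a deletion. */
  public static final class Update {
    final String name;
    final long version;
    final boolean isDelete;

    public Update(String name, long version, boolean isDelete) {
      this.name = name;
      this.version = version;
      this.isDelete = isDelete;
    }
  }

  static final long INITIAL_CATALOG_VERSION = 0;

  // name -> catalog version of the copy currently held in the replica
  private final Map<String, Long> objects = new HashMap<>();
  // name -> highest catalog version at which the object was removed (a tiny delta log)
  private final Map<String, Long> removedAt = new HashMap<>();
  private long lastSeenCatalogVersion = INITIAL_CATALOG_VERSION;

  /**
   * Applies one batch of published objects. 'topLevelVersion' stands in for the
   * version of the batch's top-level CATALOG object, or null if there was none.
   */
  public synchronized void applyUpdate(List<Update> batch, Long topLevelVersion) {
    for (Update u : batch) {
      if (u.isDelete) {
        removedAt.merge(u.name, u.version, Long::max);
        removeIfLowerVersion(u.name, u.version);
      } else {
        Long removed = removedAt.get(u.name);
        // Skip an add that is older than a removal we have already seen.
        if (removed != null && removed > u.version) continue;
        objects.put(u.name, u.version);
      }
    }
    // Like the deferred update in CatalogdMetaProvider, advance the version
    // only after every object in the batch has been applied.
    if (topLevelVersion != null) lastSeenCatalogVersion = topLevelVersion;
  }

  /** Removes 'name' only if the copy we hold is older than the drop. */
  private void removeIfLowerVersion(String name, long dropVersion) {
    Long existing = objects.get(name);
    if (existing != null && existing < dropVersion) objects.remove(name);
  }

  public synchronized boolean isReady() {
    return lastSeenCatalogVersion > INITIAL_CATALOG_VERSION;
  }

  public synchronized boolean contains(String name) {
    return objects.containsKey(name);
  }
}
```

For example, after a batch that drops role1 at version 12, a late-arriving add of role1 at version 11 is ignored, while an add at version 13 is applied.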

I took this approach rather than trying to do fine-grained caching and
invalidation for the following reasons:

- The PRINCIPAL and PRIVILEGE metadata is typically many orders of magnitude
  smaller than table metadata. So, the benefit of fine-grained caching
  and eviction is not as great.

- The PRINCIPAL and PRIVILEGE catalog objects are fairly tightly intertwined,
  with relationships between them and backward mappings maintained from
  groups to principals. This logic is implemented by the
  AuthorizationPolicy class. Implementing a similar mapping in a
  fine-grained caching approach would be a significant amount of work.

- This code is currently in flux, as others are working on more
  fine-grained permissions. Duplicating the logic in a "fetch-on-demand"
  implementation might therefore turn out to be chasing a moving target.

In order to take this approach, the patch is organized as follows:

- refactored some of the principal/privilege removal logic from
  ImpaladCatalog into AuthorizationPolicy. This makes it easier to perform
  a similar "subscribe" step in CatalogdMetaProvider with less duplicated code.

- changed catalogd to publish PRINCIPAL and PRIVILEGE objects to v2
  catalogs in addition to v1.

- made LocalCatalog.getAuthPolicy() delegate to the MetaProvider
  (implemented by CatalogdMetaProvider), and added an AuthorizationPolicy
  member there. This member is updated as PRINCIPAL and PRIVILEGE objects
  arrive via catalog updates.

- had to implement LocalCatalog.isReady() to ensure that we don't allow
  user access until the first topic update has been consumed.

- additionally had to copy some other code from ImpaladCatalog to
  protect against various races: we need a CatalogDeltaLog as well as
  careful sequencing of the order in which the objects are applied.
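
The ordering rule can be illustrated in isolation. This is a simplified, hypothetical stand-in for ImpaladCatalog.ObjectUpdateSequencer (SequencerSketch and Obj are made-up names). It treats PRINCIPAL as the only top-level type, whereas the real isTopLevelCatalogObject() also covers DATABASE, DATA_SOURCE and HDFS_CACHE_POOL. Top-level objects go to the front of the update queue and to the back of the delete queue. As a result, a parent is always added before, and removed after, the objects that refer to it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for ImpaladCatalog.ObjectUpdateSequencer.
 * Only PRINCIPAL (parent) and PRIVILEGE (child) objects are modeled.
 */
public class SequencerSketch {
  public enum Type { PRINCIPAL, PRIVILEGE }

  public static final class Obj {
    final Type type;
    final String name;

    public Obj(Type type, String name) {
      this.type = type;
      this.name = name;
    }

    @Override
    public String toString() { return type + ":" + name; }
  }

  private final ArrayDeque<Obj> updatedObjects = new ArrayDeque<>();
  private final ArrayDeque<Obj> deletedObjects = new ArrayDeque<>();

  // In this sketch a PRINCIPAL is the only object that depends on nothing else.
  private static boolean isTopLevel(Obj obj) { return obj.type == Type.PRINCIPAL; }

  public void add(Obj obj, boolean isDeleted) {
    if (!isDeleted) {
      // Additions: parents to the front, children to the back.
      if (isTopLevel(obj)) updatedObjects.addFirst(obj); else updatedObjects.addLast(obj);
    } else {
      // Removals: children to the front, parents to the back.
      if (isTopLevel(obj)) deletedObjects.addLast(obj); else deletedObjects.addFirst(obj);
    }
  }

  /** Order in which additions would be applied, rendered as "TYPE:name". */
  public List<String> updateOrder() { return render(updatedObjects); }

  /** Order in which removals would be applied, rendered as "TYPE:name". */
  public List<String> deleteOrder() { return render(deletedObjects); }

  private static List<String> render(Iterable<Obj> objs) {
    List<String> out = new ArrayList<>();
    for (Obj obj : objs) out.add(obj.toString());
    return out;
  }
}
```

Within one batch, a privilege is therefore never added before its principal exists. Likewise, a principal is never removed while a privilege in the same batch still refers to it. Sibling order is not preserved, because addFirst reverses top-level updates. This should be harmless, since objects of the same level do not depend on each other.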

With this patch and the following one to enable UDF support, I was able
to run the tests in tests/authorization successfully with LocalCatalog
enabled.

Change-Id: Iccce5aabdb6afe466fdaeae0fb3700c66e658558
Reviewed-on: http://gerrit.cloudera.org:8080/11358
Reviewed-by: Todd Lipcon <t...@apache.org>
Tested-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b986f2a8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b986f2a8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b986f2a8

Branch: refs/heads/master
Commit: b986f2a8bb9bdfb75edabc20420fa8254f3e899e
Parents: 37c080b
Author: Todd Lipcon <t...@apache.org>
Authored: Wed Aug 29 19:02:32 2018 -0700
Committer: Todd Lipcon <t...@apache.org>
Committed: Thu Sep 6 02:39:08 2018 +0000

----------------------------------------------------------------------
 .../impala/catalog/AuthorizationPolicy.java     |  48 +++++++
 .../impala/catalog/CatalogServiceCatalog.java   |  10 +-
 .../apache/impala/catalog/ImpaladCatalog.java   | 105 +++++++--------
 .../impala/catalog/PrincipalPrivilege.java      |   2 +-
 .../catalog/local/CatalogdMetaProvider.java     | 131 ++++++++++++++++++-
 .../catalog/local/DirectMetaProvider.java       |  14 ++
 .../impala/catalog/local/LocalCatalog.java      |   9 +-
 .../impala/catalog/local/MetaProvider.java      |  16 ++-
 tests/common/custom_cluster_test_suite.py       |   4 +
 9 files changed, 272 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index ffb8cfd..f6429e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import com.google.common.base.Preconditions;
 import org.apache.commons.net.ntp.TimeStamp;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TPrincipal;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TResultRow;
@@ -286,6 +287,53 @@ public class AuthorizationPolicy implements PrivilegeCache {
         removeRole(principalName) : removeUser(principalName);
   }
 
+
+  /**
+   * Removes a principal, but only if the existing principal has a lower version
+   * than the specified 'dropVersion'.
+   */
+  public synchronized void removePrincipalIfLowerVersion(TPrincipal thriftPrincipal,
+      long dropCatalogVersion) {
+    Principal existingPrincipal = getPrincipal(thriftPrincipal.getPrincipal_name(),
+        thriftPrincipal.getPrincipal_type());
+    if (existingPrincipal == null ||
+        existingPrincipal.getCatalogVersion() >= dropCatalogVersion) {
+      return;
+    }
+
+    removePrincipal(thriftPrincipal.getPrincipal_name(),
+        thriftPrincipal.getPrincipal_type());
+    // NOTE: the privileges are added to the CatalogObjectVersionSet automatically
+    // by being added to the CatalogObjectCache<Privilege> inside the principal.
+    // However, since we're just removing the principal wholesale here without removing
+    // anything from that map, we need to manually remove them from the version set.
+    //
+    // TODO(todd): it seems like it would make sense to do this in removePrincipal rather
+    // than here, but this is preserving the behavior of the existing code. Perhaps
+    // we have a memory leak in the catalogd due to not removing them there.
+    CatalogObjectVersionSet.INSTANCE.removeAll(existingPrincipal.getPrivileges());
+  }
+
+
+
+  /**
+   * Removes a privilege, but only if the existing privilege has a lower version
+   * than the specified 'dropVersion'.
+   */
+  public synchronized void removePrivilegeIfLowerVersion(TPrivilege thriftPrivilege,
+      long dropCatalogVersion) {
+    Principal principal = getPrincipal(thriftPrivilege.getPrincipal_id(),
+        thriftPrivilege.getPrincipal_type());
+    if (principal == null) return;
+    PrincipalPrivilege existingPrivilege =
+        principal.getPrivilege(thriftPrivilege.getPrivilege_name());
+    if (existingPrivilege != null &&
+        existingPrivilege.getCatalogVersion() < dropCatalogVersion) {
+      principal.removePrivilege(thriftPrivilege.getPrivilege_name());
+    }
+  }
+
+
   /**
    * Removes a role. Returns the removed role or null if no role with
    * this name existed.

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 1d0b32a..f02a2f3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -534,11 +534,15 @@ public class CatalogServiceCatalog extends Catalog {
         // whole catalog.
         // TODO(todd) ensure that the impalad does this invalidation as required.
         return obj;
+      case PRIVILEGE:
+      case PRINCIPAL:
+        // The caching of this data on the impalad side is somewhat complex
+        // and this code is also under some churn at the moment. So, we'll just publish
+        // the full information rather than doing fetch-on-demand.
+        return obj;
       case DATA_SOURCE:
       case FUNCTION:
       case HDFS_CACHE_POOL:
-      case PRIVILEGE:
-      case PRINCIPAL:
         // These are currently not cached by v2 impalad.
         // TODO(todd): handle these items.
         return null;
@@ -1667,6 +1671,8 @@ public class CatalogServiceCatalog extends Catalog {
     versionLock_.writeLock().lock();
     try {
       Principal principal = authPolicy_.removePrincipal(principalName, type);
+      // TODO(todd): does this end up leaking the privileges associated
+      // with this principal into the CatalogObjectVersionSet on the catalogd?
       if (principal == null) return null;
       for (PrincipalPrivilege priv: principal.getPrivileges()) {
         priv.setCatalogVersion(incrementAndGetCatalogVersion());

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 921e827..cd31279 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -23,9 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.TableName;
-import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.FeSupport;
@@ -35,8 +33,6 @@ import org.apache.impala.thrift.TDataSource;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
-import org.apache.impala.thrift.TPrincipal;
-import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
@@ -107,14 +103,47 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
   }
 
   /**
-   * Returns true if the given object does not depend on any other object already
-   * existing in the catalog in order to be added.
+   * Utility class for sequencing the order in which a set of updated catalog objects
+   * need to be applied to the catalog in order to satisfy referential constraints.
+   *
+   * If one type of object refers to another type of object, it needs to be added
+   * after it and deleted before it.
    */
-  private boolean isTopLevelCatalogObject(TCatalogObject catalogObject) {
-    return catalogObject.getType() == TCatalogObjectType.DATABASE ||
-        catalogObject.getType() == TCatalogObjectType.DATA_SOURCE ||
-        catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL ||
-        catalogObject.getType() == TCatalogObjectType.PRINCIPAL;
+  public static class ObjectUpdateSequencer {
+    private final ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>();
+    private final ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>();
+
+    public void add(TCatalogObject obj, boolean isDeleted) {
+      if (!isDeleted) {
+        // Update top-level objects first.
+        if (isTopLevelCatalogObject(obj)) {
+          updatedObjects.addFirst(obj);
+        } else {
+          updatedObjects.addLast(obj);
+        }
+      } else {
+        // Remove low-level objects first.
+        if (isTopLevelCatalogObject(obj)) {
+          deletedObjects.addLast(obj);
+        } else {
+          deletedObjects.addFirst(obj);
+        }
+      }
+    }
+
+    public Iterable<TCatalogObject> getUpdatedObjects() { return updatedObjects; }
+    public Iterable<TCatalogObject> getDeletedObjects() { return deletedObjects; }
+
+    /**
+     * Returns true if the given object does not depend on any other object already
+     * existing in the catalog in order to be added.
+     */
+    private static boolean isTopLevelCatalogObject(TCatalogObject catalogObject) {
+      return catalogObject.getType() == TCatalogObjectType.DATABASE ||
+          catalogObject.getType() == TCatalogObjectType.DATA_SOURCE ||
+          catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL ||
+          catalogObject.getType() == TCatalogObjectType.PRINCIPAL;
+    }
   }
 
   /**
@@ -156,8 +185,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     TUpdateCatalogCacheRequest req) throws CatalogException, TException {
     // For updates from catalog op results, the service ID is set in the request.
     if (req.isSetCatalog_service_id()) setCatalogServiceId(req.catalog_service_id);
-    ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>();
-    ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>();
+    ObjectUpdateSequencer sequencer = new ObjectUpdateSequencer();
     long newCatalogVersion = lastSyncedCatalogVersion_.get();
     Pair<Boolean, ByteBuffer> update;
     while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
@@ -177,24 +205,12 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
       if (obj.type == TCatalogObjectType.CATALOG) {
         setCatalogServiceId(obj.catalog.catalog_service_id);
         newCatalogVersion = obj.catalog_version;
-      } else if (!update.first) {
-        // Update top-level objects first.
-        if (isTopLevelCatalogObject(obj)) {
-          updatedObjects.addFirst(obj);
-        } else {
-          updatedObjects.addLast(obj);
-        }
       } else {
-        // Remove low-level objects first.
-        if (isTopLevelCatalogObject(obj)) {
-          deletedObjects.addLast(obj);
-        } else {
-          deletedObjects.addFirst(obj);
-        }
+        sequencer.add(obj, update.first);
       }
     }
 
-    for (TCatalogObject catalogObject: updatedObjects) {
+    for (TCatalogObject catalogObject: sequencer.getUpdatedObjects()) {
       try {
         addCatalogObject(catalogObject);
       } catch (Exception e) {
@@ -202,7 +218,9 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
       }
     }
 
-    for (TCatalogObject catalogObject: deletedObjects) removeCatalogObject(catalogObject);
+    for (TCatalogObject catalogObject: sequencer.getDeletedObjects()) {
+      removeCatalogObject(catalogObject);
+    }
 
     lastSyncedCatalogVersion_.set(newCatalogVersion);
     // Cleanup old entries in the log.
@@ -322,10 +340,12 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
         removeDataSource(catalogObject.getData_source(), dropCatalogVersion);
         break;
       case PRINCIPAL:
-        removePrincipal(catalogObject.getPrincipal(), dropCatalogVersion);
+        authPolicy_.removePrincipalIfLowerVersion(catalogObject.getPrincipal(),
+            dropCatalogVersion);
         break;
       case PRIVILEGE:
-        removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion);
+        authPolicy_.removePrivilegeIfLowerVersion(catalogObject.getPrivilege(),
+            dropCatalogVersion);
         break;
       case HDFS_CACHE_POOL:
         HdfsCachePool existingItem =
@@ -457,31 +477,6 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
     }
   }
 
-  private void removePrincipal(TPrincipal thriftPrincipal, long dropCatalogVersion) {
-    Principal existingPrincipal = authPolicy_.getPrincipal(
-        thriftPrincipal.getPrincipal_name(), thriftPrincipal.getPrincipal_type());
-    // version of the drop, remove the function.
-    if (existingPrincipal != null &&
-        existingPrincipal.getCatalogVersion() < dropCatalogVersion) {
-      authPolicy_.removePrincipal(thriftPrincipal.getPrincipal_name(),
-          thriftPrincipal.getPrincipal_type());
-      CatalogObjectVersionSet.INSTANCE.removeAll(existingPrincipal.getPrivileges());
-    }
-  }
-
-  private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) {
-    Principal principal = authPolicy_.getPrincipal(thriftPrivilege.getPrincipal_id(),
-        thriftPrivilege.getPrincipal_type());
-    if (principal == null) return;
-    PrincipalPrivilege existingPrivilege =
-        principal.getPrivilege(thriftPrivilege.getPrivilege_name());
-    // version of the drop, remove the function.
-    if (existingPrivilege != null &&
-        existingPrivilege.getCatalogVersion() < dropCatalogVersion) {
-      principal.removePrivilege(thriftPrivilege.getPrivilege_name());
-    }
-  }
-
   @Override // FeCatalog
   public boolean isReady() {
     return lastSyncedCatalogVersion_.get() > INITIAL_CATALOG_VERSION;

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
index bb7500b..dd33fe7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
@@ -45,7 +45,7 @@ public class PrincipalPrivilege extends CatalogObjectImpl {
   private final TPrivilege privilege_;
 
   private PrincipalPrivilege(TPrivilege privilege) {
-    privilege_ = privilege;
+    privilege_ = Preconditions.checkNotNull(privilege);
   }
 
   public TPrivilege toThrift() { return privilege_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index c36c30d..9fdcb03 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -36,8 +36,14 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.AuthorizationPolicy;
 import org.apache.impala.catalog.Catalog;
+import org.apache.impala.catalog.CatalogDeltaLog;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.Principal;
+import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
@@ -130,6 +136,18 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
  * it is no longer "linked". Over time, the old entries will naturally age out of the
  * cache.
  *
+ *
+ * Metadata that is _not_ fetched on demand
+ * ================================================
+ * This implementation does not fetch _all_ metadata on demand. In fact, some pieces of
+ * metadata are currently provided in the same manner as "legacy" coordinators: the
+ * full metadata objects are published by the catalog daemon into the statestore, and
+ * we keep a full "replica" of that information. In particular, we currently use this
+ * strategy for Sentry metadata (roles and privileges) since the caching of this data
+ * is relatively more complex. Given that this data is typically quite small relative
+ * to the table metadata, it's not too expensive to maintain the full replica.
+ *
+ *
  * TODO(todd): expose statistics on a per-query and per-daemon level about cache
  * hit rates, number of outbound RPCs, etc.
  * TODO(todd): handle retry/backoff to ride over short catalog interruptions
@@ -199,6 +217,12 @@ public class CatalogdMetaProvider implements MetaProvider {
       Catalog.INITIAL_CATALOG_VERSION);
 
   /**
+   * Tracks objects that have been deleted in response to a DDL issued from this
+   * coordinator.
+   */
+  CatalogDeltaLog deletedObjectsLog_ = new CatalogDeltaLog();
+
+  /**
    * The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer
    * has restarted.
    */
@@ -207,6 +231,12 @@ public class CatalogdMetaProvider implements MetaProvider {
   private final Object catalogServiceIdLock_ = new Object();
 
 
+  /**
+   * Cache of authorization policy metadata. Populated from data pushed from the
+   * StateStore. Currently this is _not_ "fetch-on-demand".
+   */
+  private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
+
   public CatalogdMetaProvider(TBackendGflags flags) {
     Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s());
     Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb());
@@ -237,6 +267,16 @@ public class CatalogdMetaProvider implements MetaProvider {
     return cache_.stats();
   }
 
+  @Override
+  public AuthorizationPolicy getAuthPolicy() {
+    return authPolicy_;
+  }
+
+  @Override
+  public boolean isReady() {
+    return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION;
+  }
+
   /**
    * Send a GetPartialCatalogObject request to catalogd. This handles converting
    * non-OK status responses back to exceptions, performing various generic sanity
@@ -698,14 +738,23 @@ public class CatalogdMetaProvider implements MetaProvider {
    * indicate that the catalogd representation of the object has changed and therefore
    * needs to be invalidated in the impalad.
    */
-  public TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
+  public synchronized TUpdateCatalogCacheResponse updateCatalogCache(
+      TUpdateCatalogCacheRequest req) {
     if (req.isSetCatalog_service_id()) {
       witnessCatalogServiceId(req.catalog_service_id);
     }
 
+    // We might see a top-level catalog version number while visiting the objects. If so,
+    // we'll capture it here and process it down at the end after applying all other
+    // objects.
+    Long nextCatalogVersion = null;
+
+    ObjectUpdateSequencer authObjectSequencer = new ObjectUpdateSequencer();
+
     Pair<Boolean, ByteBuffer> update;
     while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
         != null) {
+      boolean isDelete = update.first;
       TCatalogObject obj = new TCatalogObject();
       try {
         obj.read(new TBinaryProtocol(new TByteBuffer(update.second)));
@@ -717,18 +766,53 @@ public class CatalogdMetaProvider implements MetaProvider {
             "coordinator.", e);
         continue;
       }
+      if (isDelete) {
+        deletedObjectsLog_.addRemovedObject(obj);
+      } else if (deletedObjectsLog_.wasObjectRemovedAfter(obj)) {
+        LOG.trace("Skipping update because a matching object was removed " +
+            "in a later catalog version: {}", obj);
+        continue;
+      }
+
       invalidateCacheForObject(obj);
 
+      // The sequencing of updates to authorization objects is important since they
+      // may be cross-referential. So, just add them to the sequencer which ensures
+      // we handle them in the right order later.
+      if (obj.type == TCatalogObjectType.PRINCIPAL ||
+          obj.type == TCatalogObjectType.PRIVILEGE) {
+        authObjectSequencer.add(obj, isDelete);
+      }
+
       // Handle CATALOG objects. These are sent only via the updates published via
       // the statestore topic, and not via the synchronous updates returned from DDLs.
       if (obj.type == TCatalogObjectType.CATALOG) {
         // The top-level CATALOG object version is used to implement SYNC_DDL. We need
         // to keep track of this and pass it back to the C++ code in the return value
-        // of this call.
-        lastSeenCatalogVersion_.set(obj.catalog_version);
+        // of this call. This is also used to know when the catalog is ready at
+        // startup.
+        nextCatalogVersion = obj.catalog_version;
         witnessCatalogServiceId(obj.catalog.catalog_service_id);
       }
     }
+
+    for (TCatalogObject obj : authObjectSequencer.getUpdatedObjects()) {
+      updateAuthPolicy(obj, /*isDelete=*/false);
+    }
+    for (TCatalogObject obj : authObjectSequencer.getDeletedObjects()) {
+      updateAuthPolicy(obj, /*isDelete=*/true);
+    }
+
+    deletedObjectsLog_.garbageCollect(lastSeenCatalogVersion_.get());
+
+    // NOTE: it's important to defer setting the new catalog version until the
+    // end of the loop, since the CATALOG object might be one of the first objects
+    // processed, and we don't want to prematurely indicate that we are done processing
+    // the update.
+    if (nextCatalogVersion != null) {
+      lastSeenCatalogVersion_.set(nextCatalogVersion);
+    }
+
     // TODO(IMPALA-7506) 'minVersion' here should be the minimum version of any object
     // that we have in our cache. This is used to make global INVALIDATE METADATA'
     // operations synchronous. The flow is as follows for v1 impalads:
@@ -757,6 +841,42 @@ public class CatalogdMetaProvider implements MetaProvider {
     }
   }
 
+  private void updateAuthPolicy(TCatalogObject obj, boolean isDelete) {
+    LOG.trace("Updating authorization policy: {} isDelete={}", obj, isDelete);
+    switch (obj.type) {
+    case PRINCIPAL:
+      if (!isDelete) {
+        Principal principal = Principal.fromThrift(obj.getPrincipal());
+        principal.setCatalogVersion(obj.getCatalog_version());
+        authPolicy_.addPrincipal(principal);
+      } else {
+        authPolicy_.removePrincipalIfLowerVersion(obj.getPrincipal(),
+            obj.getCatalog_version());
+      }
+      break;
+    case PRIVILEGE:
+      if (!isDelete) {
+        // TODO(todd): duplicate code from ImpaladCatalog.
+        PrincipalPrivilege privilege =
+            PrincipalPrivilege.fromThrift(obj.getPrivilege());
+        privilege.setCatalogVersion(obj.getCatalog_version());
+        try {
+          authPolicy_.addPrivilege(privilege);
+        } catch (CatalogException e) {
+          // TODO(todd) it's odd that we swallow this error, both here and in
+          // the original code in ImpaladCatalog.
+          LOG.error("Error adding privilege: ", e);
+        }
+      } else {
+        authPolicy_.removePrivilegeIfLowerVersion(obj.getPrivilege(),
+            obj.getCatalog_version());
+      }
+      break;
+    default:
+        throw new IllegalArgumentException("invalid type: " + obj.type);
+    }
+  }
+
   /**
    * Witness a service ID received from the catalog. We can see the service IDs
    * either from a DDL response (in which case the service ID is part of the RPC
@@ -775,6 +895,11 @@ public class CatalogdMetaProvider implements MetaProvider {
         }
         catalogServiceId_ = serviceId;
         cache_.invalidateAll();
+        // TODO(todd): we probably need to invalidate the auth policy too.
+        // we are probably better off detecting this at a higher level and
+        // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog
+        // handles this.
+
         // TODO(todd): slight race here: a concurrent request from the old catalog
         // could theoretically be just about to write something back into the cache
         // after we do the above invalidate. Maybe we would be better off replacing

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 8bba217..6f13755 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.AuthorizationPolicy;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
@@ -81,6 +82,19 @@ class DirectMetaProvider implements MetaProvider {
     }
   }
 
+
+  @Override
+  public AuthorizationPolicy getAuthPolicy() {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public boolean isReady() {
+    // Direct provider is always ready since we don't need to wait for
+    // an update from any external process.
+    return true;
+  }
+
   @Override
   public ImmutableList<String> loadDbList() throws TException {
     try (MetaStoreClient c = msClientPool_.getClient()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 4f72702..328f3b1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -209,6 +209,9 @@ public class LocalCatalog implements FeCatalog {
   @Override
   public void waitForCatalogUpdate(long timeoutMs) {
     // No-op for local catalog.
+    // TODO(todd): Frontend.waitForCatalog() gets called at startup and ends
+    // up being a tight loop that spews logs unless we do something to wait better
+    // here.
   }
 
   @Override
@@ -218,7 +221,7 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public AuthorizationPolicy getAuthPolicy() {
-    return null; // TODO(todd): implement auth policy
+    return metaProvider_.getAuthPolicy();
   }
 
   @Override
@@ -241,13 +244,13 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public boolean isReady() {
-    // We are always ready.
-    return true;
+    return metaProvider_.isReady();
   }
 
   @Override
   public void setIsReady(boolean isReady) {
     // No-op for local catalog.
+    // This appears to only be used in some tests.
   }
 
   MetaProvider getMetaProvider() {

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index c5497bd..5431e30 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.AuthorizationPolicy;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -42,11 +43,20 @@ import com.google.errorprone.annotations.Immutable;
  *
  * Implementations may directly access the metadata from the source systems
  * or may include caching, etc.
- *
- * TODO(IMPALA-7127): expand this to include file metadata, sentry metadata,
- * etc.
  */
 interface MetaProvider {
+
+  /**
+   * Get the authorization policy. This acts as a repository of authorization
+   * metadata.
+   */
+  AuthorizationPolicy getAuthPolicy();
+
+  /**
+   * Return true if the metaprovider is ready to service requests.
+   */
+  boolean isReady();
+
   ImmutableList<String> loadDbList() throws TException;
 
   Database loadDb(String dbName) throws TException;

http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index d9ce584..88b3875 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -18,8 +18,10 @@
 # Superclass for all tests that need a custom cluster.
 # TODO: Configure cluster size and other parameters.
 
+import logging
 import os
 import os.path
+import pipes
 import pytest
 import re
 from subprocess import check_call
@@ -156,6 +158,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if os.environ.get("ERASURE_CODING") == "true":
       cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
 
+    logging.info("Starting cluster with command: %s" %
+                 " ".join(pipes.quote(arg) for arg in cmd + options))
     try:
       check_call(cmd + options, close_fds=True)
     finally:
