Repository: impala Updated Branches: refs/heads/master 8dcf54aee -> 1c6058fa1
IMPALA-7510. Support principals/privileges with LocalCatalog This enables support for Sentry authorization when LocalCatalog is enabled. The design is detailed in a change to the comment on CatalogdMetaProvider, but to recap it briefly here: At a high level, this patch takes the approach of duplicating the "v1" catalog flow for PRINCIPAL and PRIVILEGE catalog objects. Namely, the catalog daemon publishes complete objects into the statestore topic, and the impalad fully replicates them locally. I took this approach rather than trying to do fine-grained caching and invalidation for the following reasons: - The PRINCIPAL and PRIVILEGE metadata is typically many orders of magnitude smaller than table metadata. So, the benefit of fine-grained caching and eviction is not as great. - The PRINCIPAL and PRIVILEGE catalog objects are fairly tightly intertwined with relationships between them and backwards mappings maintained from groups back to principals. This logic is implemented by the AuthorizationPolicy class. Implementing similar mapping in a fine-grained caching approach would be a reasonable amount of work. - This bit of code is under some current flux as others are working on implementing more fine-grained permissioning. Thus, trying to duplicate the logic in a "fetch-on-demand" implementation might turn out to be chasing somewhat of a moving target. In order to take this approach, the patch is organized as follows: - refactored some of the role/principal removal logic from ImpaladCatalog into AuthorizationPolicy. This makes it easier to perform the similar "subscribe" with less duplicate code. - changed catalogd to publish PRINCIPAL and PRIVILEGE objects to v2 catalogs in addition to v1. - passed through LocalCatalog.getAuthPolicy to CatalogdMetaProvider, and added an AuthorizationPolicy member there. This member is maintained when we see PRINCIPAL and PRIVILEGE objects come via the catalog updates. 
- had to implement LocalCatalog.isReady() to ensure that we don't allow user access until the first topic update has been consumed. - additionally had to copy some other code from ImpaladCatalog to protect against various races -- we need a CatalogDeltaLog as well as careful sequencing of the order in which the objects apply. With this patch and the following one to enable UDF support, I was able to run the tests in tests/authorization successfully with LocalCatalog enabled. Change-Id: Iccce5aabdb6afe466fdaeae0fb3700c66e658558 Reviewed-on: http://gerrit.cloudera.org:8080/11358 Reviewed-by: Todd Lipcon <t...@apache.org> Tested-by: Todd Lipcon <t...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b986f2a8 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b986f2a8 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b986f2a8 Branch: refs/heads/master Commit: b986f2a8bb9bdfb75edabc20420fa8254f3e899e Parents: 37c080b Author: Todd Lipcon <t...@apache.org> Authored: Wed Aug 29 19:02:32 2018 -0700 Committer: Todd Lipcon <t...@apache.org> Committed: Thu Sep 6 02:39:08 2018 +0000 ---------------------------------------------------------------------- .../impala/catalog/AuthorizationPolicy.java | 48 +++++++ .../impala/catalog/CatalogServiceCatalog.java | 10 +- .../apache/impala/catalog/ImpaladCatalog.java | 105 +++++++-------- .../impala/catalog/PrincipalPrivilege.java | 2 +- .../catalog/local/CatalogdMetaProvider.java | 131 ++++++++++++++++++- .../catalog/local/DirectMetaProvider.java | 14 ++ .../impala/catalog/local/LocalCatalog.java | 9 +- .../impala/catalog/local/MetaProvider.java | 16 ++- tests/common/custom_cluster_test_suite.py | 4 + 9 files changed, 272 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java index ffb8cfd..f6429e6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java +++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java @@ -24,6 +24,7 @@ import java.util.Set; import com.google.common.base.Preconditions; import org.apache.commons.net.ntp.TimeStamp; import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TPrincipal; import org.apache.impala.thrift.TPrincipalType; import org.apache.impala.thrift.TPrivilege; import org.apache.impala.thrift.TResultRow; @@ -286,6 +287,53 @@ public class AuthorizationPolicy implements PrivilegeCache { removeRole(principalName) : removeUser(principalName); } + + /** + * Removes a principal, but only if the existing principal has a lower version + * than the specified 'dropVersion'. + */ + public synchronized void removePrincipalIfLowerVersion(TPrincipal thriftPrincipal, + long dropCatalogVersion) { + Principal existingPrincipal = getPrincipal(thriftPrincipal.getPrincipal_name(), + thriftPrincipal.getPrincipal_type()); + if (existingPrincipal == null || + existingPrincipal.getCatalogVersion() >= dropCatalogVersion) { + return; + } + + removePrincipal(thriftPrincipal.getPrincipal_name(), + thriftPrincipal.getPrincipal_type()); + // NOTE: the privileges are added to the CatalogObjectVersionSet automatically + // by being added to the CatalogObjectCache<Privilege> inside the principal. + // However, since we're just removing the principal wholesale here without removing + // anything from that map, we need to manually remove them from the version set. 
+ // + // TODO(todd): it seems like it would make sense to do this in removePrincipal rather + // than here, but this is preserving the behavior of the existing code. Perhaps + // we have a memory leak in the catalogd due to not removing them there. + CatalogObjectVersionSet.INSTANCE.removeAll(existingPrincipal.getPrivileges()); + } + + + + /** + * Removes a privilege, but only if the existing privilege has a lower version + * than the specified 'dropVersion'. + */ + public synchronized void removePrivilegeIfLowerVersion(TPrivilege thriftPrivilege, + long dropCatalogVersion) { + Principal principal = getPrincipal(thriftPrivilege.getPrincipal_id(), + thriftPrivilege.getPrincipal_type()); + if (principal == null) return; + PrincipalPrivilege existingPrivilege = + principal.getPrivilege(thriftPrivilege.getPrivilege_name()); + if (existingPrivilege != null && + existingPrivilege.getCatalogVersion() < dropCatalogVersion) { + principal.removePrivilege(thriftPrivilege.getPrivilege_name()); + } + } + + /** * Removes a role. Returns the removed role or null if no role with * this name existed. http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 1d0b32a..f02a2f3 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -534,11 +534,15 @@ public class CatalogServiceCatalog extends Catalog { // whole catalog. // TODO(todd) ensure that the impalad does this invalidation as required. return obj; + case PRIVILEGE: + case PRINCIPAL: + // The caching of this data on the impalad side is somewhat complex + // and this code is also under some churn at the moment. 
So, we'll just publish + // the full information rather than doing fetch-on-demand. + return obj; case DATA_SOURCE: case FUNCTION: case HDFS_CACHE_POOL: - case PRIVILEGE: - case PRINCIPAL: // These are currently not cached by v2 impalad. // TODO(todd): handle these items. return null; @@ -1667,6 +1671,8 @@ public class CatalogServiceCatalog extends Catalog { versionLock_.writeLock().lock(); try { Principal principal = authPolicy_.removePrincipal(principalName, type); + // TODO(todd): does this end up leaking the privileges associated + // with this principal into the CatalogObjectVersionSet on the catalogd? if (principal == null) return null; for (PrincipalPrivilege priv: principal.getPrivileges()) { priv.setCatalogVersion(incrementAndGetCatalogVersion()); http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 921e827..cd31279 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -23,9 +23,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.Path; import org.apache.impala.analysis.TableName; -import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; import org.apache.impala.service.FeSupport; @@ -35,8 +33,6 @@ import org.apache.impala.thrift.TDataSource; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TFunction; import org.apache.impala.thrift.TGetPartitionStatsResponse; -import org.apache.impala.thrift.TPrincipal; -import org.apache.impala.thrift.TPrivilege; import 
org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; @@ -107,14 +103,47 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { } /** - * Returns true if the given object does not depend on any other object already - * existing in the catalog in order to be added. + * Utility class for sequencing the order in which a set of updated catalog objects + * need to be applied to the catalog in order to satisfy referential constraints. + * + * If one type of object refers to another type of object, it needs to be added + * after it and deleted before it. */ - private boolean isTopLevelCatalogObject(TCatalogObject catalogObject) { - return catalogObject.getType() == TCatalogObjectType.DATABASE || - catalogObject.getType() == TCatalogObjectType.DATA_SOURCE || - catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL || - catalogObject.getType() == TCatalogObjectType.PRINCIPAL; + public static class ObjectUpdateSequencer { + private final ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>(); + private final ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>(); + + public void add(TCatalogObject obj, boolean isDeleted) { + if (!isDeleted) { + // Update top-level objects first. + if (isTopLevelCatalogObject(obj)) { + updatedObjects.addFirst(obj); + } else { + updatedObjects.addLast(obj); + } + } else { + // Remove low-level objects first. + if (isTopLevelCatalogObject(obj)) { + deletedObjects.addLast(obj); + } else { + deletedObjects.addFirst(obj); + } + } + } + + public Iterable<TCatalogObject> getUpdatedObjects() { return updatedObjects; } + public Iterable<TCatalogObject> getDeletedObjects() { return deletedObjects; } + + /** + * Returns true if the given object does not depend on any other object already + * existing in the catalog in order to be added. 
+ */ + private static boolean isTopLevelCatalogObject(TCatalogObject catalogObject) { + return catalogObject.getType() == TCatalogObjectType.DATABASE || + catalogObject.getType() == TCatalogObjectType.DATA_SOURCE || + catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL || + catalogObject.getType() == TCatalogObjectType.PRINCIPAL; + } } /** @@ -156,8 +185,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { TUpdateCatalogCacheRequest req) throws CatalogException, TException { // For updates from catalog op results, the service ID is set in the request. if (req.isSetCatalog_service_id()) setCatalogServiceId(req.catalog_service_id); - ArrayDeque<TCatalogObject> updatedObjects = new ArrayDeque<>(); - ArrayDeque<TCatalogObject> deletedObjects = new ArrayDeque<>(); + ObjectUpdateSequencer sequencer = new ObjectUpdateSequencer(); long newCatalogVersion = lastSyncedCatalogVersion_.get(); Pair<Boolean, ByteBuffer> update; while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr)) @@ -177,24 +205,12 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { if (obj.type == TCatalogObjectType.CATALOG) { setCatalogServiceId(obj.catalog.catalog_service_id); newCatalogVersion = obj.catalog_version; - } else if (!update.first) { - // Update top-level objects first. - if (isTopLevelCatalogObject(obj)) { - updatedObjects.addFirst(obj); - } else { - updatedObjects.addLast(obj); - } } else { - // Remove low-level objects first. 
- if (isTopLevelCatalogObject(obj)) { - deletedObjects.addLast(obj); - } else { - deletedObjects.addFirst(obj); - } + sequencer.add(obj, update.first); } } - for (TCatalogObject catalogObject: updatedObjects) { + for (TCatalogObject catalogObject: sequencer.getUpdatedObjects()) { try { addCatalogObject(catalogObject); } catch (Exception e) { @@ -202,7 +218,9 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { } } - for (TCatalogObject catalogObject: deletedObjects) removeCatalogObject(catalogObject); + for (TCatalogObject catalogObject: sequencer.getDeletedObjects()) { + removeCatalogObject(catalogObject); + } lastSyncedCatalogVersion_.set(newCatalogVersion); // Cleanup old entries in the log. @@ -322,10 +340,12 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { removeDataSource(catalogObject.getData_source(), dropCatalogVersion); break; case PRINCIPAL: - removePrincipal(catalogObject.getPrincipal(), dropCatalogVersion); + authPolicy_.removePrincipalIfLowerVersion(catalogObject.getPrincipal(), + dropCatalogVersion); break; case PRIVILEGE: - removePrivilege(catalogObject.getPrivilege(), dropCatalogVersion); + authPolicy_.removePrivilegeIfLowerVersion(catalogObject.getPrivilege(), + dropCatalogVersion); break; case HDFS_CACHE_POOL: HdfsCachePool existingItem = @@ -457,31 +477,6 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { } } - private void removePrincipal(TPrincipal thriftPrincipal, long dropCatalogVersion) { - Principal existingPrincipal = authPolicy_.getPrincipal( - thriftPrincipal.getPrincipal_name(), thriftPrincipal.getPrincipal_type()); - // version of the drop, remove the function. 
- if (existingPrincipal != null && - existingPrincipal.getCatalogVersion() < dropCatalogVersion) { - authPolicy_.removePrincipal(thriftPrincipal.getPrincipal_name(), - thriftPrincipal.getPrincipal_type()); - CatalogObjectVersionSet.INSTANCE.removeAll(existingPrincipal.getPrivileges()); - } - } - - private void removePrivilege(TPrivilege thriftPrivilege, long dropCatalogVersion) { - Principal principal = authPolicy_.getPrincipal(thriftPrivilege.getPrincipal_id(), - thriftPrivilege.getPrincipal_type()); - if (principal == null) return; - PrincipalPrivilege existingPrivilege = - principal.getPrivilege(thriftPrivilege.getPrivilege_name()); - // version of the drop, remove the function. - if (existingPrivilege != null && - existingPrivilege.getCatalogVersion() < dropCatalogVersion) { - principal.removePrivilege(thriftPrivilege.getPrivilege_name()); - } - } - @Override // FeCatalog public boolean isReady() { return lastSyncedCatalogVersion_.get() > INITIAL_CATALOG_VERSION; http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java index bb7500b..dd33fe7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java +++ b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java @@ -45,7 +45,7 @@ public class PrincipalPrivilege extends CatalogObjectImpl { private final TPrivilege privilege_; private PrincipalPrivilege(TPrivilege privilege) { - privilege_ = privilege; + privilege_ = Preconditions.checkNotNull(privilege); } public TPrivilege toThrift() { return privilege_; } http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index c36c30d..9fdcb03 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -36,8 +36,14 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.impala.catalog.AuthorizationPolicy; import org.apache.impala.catalog.Catalog; +import org.apache.impala.catalog.CatalogDeltaLog; +import org.apache.impala.catalog.CatalogException; +import org.apache.impala.catalog.Principal; +import org.apache.impala.catalog.PrincipalPrivilege; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer; import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; import org.apache.impala.common.Reference; @@ -130,6 +136,18 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; * it is no longer "linked". Over time, the old entries will naturally age out of the * cache. * + * + * Metadata that is _not_ fetched on demand + * ================================================ + * This implementation does not fetch _all_ metadata on demand. In fact, some pieces of + * metadata are currently provided in the same manner as "legacy" coordinators: the + * full metadata objects are published by the catalog daemon into the statestore, and + * we keep a full "replica" of that information. In particular, we currently use this + * strategy for Sentry metadata (roles and privileges) since the caching of this data + * is relatively more complex. 
Given that this data is typically quite small relative + * to the table metadata, it's not too expensive to maintain the full replica. + * + * * TODO(todd): expose statistics on a per-query and per-daemon level about cache * hit rates, number of outbound RPCs, etc. * TODO(todd): handle retry/backoff to ride over short catalog interruptions @@ -199,6 +217,12 @@ public class CatalogdMetaProvider implements MetaProvider { Catalog.INITIAL_CATALOG_VERSION); /** + * Tracks objects that have been deleted in response to a DDL issued from this + * coordinator. + */ + CatalogDeltaLog deletedObjectsLog_ = new CatalogDeltaLog(); + + /** * The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer * has restarted. */ @@ -207,6 +231,12 @@ public class CatalogdMetaProvider implements MetaProvider { private final Object catalogServiceIdLock_ = new Object(); + /** + * Cache of authorization policy metadata. Populated from data pushed from the + * StateStore. Currently this is _not_ "fetch-on-demand". + */ + private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy(); + public CatalogdMetaProvider(TBackendGflags flags) { Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s()); Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb()); @@ -237,6 +267,16 @@ public class CatalogdMetaProvider implements MetaProvider { return cache_.stats(); } + @Override + public AuthorizationPolicy getAuthPolicy() { + return authPolicy_; + } + + @Override + public boolean isReady() { + return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION; + } + /** * Send a GetPartialCatalogObject request to catalogd. This handles converting * non-OK status responses back to exceptions, performing various generic sanity @@ -698,14 +738,23 @@ public class CatalogdMetaProvider implements MetaProvider { * indicate that the catalogd representation of the object has changed and therefore * needs to be invalidated in the impalad. 
*/ - public TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) { + public synchronized TUpdateCatalogCacheResponse updateCatalogCache( + TUpdateCatalogCacheRequest req) { if (req.isSetCatalog_service_id()) { witnessCatalogServiceId(req.catalog_service_id); } + // We might see a top-level catalog version number while visiting the objects. If so, + // we'll capture it here and process it down at the end after applying all other + // objects. + Long nextCatalogVersion = null; + + ObjectUpdateSequencer authObjectSequencer = new ObjectUpdateSequencer(); + Pair<Boolean, ByteBuffer> update; while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr)) != null) { + boolean isDelete = update.first; TCatalogObject obj = new TCatalogObject(); try { obj.read(new TBinaryProtocol(new TByteBuffer(update.second))); @@ -717,18 +766,53 @@ public class CatalogdMetaProvider implements MetaProvider { "coordinator.", e); continue; } + if (isDelete) { + deletedObjectsLog_.addRemovedObject(obj); + } else if (deletedObjectsLog_.wasObjectRemovedAfter(obj)) { + LOG.trace("Skipping update because a matching object was removed " + + "in a later catalog version: {}", obj); + continue; + } + invalidateCacheForObject(obj); + // The sequencing of updates to authorization objects is important since they + // may be cross-referential. So, just add them to the sequencer which ensures + // we handle them in the right order later. + if (obj.type == TCatalogObjectType.PRINCIPAL || + obj.type == TCatalogObjectType.PRIVILEGE) { + authObjectSequencer.add(obj, isDelete); + } + // Handle CATALOG objects. These are sent only via the updates published via // the statestore topic, and not via the synchronous updates returned from DDLs. if (obj.type == TCatalogObjectType.CATALOG) { // The top-level CATALOG object version is used to implement SYNC_DDL. We need // to keep track of this and pass it back to the C++ code in the return value - // of this call. 
- lastSeenCatalogVersion_.set(obj.catalog_version); + // of this call. This is also used to know when the catalog is ready at + // startup. + nextCatalogVersion = obj.catalog_version; witnessCatalogServiceId(obj.catalog.catalog_service_id); } } + + for (TCatalogObject obj : authObjectSequencer.getUpdatedObjects()) { + updateAuthPolicy(obj, /*isDelete=*/false); + } + for (TCatalogObject obj : authObjectSequencer.getDeletedObjects()) { + updateAuthPolicy(obj, /*isDelete=*/true); + } + + deletedObjectsLog_.garbageCollect(lastSeenCatalogVersion_.get()); + + // NOTE: it's important to defer setting the new catalog version until the + // end of the loop, since the CATALOG object might be one of the first objects + // processed, and we don't want to prematurely indicate that we are done processing + // the update. + if (nextCatalogVersion != null) { + lastSeenCatalogVersion_.set(nextCatalogVersion); + } + // TODO(IMPALA-7506) 'minVersion' here should be the minimum version of any object // that we have in our cache. This is used to make global INVALIDATE METADATA' // operations synchronous. The flow is as follows for v1 impalads: @@ -757,6 +841,42 @@ public class CatalogdMetaProvider implements MetaProvider { } } + private void updateAuthPolicy(TCatalogObject obj, boolean isDelete) { + LOG.trace("Updating authorization policy: {} isDelete={}", obj, isDelete); + switch (obj.type) { + case PRINCIPAL: + if (!isDelete) { + Principal principal = Principal.fromThrift(obj.getPrincipal()); + principal.setCatalogVersion(obj.getCatalog_version()); + authPolicy_.addPrincipal(principal); + } else { + authPolicy_.removePrincipalIfLowerVersion(obj.getPrincipal(), + obj.getCatalog_version()); + } + break; + case PRIVILEGE: + if (!isDelete) { + // TODO(todd): duplicate code from ImpaladCatalog. 
+ PrincipalPrivilege privilege = + PrincipalPrivilege.fromThrift(obj.getPrivilege()); + privilege.setCatalogVersion(obj.getCatalog_version()); + try { + authPolicy_.addPrivilege(privilege); + } catch (CatalogException e) { + // TODO(todd) it's odd that we swallow this error, both here and in + // the original code in ImpaladCatalog. + LOG.error("Error adding privilege: ", e); + } + } else { + authPolicy_.removePrivilegeIfLowerVersion(obj.getPrivilege(), + obj.getCatalog_version()); + } + break; + default: + throw new IllegalArgumentException("invalid type: " + obj.type); + } + } + /** * Witness a service ID received from the catalog. We can see the service IDs * either from a DDL response (in which case the service ID is part of the RPC @@ -775,6 +895,11 @@ public class CatalogdMetaProvider implements MetaProvider { } catalogServiceId_ = serviceId; cache_.invalidateAll(); + // TODO(todd): we probably need to invalidate the auth policy too. + // we are probably better off detecting this at a higher level and + // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog + // handles this. + // TODO(todd): slight race here: a concurrent request from the old catalog // could theoretically be just about to write something back into the cache // after we do the above invalidate. 
Maybe we would be better off replacing http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 8bba217..6f13755 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.impala.catalog.AuthorizationPolicy; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; @@ -81,6 +82,19 @@ class DirectMetaProvider implements MetaProvider { } } + + @Override + public AuthorizationPolicy getAuthPolicy() { + throw new UnsupportedOperationException("not supported"); + } + + @Override + public boolean isReady() { + // Direct provider is always ready since we don't need to wait for + // an update from any external process. 
+ return true; + } + @Override public ImmutableList<String> loadDbList() throws TException { try (MetaStoreClient c = msClientPool_.getClient()) { http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java index 4f72702..328f3b1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java @@ -209,6 +209,9 @@ public class LocalCatalog implements FeCatalog { @Override public void waitForCatalogUpdate(long timeoutMs) { // No-op for local catalog. + // TODO(todd): Frontend.waitForCatalog() gets called at startup and ends + // up being a tight loop that spews logs unless we do something to wait better + // here. } @Override @@ -218,7 +221,7 @@ public class LocalCatalog implements FeCatalog { @Override public AuthorizationPolicy getAuthPolicy() { - return null; // TODO(todd): implement auth policy + return metaProvider_.getAuthPolicy(); } @Override @@ -241,13 +244,13 @@ public class LocalCatalog implements FeCatalog { @Override public boolean isReady() { - // We are always ready. - return true; + return metaProvider_.isReady(); } @Override public void setIsReady(boolean isReady) { // No-op for local catalog. + // This appears to only be used in some tests. 
} MetaProvider getMetaProvider() { http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index c5497bd..5431e30 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.impala.catalog.AuthorizationPolicy; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.Pair; import org.apache.impala.thrift.TNetworkAddress; @@ -42,11 +43,20 @@ import com.google.errorprone.annotations.Immutable; * * Implementations may directly access the metadata from the source systems * or may include caching, etc. - * - * TODO(IMPALA-7127): expand this to include file metadata, sentry metadata, - * etc. */ interface MetaProvider { + + /** + * Get the authorization policy. This acts as a repository of authorization + * metadata. + */ + AuthorizationPolicy getAuthPolicy(); + + /** + * Return true if the metaprovider is ready to service requests. 
+ */ + boolean isReady(); + ImmutableList<String> loadDbList() throws TException; Database loadDb(String dbName) throws TException; http://git-wip-us.apache.org/repos/asf/impala/blob/b986f2a8/tests/common/custom_cluster_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index d9ce584..88b3875 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -18,8 +18,10 @@ # Superclass for all tests that need a custom cluster. # TODO: Configure cluster size and other parameters. +import logging import os import os.path +import pipes import pytest import re from subprocess import check_call @@ -156,6 +158,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): if os.environ.get("ERASURE_CODING") == "true": cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true") + logging.info("Starting cluster with command: %s" % + " ".join(pipes.quote(arg) for arg in cmd + options)) try: check_call(cmd + options, close_fds=True) finally: