Revert "SENTRY-1769 Refactor HMSFollower Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li, Sergio Pena and Alexander Kolbasov)"
This reverts commit e5bb466efc621318c69f4a929dea3e39a77962af. Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c56f48cc Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c56f48cc Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c56f48cc Branch: refs/heads/sentry-ha-redesign Commit: c56f48cc62e5b346d6c8432ccfa07d7f9fa4f110 Parents: e5bb466 Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Authored: Thu Jul 13 18:57:20 2017 -0500 Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Committed: Thu Jul 13 18:57:20 2017 -0500 ---------------------------------------------------------------------- .../apache/sentry/hdfs/PathImageRetriever.java | 13 +- .../org/apache/sentry/hdfs/SentryPlugin.java | 40 +- .../provider/db/SentryPolicyStorePlugin.java | 4 +- .../db/service/model/MAuthzPathsSnapshotId.java | 2 +- .../service/model/MSentryHmsNotification.java | 8 +- .../db/service/persistent/PathsImage.java | 10 +- .../db/service/persistent/SentryStore.java | 13 +- .../thrift/SentryPolicyStoreProcessor.java | 5 +- .../sentry/service/thrift/HMSFollower.java | 714 ++++++++++++++----- .../service/thrift/NotificationProcessor.java | 571 ++++----------- .../sentry/service/thrift/SentryHmsClient.java | 244 ------- .../sentry/service/thrift/SentryService.java | 17 +- .../service/thrift/SentryServiceUtil.java | 35 +- .../sentry/service/thrift/ServiceConstants.java | 2 +- .../db/service/persistent/TestSentryStore.java | 1 + .../sentry/service/thrift/TestHMSFollower.java | 348 +++------ .../thrift/TestNotificationProcessor.java | 465 ------------ .../service/thrift/TestSentryHmsClient.java | 470 ------------ .../TestDbPrivilegeCleanupOnDrop.java | 4 +- 19 files changed, 820 insertions(+), 2146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java index ac5c5b2..2426b40 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java @@ -25,14 +25,9 @@ import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import javax.annotation.concurrent.ThreadSafe; - import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.annotation.concurrent.ThreadSafe; -import org.apache.sentry.hdfs.service.thrift.TPathChanges; -import org.apache.sentry.provider.db.service.persistent.PathsImage; -import org.apache.sentry.provider.db.service.persistent.SentryStore; /** * PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent @@ -42,10 +37,10 @@ import org.apache.sentry.provider.db.service.persistent.SentryStore; * It is a thread safe class, as all the underlying database operation is thread safe. */ @ThreadSafe -class PathImageRetriever implements ImageRetriever<PathsUpdate> { +public class PathImageRetriever implements ImageRetriever<PathsUpdate> { - private static final String[] root = {"/"}; private final SentryStore sentryStore; + private static final String[] root = {"/"}; PathImageRetriever(SentryStore sentryStore) { this.sentryStore = sentryStore; @@ -60,8 +55,8 @@ class PathImageRetriever implements ImageRetriever<PathsUpdate> { // persistent storage, along with the sequence number of latest // delta change the snapshot corresponds to. PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); + long curSeqNum = pathsImage.getCurSeqNum(); long curImgNum = pathsImage.getCurImgNum(); - long curSeqNum = pathsImage.getId(); Map<String, Set<String>> pathImage = pathsImage.getPathImage(); // Translates the complete Hive paths snapshot into a PathsUpdate. @@ -78,7 +73,7 @@ class PathImageRetriever implements ImageRetriever<PathsUpdate> { } SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate - .getPathChanges().size()); + .getPathChanges().size()); // Translate PathsUpdate that contains a full image to TPathsDump for // consumer (NN) to be able to quickly construct UpdateableAuthzPaths http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index 0c3ba5b..d6100de 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -23,14 +23,12 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.utils.SigUtils; import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.hdfs.service.thrift.TRoleChanges; import org.apache.sentry.provider.db.SentryPolicyStorePlugin; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.service.thrift.SentryServiceUtil; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest; @@ -40,7 +38,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; -import org.apache.sentry.service.thrift.HmsFollower; +import org.apache.sentry.service.thrift.HMSFollower; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +56,7 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants * <ol> * <li> * Whenever updates happen on HMS, a corresponding notification log is generated, - * and {@link HmsFollower} will process the notification event and persist it in database. + * and {@link HMSFollower} will process the notification event and persist it in database. * <li> * The NameNode periodically asks Sentry for updates. Sentry may return zero * or more updates previously received via HMS notification log. @@ -242,22 +240,16 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen @Override public Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) - throws SentryPluginException, SentryInvalidInputException{ - String oldAuthz = null; - String newAuthz = null; - try { - oldAuthz = SentryServiceUtil.getAuthzObj(request.getOldAuthorizable()); - newAuthz = SentryServiceUtil.getAuthzObj(request.getNewAuthorizable()); - } catch (SentryInvalidInputException failure) { - LOGGER.error("onRenameSentryPrivilege, Could not rename sentry privilege ", failure); - throw failure; - } + throws SentryPluginException { + String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable()); + String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable()); PermissionsUpdate update = new PermissionsUpdate(); TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); privUpdate.putToAddPrivileges(newAuthz, newAuthz); privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); - LOGGER.debug("onRenameSentryPrivilege, Authz Perm preUpdate [ {} ]", oldAuthz); + LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]", + oldAuthz)); return update; } @@ -291,7 +283,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen update.addPrivilegeUpdate(authzObj).putToDelPrivileges( roleName, privilege.getAction().toUpperCase()); - LOGGER.debug("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ {} ]", authzObj); + LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]", + authzObj)); return update; } @@ -303,7 +296,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ); update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS); - LOGGER.debug("onDropSentryRole, Authz Perm preUpdate [ {} ]", request.getRoleName()); + LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]", + request.getRoleName())); return update; } @@ -311,18 +305,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen public Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException { PermissionsUpdate update = new PermissionsUpdate(); - String authzObj = null; - try { - authzObj = SentryServiceUtil.getAuthzObj(request.getAuthorizable()); - } catch (SentryInvalidInputException failure) { - LOGGER.error("onDropSentryPrivilege, Could not drop sentry privilege " - + failure.toString(), failure); - throw new SentryPluginException(failure.getMessage(), failure); - } + String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable()); update.addPrivilegeUpdate(authzObj).putToDelPrivileges( PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - LOGGER.debug("onDropSentryPrivilege, Authz Perm preUpdate [ {} ]", authzObj); + LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]", + authzObj)); return update; } http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java index a22b422..5b8a572 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java @@ -19,7 +19,6 @@ package org.apache.sentry.provider.db; import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; @@ -69,8 +68,7 @@ public interface SentryPolicyStorePlugin { Update onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException; - Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) - throws SentryPluginException, SentryInvalidInputException; + Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException; Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException; } http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java index d8d54f3..d683c2c 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java @@ -22,7 +22,7 @@ import javax.jdo.annotations.PrimaryKey; /** * This class is used to persist new authz paths snapshots IDs. An authz path snapshot ID is required by - * the MAuthzPathsMapping to detect new HMS snapshots created by the HmsFollower. + * the MAuthzPathsMapping to detect new HMS snapshots created by the HMSFollower. */ @PersistenceCapable public class MAuthzPathsSnapshotId { http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java index 166bec7..0d54548 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java @@ -20,13 +20,15 @@ package org.apache.sentry.provider.db.service.model; /** * Database backend store for HMS Notification ID's. All the notifications that are processed * by sentry are stored. - * <p> + */ + +/* * <p> HMS notification ID's are stored in separate table for three reasons</p> * <ol> * <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There - * are cases where HmsFollower doesn't process notifications and skip's them. Depending on + * are cases where HMSFollower doesn't process notifications and skip's them. Depending on * SENTRY_PATH_CHANGE information may not provide the last notification processed.</li> - * <li> There could be cases where HmsFollower thread in multiple sentry servers acting as a + * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a * leader and process HMS notifications. we need to avoid processing the notifications * multiple times. This can be made sure by always having some number of notification * information always regardless of purging interval.</li> http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java index 409a557..4d852e6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java @@ -31,17 +31,17 @@ public class PathsImage { // A full image of hiveObj to Paths mapping. private final Map<String, Set<String>> pathImage; - private final long id; + private final long curSeqNum; private final long curImgNum; - public PathsImage(Map<String, Set<String>> pathImage, long id, long curImgNum) { + public PathsImage(Map<String, Set<String>> pathImage, long curSeqNum, long curImgNum) { this.pathImage = pathImage; - this.id = id; + this.curSeqNum = curSeqNum; this.curImgNum = curImgNum; } - public long getId() { - return id; + public long getCurSeqNum() { + return curSeqNum; } public long getCurImgNum() { http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 7b02e2c..979e45b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -107,7 +107,7 @@ import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder * single node and rely on DB for multi-node synchronization. * <p> * This isn't much of a problem for path updates since they are - * driven by HmsFollower which usually runs on a single leader + * driven by HMSFollower which usually runs on a single leader * node, but permission updates originate from clients * directly and may be highly concurrent. * <p> @@ -151,7 +151,7 @@ public class SentryStore { private static final long COUNT_VALUE_UNKNOWN = -1L; // Representation for unknown HMS notification ID - public static final long NOTIFICATION_UNKNOWN = -1L; + private static final long NOTIFICATION_UNKNOWN = -1L; private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL, AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER, @@ -169,8 +169,8 @@ public class SentryStore { private final TransactionManager tm; /** - * counterWait is used to synchronize notifications between Thrift and HmsFollower. - * Technically it doesn't belong here, but the only thing that connects HmsFollower + * counterWait is used to synchronize notifications between Thrift and HMSFollower. + * Technically it doesn't belong here, but the only thing that connects HMSFollower * and Thrift API is SentryStore. An alternative could be a singleton CounterWait or * some factory that returns CounterWait instances keyed by name, but this complicates * things unnecessary. @@ -2674,7 +2674,7 @@ public class SentryStore { /** * Persist an up-to-date HMS snapshot into Sentry DB in a single transaction. * - * @param authzPaths paths to be be persisted + * @param authzPaths Mapping of hiveObj to < Paths < * @throws Exception */ public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception { @@ -2685,6 +2685,7 @@ public class SentryStore { long snapshotID = getCurrentAuthzPathsSnapshotID(pm); long nextSnapshotID = snapshotID + 1; + pm.makePersistent(new MAuthzPathsSnapshotId(nextSnapshotID)); for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) { pm.makePersistent(new MAuthzPathsMapping(nextSnapshotID, authzPath.getKey(), authzPath.getValue())); @@ -3703,7 +3704,7 @@ public class SentryStore { * * @param pm the PersistenceManager * @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed. - * else last NotificationID processed by HmsFollower + * else last NotificationID processed by HMSFollower */ static Long getLastProcessedNotificationIDCore( PersistenceManager pm) { http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index cfd0e30..ad23334 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -945,10 +945,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { } catch (SentryThriftAPIMismatchException e) { LOGGER.error(e.getMessage(), e); response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e)); - } catch (SentryInvalidInputException e) { - response.setStatus(Status.InvalidInput(e.getMessage(), e)); - } - catch (Exception e) { + } catch (Exception e) { String msg = "Unknown error for request: " + request + ", message: " + e.getMessage(); LOGGER.error(msg, e); http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index a9d05b1..1b6ae18 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -1,275 +1,641 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - <p> - http://www.apache.org/licenses/LICENSE-2.0 - <p> - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.apache.sentry.service.thrift; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; import com.google.common.annotations.VisibleForTesting; - -import java.net.SocketException; - -import java.util.Collection; -import javax.jdo.JDODataStoreException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; -import org.apache.sentry.provider.db.service.persistent.PathsImage; +import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; +import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; +import org.apache.sentry.hdfs.PermissionsUpdate; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; +import org.apache.sentry.provider.db.SentryPolicyStorePlugin; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.SentryMetrics; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.sentry.binding.metastore.messaging.json.*; + +import javax.jdo.JDODataStoreException; +import javax.security.auth.login.LoginException; + +import java.io.IOException; +import java.net.SocketException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static com.codahale.metrics.MetricRegistry.name; +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; +import static org.apache.sentry.hdfs.Updateable.Update; /** - * HmsFollower is the thread which follows the Hive MetaStore state changes from Sentry. + * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry. * It gets the full update and notification logs from HMS and applies it to * update permissions stored in Sentry using SentryStore and also update the < obj,path > state * stored for HDFS-Sentry sync. */ -public class HmsFollower implements Runnable, AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(HmsFollower.class); - private static boolean connectedToHms = false; - private final SentryHmsClient client; +@SuppressWarnings("PMD") +public class HMSFollower implements Runnable, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); + private HiveSimpleConnectionFactory hiveConnectionFactory; + // Track the latest eventId of the event that has been logged. So we don't log the same message + private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID; + private static boolean connectedToHMS = false; + private HMSClient client; private final Configuration authzConf; private final SentryStore sentryStore; - private final NotificationProcessor notificationProcessor; + private String hiveInstance; + private boolean needLogHMSSupportReady = true; private final LeaderStatusMonitor leaderMonitor; - /** - * Configuring Hms Follower thread. - * - * @param conf sentry configuration - * @param store sentry store - * @param leaderMonitor singleton instance of LeaderStatusMonitor - */ - HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveSimpleConnectionFactory hiveConnectionFactory) { - this(conf, store, leaderMonitor, hiveConnectionFactory, null); - } + private static final String SNAPSHOT = "snapshot"; + /** Measures time to get full snapshot */ + private final Timer updateTimer = SentryMetrics.getInstance() + .getTimer(name(FullUpdateInitializer.class, SNAPSHOT)); + /** Number of times update failed */ + private final Counter failedSnapshotsCount = SentryMetrics.getInstance() + .getCounter(name(FullUpdateInitializer.class, "failed")); - @VisibleForTesting - /** - * Constructor should be used only for testing purposes. - * - * @param conf sentry configuration - * @param store sentry store - * @param leaderMonitor - * @param authServerName Server that sentry is Authorizing - */ - HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) { - LOGGER.info("HmsFollower is being initialized"); + HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, + HiveSimpleConnectionFactory hiveConnectionFactory) { authzConf = conf; this.leaderMonitor = leaderMonitor; sentryStore = store; - if (authServerName == null) { - HiveConf hiveConf = new HiveConf(); - authServerName = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar()); - } - notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); - client = new SentryHmsClient(authzConf, hiveConnectionFactory); + this.hiveConnectionFactory = hiveConnectionFactory; + } + + @VisibleForTesting + HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) + throws IOException, LoginException { + this(conf, sentryStore, null, null); + this.hiveInstance = hiveInstance; + hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); + hiveConnectionFactory.init(); } @VisibleForTesting - public static boolean isConnectedToHms() { - return connectedToHms; + public static boolean isConnectedToHMS() { + return connectedToHMS; } @Override public void close() { - if (client != null) { - // Close any outstanding connections to HMS - try { - client.disconnect(); - } catch (Exception failure) { - LOGGER.error("Failed to close the Sentry Hms Client", failure); - } + // Close any outstanding connections to HMS + closeHMSConnection(); + try { + hiveConnectionFactory.close(); + } catch (Exception e) { + LOGGER.error("failed to close Hive Connection Factory", e); } } + /** + * Returns HMS Client if successful, returns null if HMS is not ready yet to take connections + * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials + * Throws @MetaException if there was a problem on creating an HMSClient + */ + private HiveMetaStoreClient getMetaStoreClient() + throws IOException, InterruptedException, MetaException { + if (client == null) { + client = hiveConnectionFactory.connect(); + connectedToHMS = true; + } + return client.getClient(); + } + @Override public void run() { - long lastProcessedNotificationId; + Long lastProcessedNotificationID; try { - // Initializing lastProcessedNotificationId based on the latest persisted notification ID. - lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID(); + // Initializing lastProcessedNotificationID based on the latest persisted notification ID. + lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); } catch (Exception e) { - LOGGER.error("Failed to get the last processed notification id from sentry store, " - + "Skipping the processing", e); + LOGGER.error("Failed to get the last processed notification id from sentry store, " + + "Skipping the processing", e); return; } // Wake any clients connected to this service waiting for HMS already processed notifications. - wakeUpWaitingClientsForSync(lastProcessedNotificationId); + wakeUpWaitingClientsForSync(lastProcessedNotificationID); // Only the leader should listen to HMS updates if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { // Close any outstanding connections to HMS - close(); + closeHMSConnection(); return; } - syncupWithHms(lastProcessedNotificationId); + processHiveMetastoreUpdates(); } /** - * Processes new Hive Metastore notifications. - * - * <p>If no notifications are processed yet, then it - * does a full initial snapshot of the Hive Metastore followed by new notifications updates that - * could have happened after it. + * Wakes up HMS waiters waiting for a specific event notification. * - * <p>Clients connections waiting for an event notification will be - * woken up afterwards. + * @param eventID */ - private void syncupWithHms(long notificationId) { - try { - client.connect(); - connectedToHms = true; - } catch (Throwable e) { - LOGGER.error("HmsFollower cannot connect to HMS!!", e); - return; + private void wakeUpWaitingClientsForSync(long eventID) { + CounterWait counterWait = sentryStore.getCounterWait(); + + // Wake up any HMS waiters that are waiting for this ID. + // counterWait should never be null, but tests mock SentryStore and a mocked one + // doesn't have it. + if (counterWait != null) { + counterWait.update(eventID); } + } + /** + * Processes new Hive Metastore notifications. + * + * If no notifications are processed yet, then it does a full initial snapshot of the Hive Metastore + * followed by new notifications updates that could have happened after it. + * + * Clients connections waiting for an event notification will be woken up afterwards. + */ + private void processHiveMetastoreUpdates() { try { - long lastProcessedNotificationId = notificationId; - // Create a full HMS snapshot if there is none // Decision of taking full snapshot is based on AuthzPathsMapping information persisted - // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed. + // in the sentry persistent store. If AuthzPathsMapping is empty, shapshot is needed. + Long lastProcessedNotificationID; if (sentryStore.isAuthzPathsMappingEmpty()) { - lastProcessedNotificationId = createFullSnapshot(); - if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) { + // TODO: expose time used for full update in the metrics + + // To ensure point-in-time snapshot consistency, need to make sure + // there were no HMS updates while retrieving the snapshot. + // In detail the logic is: + // + // 1. Read current HMS notification ID_initial + // 2. Read HMS metadata state + // 3. Read current notification ID_new + // 4. If ID_initial != ID_new then the attempts for retrieving full HMS snapshot + // will be dropped. A new attempts will be made after 500 milliseconds when + // HMSFollower run again. + + CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId(); + LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore); + + Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate(); + if(pathsFullSnapshot.isEmpty()) { + LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data"); return; } + + CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId(); + LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter); + + if (!eventIDBefore.equals(eventIDAfter)) { + LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}", + eventIDAfter); + return; + } + + LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}", + eventIDAfter); + // As eventIDAfter is the last event that was processed, eventIDAfter is used to update + // lastProcessedNotificationID instead of getting it from persistent store. + lastProcessedNotificationID = eventIDAfter.getEventId(); + sentryStore.persistFullPathsImage(pathsFullSnapshot); + sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId()); + // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value. + wakeUpWaitingClientsForSync(lastProcessedNotificationID); + } else { + // Every time HMSFollower is scheduled to run, value should be updates based + // on the value stored in database. + lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); + } + + // HMSFollower connected to HMS and it finished full snapshot if that was required + // Log this message only once + if (needLogHMSSupportReady && connectedToHMS) { + LOGGER.info("Sentry HMS support is ready"); + needLogHMSSupportReady = false; + } + + // HIVE-15761: Currently getNextNotification API may return an empty + // NotificationEventResponse causing TProtocolException. + // Workaround: Only processes the notification events newer than the last updated one. + CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId(); + LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}", + eventId.getEventId(), lastProcessedNotificationID); + if (eventId.getEventId() > lastProcessedNotificationID) { + NotificationEventResponse response = + getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); + if (response.isSetEvents()) { + if (!response.getEvents().isEmpty()) { + if (lastProcessedNotificationID != lastLoggedEventId) { + // Only log when there are updates and the notification ID has changed. + LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events", + lastProcessedNotificationID, response.getEvents().size()); + lastLoggedEventId = lastProcessedNotificationID; + } + + processNotificationEvents(response.getEvents()); + } + } } - // Get the new notification from HMS and process them. - processNotifications(client.getNotifications(lastProcessedNotificationId)); } catch (TException e) { - // If the underlying exception is around socket exception, - // it is better to retry connection to HMS + // If the underlying exception is around socket exception, it is better to retry connection to HMS if (e.getCause() instanceof SocketException) { - LOGGER.error("Encountered Socket Exception during fetching Notification entries," - + " will attempt to reconnect to HMS after configured interval", e); - close(); + LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e); + client.invalidate(); + closeHMSConnection(); } else { - LOGGER.error("ThriftException occurred communicating with HMS", e); + LOGGER.error("ThriftException occured fetching Notification entries, will try", e); } + } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) { + LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " + + "while processing notification log", e); } catch (Throwable t) { // catching errors to prevent the executor to halt. - LOGGER.error("Exception in HmsFollower! Caused by: " + t.getMessage(), - t); + LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(), + t.getCause()); + t.printStackTrace(); } } /** - * Request for full snapshot and persists it if there is no snapshot available in the - * sentry store. Also, wakes-up any waiting clients. - * - * @return ID of last notification processed. - * @throws Exception if there are failures + * Function to close HMS connection and any associated kerberos context (if applicable) */ - private long createFullSnapshot() throws Exception { - LOGGER.debug("Attempting to take full HMS snapshot"); - PathsImage snapshotInfo = client.getFullSnapshot(); - if (snapshotInfo.getPathImage().isEmpty()) { - return snapshotInfo.getId(); - } + private void closeHMSConnection() { try { - LOGGER.debug("Persisting HMS path full snapshot"); - sentryStore.persistFullPathsImage(snapshotInfo.getPathImage()); - sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId()); - } catch (Exception failure) { - LOGGER.error("Received exception while persisting HMS path full snapshot "); - throw failure; + if (client != null) { + LOGGER.info("Closing the HMS client connection"); + client.close(); + connectedToHMS = false; + } + } finally { + client = null; } - // Wake up any HMS waiters that could have been put on hold before getting the - // eventIDBefore value. - wakeUpWaitingClientsForSync(snapshotInfo.getId()); - // HmsFollower connected to HMS and it finished full snapshot if that was required - // Log this message only once - LOGGER.info("Sentry HMS support is ready"); - return snapshotInfo.getId(); } /** - * Process the collection of notifications and wake up any waiting clients. - * Also, persists the notification ID regardless of processing result. + * Retrieve a Hive full snapshot from HMS. * - * @param events list of event to be processed - * @throws Exception if the complete notification list is not processed because of JDO Exception + * @return HMS snapshot. Snapshot consists of a mapping from auth object name + * to the set of paths corresponding to that name. + * @throws InterruptedException + * @throws TException + * @throws ExecutionException */ - void processNotifications(Collection<NotificationEvent> events) throws Exception { - boolean isNotificationProcessed; - if (events.isEmpty()) { - return; + private Map<String, Set<String>> fetchFullUpdate() + throws TException, ExecutionException { + LOGGER.info("Request full HMS snapshot"); + try (FullUpdateInitializer updateInitializer = + new FullUpdateInitializer(hiveConnectionFactory, authzConf); + Context context = updateTimer.time()) { + Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); + LOGGER.info("Obtained full HMS snapshot"); + return pathsUpdate; + } catch (Exception ignored) { + failedSnapshotsCount.inc(); + // Caller will retry later + return Collections.emptyMap(); } + } + + private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { + return "true" + .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault()))); + } + + /** + * Throws SentryInvalidHMSEventException if Notification event contains insufficient information + */ + void processNotificationEvents(List<NotificationEvent> events) throws Exception { + SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); + + boolean isNotificationProcessingSkipped = false; for (NotificationEvent event : events) { - isNotificationProcessed = false; + String dbName; + String tableName; + String oldLocation; + String newLocation; + String location; + List<String> locations; + NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER); try { - // Only the leader should process the notifications - if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { - return; + LOGGER.debug("Processing notification with id {} and type {}", event.getEventId(), + event.getEventType()); + switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { + case CREATE_DATABASE: + SentryJSONCreateDatabaseMessage message = + deserializer.getCreateDatabaseMessage(event.getMessage()); + dbName = message.getDB(); + location = message.getLocation(); + if ((dbName == null) || (location == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create database event " + + "has incomplete information. dbName = %s location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(location, "null"))); + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { + dropSentryDbPrivileges(dbName, event); + } + notificationProcessor.processCreateDatabase(dbName, location, event.getEventId()); + break; + case DROP_DATABASE: + SentryJSONDropDatabaseMessage dropDatabaseMessage = + deserializer.getDropDatabaseMessage(event.getMessage()); + dbName = dropDatabaseMessage.getDB(); + location = dropDatabaseMessage.getLocation(); + if (dbName == null) { + isNotificationProcessingSkipped = true; + LOGGER.error("Drop database event has incomplete information: dbName = null"); + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { + dropSentryDbPrivileges(dbName, event); + } + notificationProcessor.processDropDatabase(dbName, location, event.getEventId()); + break; + case CREATE_TABLE: + SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage()); + dbName = createTableMessage.getDB(); + tableName = createTableMessage.getTable(); + location = createTableMessage.getLocation(); + if ((dbName == null) || (tableName == null) || (location == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create table event " + "has incomplete information." + + " dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(location, "null"))); + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { + dropSentryTablePrivileges(dbName, tableName, event); + } + notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId()); + break; + case DROP_TABLE: + SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage()); + dbName = dropTableMessage.getDB(); + tableName = dropTableMessage.getTable(); + if ((dbName == null) || (tableName == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Drop table event " + + "has incomplete information. dbName = %s, tableName = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"))); + break; + } + if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { + dropSentryTablePrivileges(dbName, tableName, event); + } + notificationProcessor.processDropTable(dbName, tableName, event.getEventId()); + break; + case ALTER_TABLE: + SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage()); + + String oldDbName = alterTableMessage.getDB(); + String oldTableName = alterTableMessage.getTable(); + String newDbName = event.getDbName(); + String newTableName = event.getTableName(); + oldLocation = alterTableMessage.getOldLocation(); + newLocation = alterTableMessage.getNewLocation(); + + if ((oldDbName == null) || + (oldTableName == null) || + (newDbName == null) || + (newTableName == null) || + (oldLocation == null) || + (newLocation == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Alter table event " + + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + + "newDbName = %s, newTableName = %s, newLocation = %s", + StringUtils.defaultIfBlank(oldDbName, "null"), + StringUtils.defaultIfBlank(oldTableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newDbName, "null"), + StringUtils.defaultIfBlank(newTableName, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + break; + } else if ((oldDbName.equalsIgnoreCase(newDbName)) && + (oldTableName.equalsIgnoreCase(newTableName)) && + (oldLocation.equalsIgnoreCase(newLocation))) { + isNotificationProcessingSkipped = true; + LOGGER.info(String.format("Alter table notification ignored as neither name nor " + + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + + "newLocation = %s", oldDbName + "." + oldTableName , oldLocation, + newDbName + "." + newTableName, newLocation)); + break; + } + + if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { + // Name has changed + try { + renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", + oldDbName, oldTableName); + } catch (Exception e) { + isNotificationProcessingSkipped = true; + LOGGER.info("Could not process Alter table event. Event: " + event.toString(), e); + break; + } + } + notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName, + newTableName, oldLocation, newLocation, event.getEventId()); + break; + case ADD_PARTITION: + SentryJSONAddPartitionMessage addPartitionMessage = + deserializer.getAddPartitionMessage(event.getMessage()); + dbName = addPartitionMessage.getDB(); + tableName = addPartitionMessage.getTable(); + locations = addPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Create table event has incomplete information. " + + "dbName = %s, tableName = %s, locations = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + break; + } + notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId()); + break; + case DROP_PARTITION: + SentryJSONDropPartitionMessage dropPartitionMessage = + deserializer.getDropPartitionMessage(event.getMessage()); + dbName = dropPartitionMessage.getDB(); + tableName = dropPartitionMessage.getTable(); + locations = dropPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Drop partition event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + break; + } + notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId()); + + break; + case ALTER_PARTITION: + SentryJSONAlterPartitionMessage alterPartitionMessage = + deserializer.getAlterPartitionMessage(event.getMessage()); + dbName = alterPartitionMessage.getDB(); + tableName = alterPartitionMessage.getTable(); + oldLocation = alterPartitionMessage.getOldLocation(); + newLocation = alterPartitionMessage.getNewLocation(); + + if ((dbName == null) || + (tableName == null) || + (oldLocation == null) || + (newLocation == null)) { + isNotificationProcessingSkipped = true; + LOGGER.error(String.format("Alter partition event " + + "has incomplete information. dbName = %s, tableName = %s, " + + "oldLocation = %s, newLocation = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + break; + } else if (oldLocation.equalsIgnoreCase(newLocation)) { + isNotificationProcessingSkipped = true; + LOGGER.info(String.format("Alter partition notification ignored as" + + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + + "." + tableName, oldLocation)); + break; + } + + notificationProcessor.processAlterPartition(dbName, tableName, oldLocation, + newLocation, event.getEventId()); + break; + case INSERT: + // TODO DO we need to do anything here? + break; } - isNotificationProcessed = notificationProcessor.processNotificationEvent(event); } catch (Exception e) { if (e.getCause() instanceof JDODataStoreException) { - LOGGER.info("Received JDO Storage Exception, Could be because of processing " - + "duplicate notification"); + LOGGER.info("Received JDO Storage Exception, Could be because of processing " + + "duplicate notification"); if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { // Rest of the notifications need not be processed. - LOGGER.error("Received event with Id: {} which is smaller then the ID " - + "persisted in store", event.getEventId()); - break; + throw e; } - } else { - LOGGER.error("Processing the notification with ID:{} failed with exception {}", - event.getEventId(), e); } + sentryStore.persistLastProcessedNotificationID(event.getEventId()); } - if (!isNotificationProcessed) { - try { - // Update the notification ID in the persistent store even when the notification is - // not processed as the content in in the notification is not valid. - // Continue processing the next notification. - LOGGER.debug("Explicitly Persisting Notification ID:{}", event.getEventId()); - sentryStore.persistLastProcessedNotificationID(event.getEventId()); - } catch (Exception failure) { - LOGGER.error("Received exception while persisting the notification ID " - + event.getEventId()); - throw failure; - } + if (isNotificationProcessingSkipped) { + // Update the notification ID in the persistent store even when the notification is + // not processed as the content in in the notification is not valid. + // Continue processing the next notification. + sentryStore.persistLastProcessedNotificationID(event.getEventId()); + isNotificationProcessingSkipped = false; } // Wake up any HMS waiters that are waiting for this ID. wakeUpWaitingClientsForSync(event.getEventId()); } } - /** - * Wakes up HMS waiters waiting for a specific event notification. - * - * @param eventId Id of a notification - */ - private void wakeUpWaitingClientsForSync(long eventId) { - CounterWait counterWait = sentryStore.getCounterWait(); + private void dropSentryDbPrivileges(String dbName, NotificationEvent event) throws Exception { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setDb(dbName); + sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); + } catch (Exception e) { + throw new SentryInvalidInputException("Could not process Drop database event." + + "Event: " + event.toString(), e); + } + } - // Wake up any HMS waiters that are waiting for this ID. - // counterWait should never be null, but tests mock SentryStore and a mocked one - // doesn't have it. - if (counterWait != null) { - counterWait.update(eventId); + private void dropSentryTablePrivileges(String dbName, String tableName, NotificationEvent event) throws Exception { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName); + } catch (Exception e) { + throw new SentryInvalidInputException("Could not process Drop table event. Event: " + event.toString(), e); + } + } + + private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws + Exception { + TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance); + oldAuthorizable.setDb(oldDbName); + oldAuthorizable.setTable(oldTableName); + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + Update update = + onRenameSentryPrivilege(oldAuthorizable, newAuthorizable); + sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); + } + + @VisibleForTesting + static Update onDropSentryPrivilege(TSentryAuthorizable authorizable) { + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + String authzObj = getAuthzObj(authorizable); + update.addPrivilegeUpdate(authzObj).putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); + return update; + } + + @VisibleForTesting + static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable, + TSentryAuthorizable newAuthorizable) + throws SentryPolicyStorePlugin.SentryPluginException { + String oldAuthz = getAuthzObj(oldAuthorizable); + String newAuthz = getAuthzObj(newAuthorizable); + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); + privUpdate.putToAddPrivileges(newAuthz, newAuthz); + privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); + return update; + } + + public static String getAuthzObj(TSentryAuthorizable authzble) { + String authzObj = null; + if (!SentryStore.isNULL(authzble.getDb())) { + String dbName = authzble.getDb(); + String tblName = authzble.getTable(); + if (SentryStore.isNULL(tblName)) { + authzObj = dbName; + } else { + authzObj = dbName + "." + tblName; + } } + return authzObj == null ? null : authzObj.toLowerCase(); } }