SENTRY-1769 Refactor HMSFollower Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li, Sergio Pena and Alexander Kolbasov)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/024d99de Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/024d99de Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/024d99de Branch: refs/heads/sentry-ha-redesign-kkalyan Commit: 024d99de2663b6bdb741498f03862cd9337d9ad3 Parents: 3a7b54f Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Authored: Thu Jul 13 19:57:03 2017 -0500 Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Committed: Thu Jul 13 19:57:03 2017 -0500 ---------------------------------------------------------------------- .../apache/sentry/hdfs/PathImageRetriever.java | 13 +- .../org/apache/sentry/hdfs/SentryPlugin.java | 40 +- .../provider/db/SentryPolicyStorePlugin.java | 4 +- .../db/service/model/MAuthzPathsSnapshotId.java | 2 +- .../service/model/MSentryHmsNotification.java | 8 +- .../db/service/persistent/PathsImage.java | 10 +- .../db/service/persistent/SentryStore.java | 13 +- .../thrift/SentryPolicyStoreProcessor.java | 5 +- .../sentry/service/thrift/HMSFollower.java | 641 ------------------- .../sentry/service/thrift/HmsFollower.java | 275 ++++++++ .../service/thrift/NotificationProcessor.java | 571 +++++++++++++---- .../sentry/service/thrift/SentryHmsClient.java | 244 +++++++ .../sentry/service/thrift/SentryService.java | 17 +- .../service/thrift/SentryServiceUtil.java | 35 +- .../sentry/service/thrift/ServiceConstants.java | 2 +- .../db/service/persistent/TestSentryStore.java | 1 - .../sentry/service/thrift/TestHMSFollower.java | 384 ----------- .../sentry/service/thrift/TestHmsFollower.java | 532 +++++++++++++++ .../thrift/TestNotificationProcessor.java | 465 ++++++++++++++ .../service/thrift/TestSentryHmsClient.java | 470 ++++++++++++++ .../TestDbPrivilegeCleanupOnDrop.java | 4 +- 21 files changed, 2531 insertions(+), 1205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java index 2426b40..ac5c5b2 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java @@ -25,9 +25,14 @@ import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import javax.annotation.concurrent.ThreadSafe; + import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.apache.sentry.provider.db.service.persistent.PathsImage; +import org.apache.sentry.provider.db.service.persistent.SentryStore; /** * PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent @@ -37,10 +42,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * It is a thread safe class, as all the underlying database operation is thread safe. */ @ThreadSafe -public class PathImageRetriever implements ImageRetriever<PathsUpdate> { +class PathImageRetriever implements ImageRetriever<PathsUpdate> { - private final SentryStore sentryStore; private static final String[] root = {"/"}; + private final SentryStore sentryStore; PathImageRetriever(SentryStore sentryStore) { this.sentryStore = sentryStore; @@ -55,8 +60,8 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { // persistent storage, along with the sequence number of latest // delta change the snapshot corresponds to. PathsImage pathsImage = sentryStore.retrieveFullPathsImage(); - long curSeqNum = pathsImage.getCurSeqNum(); long curImgNum = pathsImage.getCurImgNum(); + long curSeqNum = pathsImage.getId(); Map<String, Set<String>> pathImage = pathsImage.getPathImage(); // Translates the complete Hive paths snapshot into a PathsUpdate. @@ -73,7 +78,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { } SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate - .getPathChanges().size()); + .getPathChanges().size()); // Translate PathsUpdate that contains a full image to TPathsDump for // consumer (NN) to be able to quickly construct UpdateableAuthzPaths http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index d6100de..0c3ba5b 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -23,12 +23,14 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.utils.SigUtils; import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.hdfs.service.thrift.TRoleChanges; import org.apache.sentry.provider.db.SentryPolicyStorePlugin; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.service.thrift.SentryServiceUtil; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest; @@ -38,7 +40,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; -import org.apache.sentry.service.thrift.HMSFollower; +import org.apache.sentry.service.thrift.HmsFollower; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +58,7 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants * <ol> * <li> * Whenever updates happen on HMS, a corresponding notification log is generated, - * and {@link HMSFollower} will process the notification event and persist it in database. + * and {@link HmsFollower} will process the notification event and persist it in database. * <li> * The NameNode periodically asks Sentry for updates. Sentry may return zero * or more updates previously received via HMS notification log. @@ -240,16 +242,22 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen @Override public Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) - throws SentryPluginException { - String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable()); - String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable()); + throws SentryPluginException, SentryInvalidInputException{ + String oldAuthz = null; + String newAuthz = null; + try { + oldAuthz = SentryServiceUtil.getAuthzObj(request.getOldAuthorizable()); + newAuthz = SentryServiceUtil.getAuthzObj(request.getNewAuthorizable()); + } catch (SentryInvalidInputException failure) { + LOGGER.error("onRenameSentryPrivilege, Could not rename sentry privilege ", failure); + throw failure; + } PermissionsUpdate update = new PermissionsUpdate(); TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); privUpdate.putToAddPrivileges(newAuthz, newAuthz); privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); - LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]", - oldAuthz)); + LOGGER.debug("onRenameSentryPrivilege, Authz Perm preUpdate [ {} ]", oldAuthz); return update; } @@ -283,8 +291,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen update.addPrivilegeUpdate(authzObj).putToDelPrivileges( roleName, privilege.getAction().toUpperCase()); - LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]", - authzObj)); + LOGGER.debug("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ {} ]", authzObj); return update; } @@ -296,8 +303,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ); update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS); - LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]", - request.getRoleName())); + LOGGER.debug("onDropSentryRole, Authz Perm preUpdate [ {} ]", request.getRoleName()); return update; } @@ -305,12 +311,18 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen public Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException { PermissionsUpdate update = new PermissionsUpdate(); - String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable()); + String authzObj = null; + try { + authzObj = SentryServiceUtil.getAuthzObj(request.getAuthorizable()); + } catch (SentryInvalidInputException failure) { + LOGGER.error("onDropSentryPrivilege, Could not drop sentry privilege " + + failure.toString(), failure); + throw new SentryPluginException(failure.getMessage(), failure); + } update.addPrivilegeUpdate(authzObj).putToDelPrivileges( PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]", - authzObj)); + LOGGER.debug("onDropSentryPrivilege, Authz Perm preUpdate [ {} ]", authzObj); return update; } http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java index 5b8a572..a22b422 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java @@ -19,6 +19,7 @@ package org.apache.sentry.provider.db; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest; @@ -68,7 +69,8 @@ public interface SentryPolicyStorePlugin { Update onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException; - Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException; + Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) + throws SentryPluginException, SentryInvalidInputException; Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException; } http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java index d683c2c..d8d54f3 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java @@ -22,7 +22,7 @@ import javax.jdo.annotations.PrimaryKey; /** * This class is used to persist new authz paths snapshots IDs. An authz path snapshot ID is required by - * the MAuthzPathsMapping to detect new HMS snapshots created by the HMSFollower. + * the MAuthzPathsMapping to detect new HMS snapshots created by the HmsFollower. */ @PersistenceCapable public class MAuthzPathsSnapshotId { http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java index 0d54548..166bec7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java @@ -20,15 +20,13 @@ package org.apache.sentry.provider.db.service.model; /** * Database backend store for HMS Notification ID's. All the notifications that are processed * by sentry are stored. - */ - -/* + * <p> * <p> HMS notification ID's are stored in separate table for three reasons</p> * <ol> * <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There - * are cases where HMSFollower doesn't process notifications and skip's them. Depending on + * are cases where HmsFollower doesn't process notifications and skip's them. Depending on * SENTRY_PATH_CHANGE information may not provide the last notification processed.</li> - * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a + * <li> There could be cases where HmsFollower thread in multiple sentry servers acting as a * leader and process HMS notifications. we need to avoid processing the notifications * multiple times. This can be made sure by always having some number of notification * information always regardless of purging interval.</li> http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java index 4d852e6..409a557 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java @@ -31,17 +31,17 @@ public class PathsImage { // A full image of hiveObj to Paths mapping. private final Map<String, Set<String>> pathImage; - private final long curSeqNum; + private final long id; private final long curImgNum; - public PathsImage(Map<String, Set<String>> pathImage, long curSeqNum, long curImgNum) { + public PathsImage(Map<String, Set<String>> pathImage, long id, long curImgNum) { this.pathImage = pathImage; - this.curSeqNum = curSeqNum; + this.id = id; this.curImgNum = curImgNum; } - public long getCurSeqNum() { - return curSeqNum; + public long getId() { + return id; } public long getCurImgNum() { http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 979e45b..7b02e2c 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -107,7 +107,7 @@ import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder * single node and rely on DB for multi-node synchronization. * <p> * This isn't much of a problem for path updates since they are - * driven by HMSFollower which usually runs on a single leader + * driven by HmsFollower which usually runs on a single leader * node, but permission updates originate from clients * directly and may be highly concurrent. * <p> @@ -151,7 +151,7 @@ public class SentryStore { private static final long COUNT_VALUE_UNKNOWN = -1L; // Representation for unknown HMS notification ID - private static final long NOTIFICATION_UNKNOWN = -1L; + public static final long NOTIFICATION_UNKNOWN = -1L; private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL, AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER, @@ -169,8 +169,8 @@ public class SentryStore { private final TransactionManager tm; /** - * counterWait is used to synchronize notifications between Thrift and HMSFollower. - * Technically it doesn't belong here, but the only thing that connects HMSFollower + * counterWait is used to synchronize notifications between Thrift and HmsFollower. + * Technically it doesn't belong here, but the only thing that connects HmsFollower * and Thrift API is SentryStore. An alternative could be a singleton CounterWait or * some factory that returns CounterWait instances keyed by name, but this complicates * things unnecessary. @@ -2674,7 +2674,7 @@ public class SentryStore { /** * Persist an up-to-date HMS snapshot into Sentry DB in a single transaction. * - * @param authzPaths Mapping of hiveObj to < Paths < + * @param authzPaths paths to be be persisted * @throws Exception */ public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception { @@ -2685,7 +2685,6 @@ public class SentryStore { long snapshotID = getCurrentAuthzPathsSnapshotID(pm); long nextSnapshotID = snapshotID + 1; - pm.makePersistent(new MAuthzPathsSnapshotId(nextSnapshotID)); for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) { pm.makePersistent(new MAuthzPathsMapping(nextSnapshotID, authzPath.getKey(), authzPath.getValue())); @@ -3704,7 +3703,7 @@ public class SentryStore { * * @param pm the PersistenceManager * @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed. - * else last NotificationID processed by HMSFollower + * else last NotificationID processed by HmsFollower */ static Long getLastProcessedNotificationIDCore( PersistenceManager pm) { http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java index ad23334..cfd0e30 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java @@ -945,7 +945,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface { } catch (SentryThriftAPIMismatchException e) { LOGGER.error(e.getMessage(), e); response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e)); - } catch (Exception e) { + } catch (SentryInvalidInputException e) { + response.setStatus(Status.InvalidInput(e.getMessage(), e)); + } + catch (Exception e) { String msg = "Unknown error for request: " + request + ", message: " + e.getMessage(); LOGGER.error(msg, e); http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java deleted file mode 100644 index 1b6ae18..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ /dev/null @@ -1,641 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Timer; -import com.codahale.metrics.Timer.Context; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hive.hcatalog.messaging.HCatEventMessage; -import org.apache.sentry.binding.hive.conf.HiveAuthzConf; -import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; -import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; -import org.apache.sentry.hdfs.PermissionsUpdate; -import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; -import org.apache.sentry.provider.db.SentryPolicyStorePlugin; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.provider.db.service.thrift.SentryMetrics; -import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.sentry.binding.metastore.messaging.json.*; - -import javax.jdo.JDODataStoreException; -import javax.security.auth.login.LoginException; - -import java.io.IOException; -import java.net.SocketException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; - -import static com.codahale.metrics.MetricRegistry.name; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; -import static org.apache.sentry.hdfs.Updateable.Update; - -/** - * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry. - * It gets the full update and notification logs from HMS and applies it to - * update permissions stored in Sentry using SentryStore and also update the < obj,path > state - * stored for HDFS-Sentry sync. - */ -@SuppressWarnings("PMD") -public class HMSFollower implements Runnable, AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); - private HiveSimpleConnectionFactory hiveConnectionFactory; - // Track the latest eventId of the event that has been logged. So we don't log the same message - private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID; - private static boolean connectedToHMS = false; - private HMSClient client; - private final Configuration authzConf; - private final SentryStore sentryStore; - private String hiveInstance; - - private boolean needLogHMSSupportReady = true; - private final LeaderStatusMonitor leaderMonitor; - - private static final String SNAPSHOT = "snapshot"; - /** Measures time to get full snapshot */ - private final Timer updateTimer = SentryMetrics.getInstance() - .getTimer(name(FullUpdateInitializer.class, SNAPSHOT)); - /** Number of times update failed */ - private final Counter failedSnapshotsCount = SentryMetrics.getInstance() - .getCounter(name(FullUpdateInitializer.class, "failed")); - - HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveSimpleConnectionFactory hiveConnectionFactory) { - authzConf = conf; - this.leaderMonitor = leaderMonitor; - sentryStore = store; - this.hiveConnectionFactory = hiveConnectionFactory; - } - - @VisibleForTesting - HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) - throws IOException, LoginException { - this(conf, sentryStore, null, null); - this.hiveInstance = hiveInstance; - hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); - hiveConnectionFactory.init(); - } - - @VisibleForTesting - public static boolean isConnectedToHMS() { - return connectedToHMS; - } - - @Override - public void close() { - // Close any outstanding connections to HMS - closeHMSConnection(); - try { - hiveConnectionFactory.close(); - } catch (Exception e) { - LOGGER.error("failed to close Hive Connection Factory", e); - } - } - - /** - * Returns HMS Client if successful, returns null if HMS is not ready yet to take connections - * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials - * Throws @MetaException if there was a problem on creating an HMSClient - */ - private HiveMetaStoreClient getMetaStoreClient() - throws IOException, InterruptedException, MetaException { - if (client == null) { - client = hiveConnectionFactory.connect(); - connectedToHMS = true; - } - return client.getClient(); - } - - @Override - public void run() { - Long lastProcessedNotificationID; - try { - // Initializing lastProcessedNotificationID based on the latest persisted notification ID. - lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); - } catch (Exception e) { - LOGGER.error("Failed to get the last processed notification id from sentry store, " + - "Skipping the processing", e); - return; - } - // Wake any clients connected to this service waiting for HMS already processed notifications. - wakeUpWaitingClientsForSync(lastProcessedNotificationID); - // Only the leader should listen to HMS updates - if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { - // Close any outstanding connections to HMS - closeHMSConnection(); - return; - } - processHiveMetastoreUpdates(); - } - - /** - * Wakes up HMS waiters waiting for a specific event notification. - * - * @param eventID - */ - private void wakeUpWaitingClientsForSync(long eventID) { - CounterWait counterWait = sentryStore.getCounterWait(); - - // Wake up any HMS waiters that are waiting for this ID. - // counterWait should never be null, but tests mock SentryStore and a mocked one - // doesn't have it. - if (counterWait != null) { - counterWait.update(eventID); - } - } - - /** - * Processes new Hive Metastore notifications. - * - * If no notifications are processed yet, then it does a full initial snapshot of the Hive Metastore - * followed by new notifications updates that could have happened after it. - * - * Clients connections waiting for an event notification will be woken up afterwards. - */ - private void processHiveMetastoreUpdates() { - try { - // Decision of taking full snapshot is based on AuthzPathsMapping information persisted - // in the sentry persistent store. If AuthzPathsMapping is empty, shapshot is needed. - Long lastProcessedNotificationID; - if (sentryStore.isAuthzPathsMappingEmpty()) { - // TODO: expose time used for full update in the metrics - - // To ensure point-in-time snapshot consistency, need to make sure - // there were no HMS updates while retrieving the snapshot. - // In detail the logic is: - // - // 1. Read current HMS notification ID_initial - // 2. Read HMS metadata state - // 3. Read current notification ID_new - // 4. If ID_initial != ID_new then the attempts for retrieving full HMS snapshot - // will be dropped. A new attempts will be made after 500 milliseconds when - // HMSFollower run again. - - CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId(); - LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore); - - Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate(); - if(pathsFullSnapshot.isEmpty()) { - LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data"); - return; - } - - CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId(); - LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter); - - if (!eventIDBefore.equals(eventIDAfter)) { - LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}", - eventIDAfter); - return; - } - - LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}", - eventIDAfter); - // As eventIDAfter is the last event that was processed, eventIDAfter is used to update - // lastProcessedNotificationID instead of getting it from persistent store. - lastProcessedNotificationID = eventIDAfter.getEventId(); - sentryStore.persistFullPathsImage(pathsFullSnapshot); - sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId()); - // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value. - wakeUpWaitingClientsForSync(lastProcessedNotificationID); - } else { - // Every time HMSFollower is scheduled to run, value should be updates based - // on the value stored in database. - lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID(); - } - - // HMSFollower connected to HMS and it finished full snapshot if that was required - // Log this message only once - if (needLogHMSSupportReady && connectedToHMS) { - LOGGER.info("Sentry HMS support is ready"); - needLogHMSSupportReady = false; - } - - // HIVE-15761: Currently getNextNotification API may return an empty - // NotificationEventResponse causing TProtocolException. - // Workaround: Only processes the notification events newer than the last updated one. - CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId(); - LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}", - eventId.getEventId(), lastProcessedNotificationID); - if (eventId.getEventId() > lastProcessedNotificationID) { - NotificationEventResponse response = - getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); - if (response.isSetEvents()) { - if (!response.getEvents().isEmpty()) { - if (lastProcessedNotificationID != lastLoggedEventId) { - // Only log when there are updates and the notification ID has changed. - LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events", - lastProcessedNotificationID, response.getEvents().size()); - lastLoggedEventId = lastProcessedNotificationID; - } - - processNotificationEvents(response.getEvents()); - } - } - } - } catch (TException e) { - // If the underlying exception is around socket exception, it is better to retry connection to HMS - if (e.getCause() instanceof SocketException) { - LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e); - client.invalidate(); - closeHMSConnection(); - } else { - LOGGER.error("ThriftException occured fetching Notification entries, will try", e); - } - } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) { - LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " + - "while processing notification log", e); - } catch (Throwable t) { - // catching errors to prevent the executor to halt. - LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(), - t.getCause()); - t.printStackTrace(); - } - } - - /** - * Function to close HMS connection and any associated kerberos context (if applicable) - */ - private void closeHMSConnection() { - try { - if (client != null) { - LOGGER.info("Closing the HMS client connection"); - client.close(); - connectedToHMS = false; - } - } finally { - client = null; - } - } - - /** - * Retrieve a Hive full snapshot from HMS. - * - * @return HMS snapshot. Snapshot consists of a mapping from auth object name - * to the set of paths corresponding to that name. - * @throws InterruptedException - * @throws TException - * @throws ExecutionException - */ - private Map<String, Set<String>> fetchFullUpdate() - throws TException, ExecutionException { - LOGGER.info("Request full HMS snapshot"); - try (FullUpdateInitializer updateInitializer = - new FullUpdateInitializer(hiveConnectionFactory, authzConf); - Context context = updateTimer.time()) { - Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); - LOGGER.info("Obtained full HMS snapshot"); - return pathsUpdate; - } catch (Exception ignored) { - failedSnapshotsCount.inc(); - // Caller will retry later - return Collections.emptyMap(); - } - } - - private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { - return "true" - .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault()))); - } - - /** - * Throws SentryInvalidHMSEventException if Notification event contains insufficient information - */ - void processNotificationEvents(List<NotificationEvent> events) throws Exception { - SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer(); - - boolean isNotificationProcessingSkipped = false; - for (NotificationEvent event : events) { - String dbName; - String tableName; - String oldLocation; - String newLocation; - String location; - List<String> locations; - NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER); - try { - LOGGER.debug("Processing notification with id {} and type {}", event.getEventId(), - event.getEventType()); - switch (HCatEventMessage.EventType.valueOf(event.getEventType())) { - case CREATE_DATABASE: - SentryJSONCreateDatabaseMessage message = - deserializer.getCreateDatabaseMessage(event.getMessage()); - dbName = message.getDB(); - location = message.getLocation(); - if ((dbName == null) || (location == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Create database event " + - "has incomplete information. dbName = %s location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(location, "null"))); - break; - } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - dropSentryDbPrivileges(dbName, event); - } - notificationProcessor.processCreateDatabase(dbName, location, event.getEventId()); - break; - case DROP_DATABASE: - SentryJSONDropDatabaseMessage dropDatabaseMessage = - deserializer.getDropDatabaseMessage(event.getMessage()); - dbName = dropDatabaseMessage.getDB(); - location = dropDatabaseMessage.getLocation(); - if (dbName == null) { - isNotificationProcessingSkipped = true; - LOGGER.error("Drop database event has incomplete information: dbName = null"); - break; - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - dropSentryDbPrivileges(dbName, event); - } - notificationProcessor.processDropDatabase(dbName, location, event.getEventId()); - break; - case CREATE_TABLE: - SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage()); - dbName = createTableMessage.getDB(); - tableName = createTableMessage.getTable(); - location = createTableMessage.getLocation(); - if ((dbName == null) || (tableName == null) || (location == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Create table event " + "has incomplete information." - + " dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(location, "null"))); - break; - } - if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) { - dropSentryTablePrivileges(dbName, tableName, event); - } - notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId()); - break; - case DROP_TABLE: - SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage()); - dbName = dropTableMessage.getDB(); - tableName = dropTableMessage.getTable(); - if ((dbName == null) || (tableName == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Drop table event " + - "has incomplete information. dbName = %s, tableName = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"))); - break; - } - if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) { - dropSentryTablePrivileges(dbName, tableName, event); - } - notificationProcessor.processDropTable(dbName, tableName, event.getEventId()); - break; - case ALTER_TABLE: - SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage()); - - String oldDbName = alterTableMessage.getDB(); - String oldTableName = alterTableMessage.getTable(); - String newDbName = event.getDbName(); - String newTableName = event.getTableName(); - oldLocation = alterTableMessage.getOldLocation(); - newLocation = alterTableMessage.getNewLocation(); - - if ((oldDbName == null) || - (oldTableName == null) || - (newDbName == null) || - (newTableName == null) || - (oldLocation == null) || - (newLocation == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Alter table event " + - "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + - "newDbName = %s, newTableName = %s, newLocation = %s", - StringUtils.defaultIfBlank(oldDbName, "null"), - StringUtils.defaultIfBlank(oldTableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newDbName, "null"), - StringUtils.defaultIfBlank(newTableName, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - break; - } else if ((oldDbName.equalsIgnoreCase(newDbName)) && - (oldTableName.equalsIgnoreCase(newTableName)) && - (oldLocation.equalsIgnoreCase(newLocation))) { - isNotificationProcessingSkipped = true; - LOGGER.info(String.format("Alter table notification ignored as neither name nor " + - "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + - "newLocation = %s", oldDbName + "." + oldTableName , oldLocation, - newDbName + "." + newTableName, newLocation)); - break; - } - - if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { - // Name has changed - try { - renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s", - oldDbName, oldTableName); - } catch (Exception e) { - isNotificationProcessingSkipped = true; - LOGGER.info("Could not process Alter table event. Event: " + event.toString(), e); - break; - } - } - notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName, - newTableName, oldLocation, newLocation, event.getEventId()); - break; - case ADD_PARTITION: - SentryJSONAddPartitionMessage addPartitionMessage = - deserializer.getAddPartitionMessage(event.getMessage()); - dbName = addPartitionMessage.getDB(); - tableName = addPartitionMessage.getTable(); - locations = addPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Create table event has incomplete information. " + - "dbName = %s, tableName = %s, locations = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? locations.toString() : "null")); - break; - } - notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId()); - break; - case DROP_PARTITION: - SentryJSONDropPartitionMessage dropPartitionMessage = - deserializer.getDropPartitionMessage(event.getMessage()); - dbName = dropPartitionMessage.getDB(); - tableName = dropPartitionMessage.getTable(); - locations = dropPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Drop partition event " + - "has incomplete information. dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? locations.toString() : "null")); - break; - } - notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId()); - - break; - case ALTER_PARTITION: - SentryJSONAlterPartitionMessage alterPartitionMessage = - deserializer.getAlterPartitionMessage(event.getMessage()); - dbName = alterPartitionMessage.getDB(); - tableName = alterPartitionMessage.getTable(); - oldLocation = alterPartitionMessage.getOldLocation(); - newLocation = alterPartitionMessage.getNewLocation(); - - if ((dbName == null) || - (tableName == null) || - (oldLocation == null) || - (newLocation == null)) { - isNotificationProcessingSkipped = true; - LOGGER.error(String.format("Alter partition event " + - "has incomplete information. dbName = %s, tableName = %s, " + - "oldLocation = %s, newLocation = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - break; - } else if (oldLocation.equalsIgnoreCase(newLocation)) { - isNotificationProcessingSkipped = true; - LOGGER.info(String.format("Alter partition notification ignored as" + - "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + - "." + tableName, oldLocation)); - break; - } - - notificationProcessor.processAlterPartition(dbName, tableName, oldLocation, - newLocation, event.getEventId()); - break; - case INSERT: - // TODO DO we need to do anything here? - break; - } - } catch (Exception e) { - if (e.getCause() instanceof JDODataStoreException) { - LOGGER.info("Received JDO Storage Exception, Could be because of processing " + - "duplicate notification"); - if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { - // Rest of the notifications need not be processed. - throw e; - } - } - sentryStore.persistLastProcessedNotificationID(event.getEventId()); - } - if (isNotificationProcessingSkipped) { - // Update the notification ID in the persistent store even when the notification is - // not processed as the content in in the notification is not valid. - // Continue processing the next notification. - sentryStore.persistLastProcessedNotificationID(event.getEventId()); - isNotificationProcessingSkipped = false; - } - // Wake up any HMS waiters that are waiting for this ID. - wakeUpWaitingClientsForSync(event.getEventId()); - } - } - - private void dropSentryDbPrivileges(String dbName, NotificationEvent event) throws Exception { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); - authorizable.setDb(dbName); - sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Drop database event." + - "Event: " + event.toString(), e); - } - } - - private void dropSentryTablePrivileges(String dbName, String tableName, NotificationEvent event) throws Exception { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); - authorizable.setDb(dbName); - authorizable.setTable(tableName); - sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName); - } catch (Exception e) { - throw new SentryInvalidInputException("Could not process Drop table event. Event: " + event.toString(), e); - } - } - - private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws - Exception { - TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance); - oldAuthorizable.setDb(oldDbName); - oldAuthorizable.setTable(oldTableName); - TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); - newAuthorizable.setDb(newDbName); - newAuthorizable.setTable(newTableName); - Update update = - onRenameSentryPrivilege(oldAuthorizable, newAuthorizable); - sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); - } - - @VisibleForTesting - static Update onDropSentryPrivilege(TSentryAuthorizable authorizable) { - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - String authzObj = getAuthzObj(authorizable); - update.addPrivilegeUpdate(authzObj).putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - return update; - } - - @VisibleForTesting - static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable, - TSentryAuthorizable newAuthorizable) - throws SentryPolicyStorePlugin.SentryPluginException { - String oldAuthz = getAuthzObj(oldAuthorizable); - String newAuthz = getAuthzObj(newAuthorizable); - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); - privUpdate.putToAddPrivileges(newAuthz, newAuthz); - privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); - return update; - } - - public static String getAuthzObj(TSentryAuthorizable authzble) { - String authzObj = null; - if (!SentryStore.isNULL(authzble.getDb())) { - String dbName = authzble.getDb(); - String tblName = authzble.getTable(); - if (SentryStore.isNULL(tblName)) { - authzObj = dbName; - } else { - authzObj = dbName + "." + tblName; - } - } - return authzObj == null ? null : authzObj.toLowerCase(); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java new file mode 100644 index 0000000..a9d05b1 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java @@ -0,0 +1,275 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + <p> + http://www.apache.org/licenses/LICENSE-2.0 + <p> + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import com.google.common.annotations.VisibleForTesting; + +import java.net.SocketException; + +import java.util.Collection; +import javax.jdo.JDODataStoreException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.sentry.binding.hive.conf.HiveAuthzConf; +import org.apache.sentry.provider.db.service.persistent.PathsImage; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HmsFollower is the thread which follows the Hive MetaStore state changes from Sentry. + * It gets the full update and notification logs from HMS and applies it to + * update permissions stored in Sentry using SentryStore and also update the < obj,path > state + * stored for HDFS-Sentry sync. + */ +public class HmsFollower implements Runnable, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(HmsFollower.class); + private static boolean connectedToHms = false; + private final SentryHmsClient client; + private final Configuration authzConf; + private final SentryStore sentryStore; + private final NotificationProcessor notificationProcessor; + + private final LeaderStatusMonitor leaderMonitor; + + /** + * Configuring Hms Follower thread. + * + * @param conf sentry configuration + * @param store sentry store + * @param leaderMonitor singleton instance of LeaderStatusMonitor + */ + HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, + HiveSimpleConnectionFactory hiveConnectionFactory) { + this(conf, store, leaderMonitor, hiveConnectionFactory, null); + } + + @VisibleForTesting + /** + * Constructor should be used only for testing purposes. + * + * @param conf sentry configuration + * @param store sentry store + * @param leaderMonitor + * @param authServerName Server that sentry is Authorizing + */ + HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, + HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) { + LOGGER.info("HmsFollower is being initialized"); + authzConf = conf; + this.leaderMonitor = leaderMonitor; + sentryStore = store; + if (authServerName == null) { + HiveConf hiveConf = new HiveConf(); + authServerName = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar()); + } + notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); + client = new SentryHmsClient(authzConf, hiveConnectionFactory); + } + + @VisibleForTesting + public static boolean isConnectedToHms() { + return connectedToHms; + } + + @Override + public void close() { + if (client != null) { + // Close any outstanding connections to HMS + try { + client.disconnect(); + } catch (Exception failure) { + LOGGER.error("Failed to close the Sentry Hms Client", failure); + } + } + } + + @Override + public void run() { + long lastProcessedNotificationId; + try { + // Initializing lastProcessedNotificationId based on the latest persisted notification ID. + lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID(); + } catch (Exception e) { + LOGGER.error("Failed to get the last processed notification id from sentry store, " + + "Skipping the processing", e); + return; + } + // Wake any clients connected to this service waiting for HMS already processed notifications. + wakeUpWaitingClientsForSync(lastProcessedNotificationId); + // Only the leader should listen to HMS updates + if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { + // Close any outstanding connections to HMS + close(); + return; + } + syncupWithHms(lastProcessedNotificationId); + } + + /** + * Processes new Hive Metastore notifications. + * + * <p>If no notifications are processed yet, then it + * does a full initial snapshot of the Hive Metastore followed by new notifications updates that + * could have happened after it. + * + * <p>Clients connections waiting for an event notification will be + * woken up afterwards. + */ + private void syncupWithHms(long notificationId) { + try { + client.connect(); + connectedToHms = true; + } catch (Throwable e) { + LOGGER.error("HmsFollower cannot connect to HMS!!", e); + return; + } + + try { + long lastProcessedNotificationId = notificationId; + // Create a full HMS snapshot if there is none + // Decision of taking full snapshot is based on AuthzPathsMapping information persisted + // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed. + if (sentryStore.isAuthzPathsMappingEmpty()) { + lastProcessedNotificationId = createFullSnapshot(); + if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) { + return; + } + } + // Get the new notification from HMS and process them. + processNotifications(client.getNotifications(lastProcessedNotificationId)); + } catch (TException e) { + // If the underlying exception is around socket exception, + // it is better to retry connection to HMS + if (e.getCause() instanceof SocketException) { + LOGGER.error("Encountered Socket Exception during fetching Notification entries," + + " will attempt to reconnect to HMS after configured interval", e); + close(); + } else { + LOGGER.error("ThriftException occurred communicating with HMS", e); + } + } catch (Throwable t) { + // catching errors to prevent the executor to halt. + LOGGER.error("Exception in HmsFollower! Caused by: " + t.getMessage(), + t); + } + } + + /** + * Request for full snapshot and persists it if there is no snapshot available in the + * sentry store. Also, wakes-up any waiting clients. + * + * @return ID of last notification processed. + * @throws Exception if there are failures + */ + private long createFullSnapshot() throws Exception { + LOGGER.debug("Attempting to take full HMS snapshot"); + PathsImage snapshotInfo = client.getFullSnapshot(); + if (snapshotInfo.getPathImage().isEmpty()) { + return snapshotInfo.getId(); + } + try { + LOGGER.debug("Persisting HMS path full snapshot"); + sentryStore.persistFullPathsImage(snapshotInfo.getPathImage()); + sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId()); + } catch (Exception failure) { + LOGGER.error("Received exception while persisting HMS path full snapshot "); + throw failure; + } + // Wake up any HMS waiters that could have been put on hold before getting the + // eventIDBefore value. + wakeUpWaitingClientsForSync(snapshotInfo.getId()); + // HmsFollower connected to HMS and it finished full snapshot if that was required + // Log this message only once + LOGGER.info("Sentry HMS support is ready"); + return snapshotInfo.getId(); + } + + /** + * Process the collection of notifications and wake up any waiting clients. + * Also, persists the notification ID regardless of processing result. + * + * @param events list of event to be processed + * @throws Exception if the complete notification list is not processed because of JDO Exception + */ + void processNotifications(Collection<NotificationEvent> events) throws Exception { + boolean isNotificationProcessed; + if (events.isEmpty()) { + return; + } + for (NotificationEvent event : events) { + isNotificationProcessed = false; + try { + // Only the leader should process the notifications + if ((leaderMonitor != null) && !leaderMonitor.isLeader()) { + return; + } + isNotificationProcessed = notificationProcessor.processNotificationEvent(event); + } catch (Exception e) { + if (e.getCause() instanceof JDODataStoreException) { + LOGGER.info("Received JDO Storage Exception, Could be because of processing " + + "duplicate notification"); + if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { + // Rest of the notifications need not be processed. + LOGGER.error("Received event with Id: {} which is smaller then the ID " + + "persisted in store", event.getEventId()); + break; + } + } else { + LOGGER.error("Processing the notification with ID:{} failed with exception {}", + event.getEventId(), e); + } + } + if (!isNotificationProcessed) { + try { + // Update the notification ID in the persistent store even when the notification is + // not processed as the content in in the notification is not valid. + // Continue processing the next notification. + LOGGER.debug("Explicitly Persisting Notification ID:{}", event.getEventId()); + sentryStore.persistLastProcessedNotificationID(event.getEventId()); + } catch (Exception failure) { + LOGGER.error("Received exception while persisting the notification ID " + + event.getEventId()); + throw failure; + } + } + // Wake up any HMS waiters that are waiting for this ID. + wakeUpWaitingClientsForSync(event.getEventId()); + } + } + + /** + * Wakes up HMS waiters waiting for a specific event notification. + * + * @param eventId Id of a notification + */ + private void wakeUpWaitingClientsForSync(long eventId) { + CounterWait counterWait = sentryStore.getCounterWait(); + + // Wake up any HMS waiters that are waiting for this ID. + // counterWait should never be null, but tests mock SentryStore and a mocked one + // doesn't have it. + if (counterWait != null) { + counterWait.update(eventId); + } + } +}