SENTRY-2109: Fix the logic for identifying when HMS is out of sync, and handle gaps and out-of-sequence notifications. (Kalyan Kumar Kalvagadda, reviewed-by Vadim Spector, Na Li and Arjun Mishra)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/afcaa499 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/afcaa499 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/afcaa499 Branch: refs/heads/master Commit: afcaa4997afae428522e7f0d7fb5917e9e58580d Parents: 1b71cfb Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Authored: Thu Feb 1 16:52:15 2018 -0600 Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Committed: Thu Feb 1 16:52:15 2018 -0600 ---------------------------------------------------------------------- .../exception/SentryOutOfSyncException.java | 26 +++ .../db/service/persistent/HMSFollower.java | 104 +++------ .../db/service/persistent/SentryStore.java | 30 ++- .../service/thrift/HiveNotificationFetcher.java | 204 +++++++++++++++-- .../sentry/service/thrift/SentryService.java | 4 + .../sentry/service/thrift/ServiceConstants.java | 6 + .../db/service/persistent/TestHMSFollower.java | 86 ++++--- .../TestHMSFollowerSentryStoreIntegration.java | 4 +- .../db/service/persistent/TestSentryStore.java | 32 +-- .../thrift/TestHiveNotificationFetcher.java | 222 ++++++++++++++++++- .../TestHiveNotificationFetcherCache.java | 203 +++++++++++++++++ .../e2e/dbprovider/TestSnapshotCreation.java | 87 ++++++++ ...tSnapshotCreationWithShorterHMSEventTtl.java | 105 +++++++++ ...shotWithLongerHMSFollowerLongerInterval.java | 114 ++++++++++ .../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 43 +++- 15 files changed, 1107 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java ---------------------------------------------------------------------- diff --git 
a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java new file mode 100644 index 0000000..d2b1945 --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.core.common.exception; + +public class SentryOutOfSyncException extends Exception { + private static final long serialVersionUID = 1L; + public SentryOutOfSyncException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java index 2f2b984..45e4305 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java @@ -18,18 +18,19 @@ package org.apache.sentry.provider.db.service.persistent; -import org.apache.sentry.core.common.utils.PubSub; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; - import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import java.util.Collection; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.jdo.JDODataStoreException; + +import org.apache.sentry.core.common.exception.SentryOutOfSyncException; +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.thrift.TException; @@ -109,7 
+110,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); client = new SentryHMSClient(authzConf, hiveConnectionFactory); hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync - notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory); + notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory, authzConf); // subscribe to full update notification if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) { @@ -147,25 +148,25 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { @Override public void run() { SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); - long lastProcessedNotificationId; + long maxNotificationId; try { try { - // Initializing lastProcessedNotificationId based on the latest persisted notification ID. - lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID(); + // Initializing maxNotificationId based on the latest persisted notification ID. + maxNotificationId = sentryStore.getMaxNotificationID(); } catch (Exception e) { LOGGER.error("Failed to get the last processed notification id from sentry store, " + "Skipping the processing", e); return; } // Wake any clients connected to this service waiting for HMS already processed notifications. 
- wakeUpWaitingClientsForSync(lastProcessedNotificationId); + wakeUpWaitingClientsForSync(maxNotificationId); // Only the leader should listen to HMS updates if (!isLeader()) { // Close any outstanding connections to HMS close(); return; } - syncupWithHms(lastProcessedNotificationId); + syncupWithHms(maxNotificationId); } finally { SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); } @@ -189,8 +190,9 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { * * <p>Clients connections waiting for an event notification will be * woken up afterwards. + * @param maxNotificationId Max of all event-id's that sentry has processed. */ - private void syncupWithHms(long notificationId) { + private void syncupWithHms(long maxNotificationId) { try { client.connect(); connectedToHms = true; @@ -201,18 +203,17 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } try { + Collection<NotificationEvent> notifications; /* Before getting notifications, it checks if a full HMS snapshot is required. */ - if (isFullSnapshotRequired(notificationId)) { + if (isFullSnapshotRequired(maxNotificationId)) { createFullSnapshot(); return; } - - Collection<NotificationEvent> notifications = - notificationFetcher.fetchNotifications(notificationId); - - // After getting notifications, it checks if the HMS did some clean-up and notifications - // are out-of-sync with Sentry. - if (areNotificationsOutOfSync(notifications, notificationId)) { + try { + notifications = notificationFetcher.fetchNotifications(maxNotificationId); + } catch (SentryOutOfSyncException e) { + LOGGER.error("An error occurred while fetching HMS notifications: {}", + e.getMessage()); createFullSnapshot(); return; } @@ -225,7 +226,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } // Continue with processing new notifications if no snapshots are done. 
- processNotifications(notifications); + processNotifications(notifications, maxNotificationId); } catch (TException e) { LOGGER.error("An error occurred while fetching HMS notifications: ", e); close(); @@ -284,52 +285,6 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } /** - * Checks if the HMS and Sentry processed notifications are out-of-sync. - * This could happen because the HMS did some clean-up of old notifications - * and Sentry was not requesting notifications during that time. - * - * @param events All new notifications to check for an out-of-sync. - * @param latestProcessedId The latest notification processed by Sentry to check against the - * list of notifications events. - * @return True if an out-of-sync is found; False otherwise. - */ - private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events, - long latestProcessedId) { - if (events.isEmpty()) { - return false; - } - - /* - * If the sequence of notifications has a gap, then an out-of-sync might - * have happened due to the following issue: - * - * - HDFS sync was disabled or Sentry was shutdown for a time period longer than - * the HMS notification clean-up thread causing old notifications to be deleted. - * - * HMS notifications may contain both gaps in the sequence and duplicates - * (the same ID repeated more then once for different events). - * - * To accept duplicates (see NotificationFetcher for more info), then a gap is found - * if the 1st notification received is higher than the current ID processed + 1. - * i.e. 
- * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected) - * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected) - * 1st ID = 5, latest ID = 3 (a gap is detected) - */ - - List<NotificationEvent> eventList = (List<NotificationEvent>) events; - long firstNotificationId = eventList.get(0).getEventId(); - - if (firstNotificationId > (latestProcessedId + 1)) { - LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed" - + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId); - return true; - } - - return false; - } - - /** * Request for full snapshot and persists it if there is no snapshot available in the sentry * store. Also, wakes-up any waiting clients. * @@ -392,16 +347,27 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { * Also, persists the notification ID regardless of processing result. * * @param events list of event to be processed + * @param notificationId Max event-id that sentry processed so far. 
* @throws Exception if the complete notification list is not processed because of JDO Exception */ - public void processNotifications(Collection<NotificationEvent> events) throws Exception { + public void processNotifications(Collection<NotificationEvent> events, long notificationId) throws Exception { boolean isNotificationProcessed; + long eventIdProcessed = notificationId; if (events.isEmpty()) { return; } for (NotificationEvent event : events) { isNotificationProcessed = false; + if (eventIdProcessed > 0) { + if (eventIdProcessed == event.getEventId()) { + LOGGER.info("Processing event with Duplicate event-id: {}", eventIdProcessed); + } else if (eventIdProcessed != event.getEventId() - 1) { + LOGGER.info("Events between ID's " + eventIdProcessed + " and " + + event.getEventId() + " are either missing OR out of order"); + } + } + eventIdProcessed = event.getEventId(); try { // Only the leader should process the notifications if (!isLeader()) { @@ -409,11 +375,12 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { return; } isNotificationProcessed = notificationProcessor.processNotificationEvent(event); + notificationFetcher.updateCache(event); } catch (Exception e) { if (e.getCause() instanceof JDODataStoreException) { LOGGER.info("Received JDO Storage Exception, Could be because of processing " + "duplicate notification"); - if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { + if (event.getEventId() <= sentryStore.getMaxNotificationID()) { // Rest of the notifications need not be processed. LOGGER.error("Received event with Id: {} which is smaller then the ID " + "persisted in store", event.getEventId()); @@ -431,6 +398,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { // Continue processing the next notification. 
LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId()); sentryStore.persistLastProcessedNotificationID(event.getEventId()); + notificationFetcher.updateCache(event); } catch (Exception failure) { LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId()); throw failure; http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index edea5b6..03a6f45 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -40,6 +40,7 @@ import javax.jdo.PersistenceManager; import javax.jdo.PersistenceManagerFactory; import javax.jdo.Query; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryAccessDeniedException; @@ -467,7 +468,7 @@ public class SentryStore { @Override public Long getValue() { try { - return getLastProcessedNotificationID(); + return getMaxNotificationID(); } catch (Exception e) { LOGGER.error("Can not read current notificationId", e); return NOTIFICATION_UNKNOWN; @@ -552,7 +553,7 @@ public class SentryStore { } @VisibleForTesting - void clearAllTables() { + public void clearAllTables() { try { tm.executeTransaction( new TransactionBlock<Object>() { @@ -2803,7 +2804,7 @@ public class SentryStore { * * @return the last persisted snapshot ID. 
It returns 0 if no rows are found. */ - private long getCurrentAuthzPathsSnapshotID() throws Exception { + public long getCurrentAuthzPathsSnapshotID() throws Exception { return tm.executeTransaction( new TransactionBlock<Long>() { @Override @@ -3886,7 +3887,7 @@ public class SentryStore { * @return the notification ID of latest path change. If no change * found then return 0. */ - public Long getLastProcessedNotificationID() throws Exception { + public Long getMaxNotificationID() throws Exception { long notificationId = tm.executeTransaction( new TransactionBlock<Long>() { public Long execute(PersistenceManager pm) throws Exception { @@ -4235,4 +4236,25 @@ public class SentryStore { } }); } + + /** + * Checks if notification with particular ID was already processed by searching + * for the ID on the MSentryHmsNotification table. + * + * @param id: event_id of the notification event. + * @return True if the notification was already processed; False otherwise + */ + public boolean isNotificationIdProcessed(long id) throws Exception { + return tm.executeTransactionWithRetry(new TransactionBlock<Boolean>() { + @Override + public Boolean execute(PersistenceManager pm) throws Exception { + pm.setDetachAllOnCommit(false); + Query query = pm.newQuery(MSentryHmsNotification.class); + query.setFilter("this.notificationId == id"); + query.declareParameters("long id"); + List<MSentryHmsNotification> ids = (List<MSentryHmsNotification>) query.execute(id); + return (CollectionUtils.isEmpty(ids)) ? 
false : true; + } + }); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java index 93cc34f..30d6f50 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java @@ -18,10 +18,18 @@ package org.apache.sentry.service.thrift; +import static com.codahale.metrics.MetricRegistry.name; + +import com.codahale.metrics.Counter; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; + import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; +import java.util.HashSet; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -29,6 +37,8 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.core.common.exception.SentryOutOfSyncException; +import org.apache.sentry.provider.db.service.thrift.SentryMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,14 +51,54 @@ public final class 
HiveNotificationFetcher implements AutoCloseable { private final SentryStore sentryStore; private final HiveConnectionFactory hmsConnectionFactory; private HiveMetaStoreClient hmsClient; + /* + This value helps HiveNotificationFetcher to decide the notification-id to use while fetching the notifications from + HMS to handle out-of-sequence notifications. + */ + private final int refetchCount; + + /* + In each iteration, HMSFollower will try to fetch notification which has the Max(event-id) + among the notifications it already fetched. This flag is used to check if that notification + is fetched in subsequent request. It's initialized to false in every iteration if sentry has fetched notifications + in any one the previous fetches. + */ + private boolean foundLastProcessedNotification = false; + /* + Idea of this cache is to store the notification event id and hash to avoid database lookup as there will be a lot + of notifications that are fetched again and again to handle out-of-sync notifications. Least recently used entries + in this cache are evicted automatically once the map is full. + */ + private final LRUMap cache; + + /* + Cache is designed to hold all the events with event-id's in the range of MAX(event-id) (that sentry processed) and + MAX(event-id) - (notification fetch count) from the configuration. Theoretically, all of these event-id's could have + duplicates. Cache size should be sufficient to hold all of them. This factor is used to calculate the size of cache. + */ + public static final int CACHE_BUFFER_FACTOR = 10; + +/* + This value is used to build a cache. Cache anticipates below duplicates and pre-allocated memory. 
+ */ + private static final int DUPLICATE_COUNT = 3; - /* The following cache and last filtered ID help us to avoid making less calls to the DB */ - private long lastIdFiltered = 0; - private Set<String> cache = new HashSet<>(); + // Counts filter lookups that missed the cache and fell back to the Sentry store + private static final Counter notificationCacheMissCount = + SentryMetrics.getInstance(). + getCounter(name(HiveNotificationFetcher.class, + "Cache Miss")); - public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) { + public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory, + Configuration conf) { + refetchCount = conf.getInt(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, + ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT); this.sentryStore = sentryStore; this.hmsConnectionFactory = hmsConnectionFactory; + /* + Size of map is dependent on SENTRY_HMS_NOTIFICATION_REFETCH_COUNT and some additional buffer. + */ + this.cache = new LRUMap(refetchCount + (refetchCount / CACHE_BUFFER_FACTOR)); } /** @@ -64,12 +114,82 @@ public final class HiveNotificationFetcher implements AutoCloseable { } /** + * Update cache with notification hash and event-id + * @param event Notification event + */ + public void updateCache(NotificationEvent event) { + HashSet<String> eventSet; + String hash = UniquePathsUpdate.sha1(event); + + if(Strings.isNullOrEmpty(hash)) { + LOGGER.error("Hash provided is either null/empty, Cache not updated"); + } + eventSet = (HashSet<String>)cache.get(event.getEventId()); + if(eventSet != null) { + eventSet.add(hash); + } else { + eventSet = new HashSet<>(DUPLICATE_COUNT, 1); + eventSet.add(hash); + cache.put(event.getEventId(), eventSet); + } + } + + /** + * Find if the Notification is found in Cache.
+ * @param eventId notification event id + * @param hash notification hash + * @return True, if the hash is found in Cache + * False, otherwise + */ + @VisibleForTesting + boolean isFoundInCache(long eventId, String hash) { + HashSet<String> eventSet; + eventSet = (HashSet<String>)cache.get(eventId); + + if(eventSet == null) { + return false; + } else { + return eventSet.contains(hash); + } + } + + /** + * Find if the Notification event-id is found in Cache. + * @param eventId notification event id. + * @return True, if the event-id is found in Cache + * False, otherwise + */ + boolean isFoundInCache(long eventId) { + return cache.get(eventId) != null; + } + + /** + * Get the Cache size + * @return size of the Cache + */ + @VisibleForTesting + int getCacheSize() { + return cache.size(); + } + + /** * Fetch new HMS notifications appeared since a specified event ID. The returned list may * include notifications with the same specified ID if they were not seen by Sentry. + * In each iteration HiveNotificationFetcher takes the Max(event-id) that Sentry has processed, goes back + * by the configured number of event-ids, and re-fetches them. The idea is that Max(event-id) should be fetched again in the subsequent + * fetch. If it is not, Sentry should create a full snapshot. This could happen in several scenarios: + * <ul> + * <li>HMS has been restored from a snapshot taken in the past</li> + * <li>Sentry did not fetch notifications from HMS for a while. One use case is HDFS Sync being disabled</li> + * <li>HMS is intentionally reset</li> + * <li>NOTIFICATION_LOG table is cleared</li> + * </ul> * * @param lastEventId The event ID to use to request notifications. * @param maxEvents The maximum number of events to fetch. * @return A list of newer notifications unseen by Sentry. + * @throws SentryOutOfSyncException If the event whose event-id equals the Max(event-id) processed by Sentry is not received + * in the HMS response and the first event received is not Max(event-id) + 1. * @throws Exception If an error occurs on the HMS communication.
*/ List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { @@ -84,16 +204,41 @@ public final class HiveNotificationFetcher implements AutoCloseable { * * TODO: We can avoid doing this once HIVE-16886 is fixed. */ - if (lastEventId > 0) { - filter = createNotificationFilterFor(lastEventId); - lastEventId--; + + // Value of lastEventId has to be retained for logging purposes, making a copy. + long minFetchId = lastEventId; + if (minFetchId > 0) { + filter = createNotificationFilterFor(minFetchId); + minFetchId = (minFetchId > refetchCount) ? (minFetchId - refetchCount) : 0; + foundLastProcessedNotification = false; + } else { + foundLastProcessedNotification = true; } - LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId); + LOGGER.debug("Requesting HMS notifications since ID = {} Max(Event-id) processed: {}", minFetchId, lastEventId); NotificationEventResponse response; try { - response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter); + /** + * The logic below triggers a full snapshot in the following situation: + * 1. When the HMS response contains notification events but not the event with the Max(Event-id) processed by Sentry, + * and the event-id of the first event in the response is not equal to that Max(Event-id) + 1. + * The logic below does not trigger a full snapshot in the following situations: + * 1. When the response doesn't have any notification events. This is seen when there are no changes + * to HMS data for a while and all the notification events have been evicted from the NOTIFICATION_LOG table because of + * TTL expiration. + * 2. When the HMS response does include the event with the Max(Event-id) processed by Sentry.
+ */ + response = getHmsClient().getNextNotification(minFetchId, maxEvents, filter); + if ((response != null) && (response.getEventsSize() > 0) && !foundLastProcessedNotification && + (response.getEvents().get(0).getEventId() != (lastEventId + 1))) { + LOGGER.error("Max event-id processed by sentry is " + lastEventId + " but the " + + "Id of the first event received from HMS in subsequent fetch is " + + response.getEvents().get(0).getEventId() + ", Requesting for Full snapshot"); + //Full snapshot should be requested. + throw new SentryOutOfSyncException("Notification Log doesn't have the " + + "last notification processed by sentry"); + } } catch (Exception e) { close(); throw e; @@ -121,27 +266,31 @@ public final class HiveNotificationFetcher implements AutoCloseable { * specified ID. If a new filter ID is used, then we clean up the cache. */ - if (lastIdFiltered != id) { - lastIdFiltered = id; - cache.clear(); - } - return new NotificationFilter() { @Override public boolean accept(NotificationEvent notificationEvent) { - if (notificationEvent.getEventId() == id) { + LOGGER.debug("Applying filter created for event-id {} on Event with ID:{}", id, notificationEvent.getEventId()); + if (notificationEvent.getEventId() <= id) { String hash = UniquePathsUpdate.sha1(notificationEvent); - try { - if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) { - cache.add(hash); - - LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id); + // This check makes sure that the last notification processed by + // HMSFollower is present in NOTIFICATION_LOG table. If it is not found + // it can be assumed that there were events that were cleaned up before + // Sentry could fetch them. When this happens sentry should take full snapshot again. 
+ if ((notificationEvent.getEventId() == id) && !foundLastProcessedNotification) { + foundLastProcessedNotification = true; + } + if (isFoundInCache(notificationEvent.getEventId(), hash) == true) { + LOGGER.debug("Ignoring HMS notification already processed: ID = {}", notificationEvent.getEventId()); + return false; + } else if (sentryStore.isNotificationProcessed(hash)) { + notificationCacheMissCount.inc(); + LOGGER.debug("Ignoring HMS notification already processed: ID = {} - cache miss", notificationEvent.getEventId()); return false; } } catch (Exception e) { LOGGER.error("An error occurred while checking if notification {} is already " - + "processed: {}", id, e.getMessage()); + + "processed: {}", notificationEvent.getEventId(), e); // We cannot throw an exception on this filter, so we return false assuming this // notification is already processed @@ -203,9 +352,16 @@ public final class HiveNotificationFetcher implements AutoCloseable { hmsClient.close(); } - cache.clear(); } finally { hmsClient = null; } } + + /** + * Gets notification cache miss count. + * @return notification cache miss count. 
+ */ + public long getNotificationCacheMissCount() { + return notificationCacheMissCount.getCount(); + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 96c6810..827b078 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -642,4 +642,8 @@ public class SentryService implements Callable, SigUtils.SigListener { // Become follower leaderMonitor.deactivate(); } + @VisibleForTesting + public long getCurrentAuthzPathsSnapshotID() throws Exception { + return sentryStore.getCurrentAuthzPathsSnapshotID(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index 7e02874..d16d2fd 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -241,6 +241,12 @@ public class ServiceConstants { public static final int SENTRY_DELTA_KEEP_COUNT_DEFAULT = 200; /** + * Number of notifications that HMSFollower should 
re-fetch in periodic pull from HMS. + */ + public static final String SENTRY_HMS_NOTIFICATION_REFETCH_COUNT = "sentry_hms_notification_refetch_count"; + public static final int SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT = 100; + + /** * Number of notification id's to keep around during cleaning */ public static final String SENTRY_HMS_NOTIFICATION_ID_KEEP_COUNT = "sentry.server.delta.keep.count"; http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java index 7903078..22ec61b 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java @@ -64,6 +64,8 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import javax.security.auth.login.LoginException; @@ -128,7 +130,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -140,7 +142,7 @@ public class TestHMSFollower { 
reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -198,7 +200,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -210,7 +212,7 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -222,7 +224,7 @@ public class TestHMSFollower { // but because of full update trigger it will, as in the first run PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message"); - when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(1)).persistFullPathsImage( @@ -233,7 +235,7 @@ public class TestHMSFollower { // 4th run should not get a snapshot because is already processed and 
publish-subscribe // trigger is only supposed to work once. This is exactly as 2nd run. - when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -284,7 +286,7 @@ public class TestHMSFollower { // 1st run should get a full snapshot because hms notificaions is empty // but it should never be persisted because HDFS sync is disabled - when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage( @@ -309,7 +311,7 @@ public class TestHMSFollower { //Set last processed notification Id to match the full new value 1L final long LATEST_EVENT_ID = 1L; - when(sentryStore.getLastProcessedNotificationID()).thenReturn(LATEST_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(LATEST_EVENT_ID); //Mock that sets isHmsNotificationEmpty to false when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); // Mock that sets the current HMS notification ID. 
Set it to match @@ -369,7 +371,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot - when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -378,7 +380,7 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -416,20 +418,33 @@ public class TestHMSFollower { SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.notNull())) - .thenReturn(new NotificationEventResponse( - Arrays.<NotificationEvent>asList( + when(hmsClientMock.getNextNotification(Mockito.anyLong(), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; + NotificationEventResponse response = new NotificationEventResponse(); + + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( new NotificationEvent(fullSnapshot.getId(), 0, "", "") - ) - )); + ); + + 
for (NotificationEvent event : events) { + if (filter.accept(event)) { + response.addToEvents(event); + } + } + return response; + } + }); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot - when(sentryStore.getLastProcessedNotificationID()) + when(sentryStore.getMaxNotificationID()) .thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); hmsFollower.run(); @@ -439,8 +454,11 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); + when(sentryStore.isNotificationIdProcessed(fullSnapshot.getId())).thenReturn(true); + when(sentryStore.isNotificationProcessed(UniquePathsUpdate.sha1(new NotificationEvent( + fullSnapshot.getId(), 0, "", "")))).thenReturn(true); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); @@ -525,7 +543,7 @@ public class TestHMSFollower { events.add(notificationEvent); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -555,7 +573,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 
0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -589,7 +607,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -624,7 +642,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -665,7 +683,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -724,7 +742,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -748,7 +766,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; //Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification // and persistLastProcessedNotificationID was not invoked. 
//noinspection unchecked @@ -768,7 +786,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that persistLastProcessedNotificationID is invoked explicitly. verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); reset(sentryStore); @@ -786,7 +804,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION // notification and persistLastProcessedNotificationID was not invoked. verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(), @@ -809,7 +827,7 @@ public class TestHMSFollower { notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -864,7 +882,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. 
//noinspection unchecked @@ -889,7 +907,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked // to handle CREATE_TABLE notification // and persistLastProcessedNotificationID is explicitly invoked @@ -916,7 +934,7 @@ public class TestHMSFollower { notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -970,7 +988,7 @@ public class TestHMSFollower { Configuration configuration = new Configuration(); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); // invalid event updates notification ID directly verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); @@ -1012,7 +1030,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -1046,7 +1064,7 @@ public class TestHMSFollower { Configuration configuration = new Configuration(); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); 
- hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java index 91c90f9..996f554 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java @@ -190,7 +190,7 @@ public class TestHMSFollowerSentryStoreIntegration { List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1) .size()); @@ -248,7 +248,7 @@ public class TestHMSFollowerSentryStoreIntegration { List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - hmsFollower.processNotifications(events); + hmsFollower.processNotifications(events, 0); Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1) .size()); http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java 
---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index b410027..9a03f48 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -2466,7 +2466,7 @@ public class TestSentryStore extends org.junit.Assert { long notificationID = 11; sentryStore.persistFullPathsImage(authzPaths, notificationID); PathsUpdate pathsUpdate = sentryStore.retrieveFullPathsImageUpdate(prefixes); - long savedNotificationID = sentryStore.getLastProcessedNotificationID(); + long savedNotificationID = sentryStore.getMaxNotificationID(); assertEquals(1, pathsUpdate.getImgNum()); TPathsDump pathDump = pathsUpdate.toThrift().getPathsDump(); Map<Integer, TPathEntry> nodeMap = pathDump.getNodeMap(); @@ -2519,7 +2519,7 @@ public class TestSentryStore extends org.junit.Assert { sentryStore.addAuthzPathsMapping("db2", Arrays.asList("/hive/db2"), update2); // Check the latest persisted ID matches to both the path updates - long latestID = sentryStore.getLastProcessedNotificationID(); + long latestID = sentryStore.getMaxNotificationID(); assertEquals(notificationID, latestID); String []prefixes = {"/hive"}; @@ -2585,7 +2585,7 @@ public class TestSentryStore extends org.junit.Assert { sentryStore.persistLastProcessedNotificationID(notificationID); // Retrieving latest peristed ID should match with the previous persisted ID - long latestID = sentryStore.getLastProcessedNotificationID(); + long latestID = sentryStore.getMaxNotificationID(); assertEquals(notificationID, latestID); } @@ -2597,7 +2597,7 @@ public class TestSentryStore extends 
org.junit.Assert { sentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), notificationID); // Add "db1.table1" authzObj - Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long lastNotificationId = sentryStore.getMaxNotificationID(); UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1", 1, false); addUpdate.newPathChange("db1.table"). addToAddPaths(Arrays.asList("db1", "tbl1")); @@ -2624,7 +2624,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange()); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(1, lastNotificationId.longValue()); // Delete path 'db1.db/tbl1' from "db1.table1" authzObj. @@ -2649,7 +2649,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange()); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(2, lastNotificationId.longValue()); // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping. 
@@ -2674,7 +2674,7 @@ public class TestSentryStore extends org.junit.Assert { MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange()); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(3, lastNotificationId.longValue()); } @@ -2682,7 +2682,7 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testRenameUpdateAuthzPathsMapping() throws Exception { Map<String, Collection<String>> authzPaths = new HashMap<>(); - Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long lastNotificationId = sentryStore.getMaxNotificationID(); authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/table1/p1")); authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2")); @@ -2728,7 +2728,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(1, lastNotificationId.longValue()); // Rename 'db1.table1' to "db1.table2" but did not change its location. 
renameUpdate = new UniquePathsUpdate("u2",2, false); @@ -2752,7 +2752,7 @@ public class TestSentryStore extends org.junit.Assert { assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList("user/hive/warehouse/db1.db/table1/p1", "user/hive/warehouse/db1.db/newTable1"), pathsImage.get("db1.newTable2"))); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(2, lastNotificationId.longValue()); // Query the persisted path change and ensure it equals to the original one @@ -2790,7 +2790,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(update.JSONSerialize(), updatePathChange.getPathChange()); - lastNotificationId = sentryStore.getLastProcessedNotificationID(); + lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(3, lastNotificationId.longValue()); } @@ -2914,7 +2914,7 @@ public class TestSentryStore extends org.junit.Assert { assertEquals(1, pathImage.get("db2.table").size()); assertEquals(3, sentryStore.getMPaths().size()); - Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long lastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(notificationID, lastNotificationId.longValue()); } @@ -3419,7 +3419,7 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testDuplicateNotification() throws Exception { Map<String, Collection<String>> authzPaths = new HashMap<>(); - Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long lastNotificationId = sentryStore.getMaxNotificationID(); lastNotificationId ++; authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", @@ -3471,7 +3471,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = 
sentryStore.getLastProcessedPathChangeID(); MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); - Long savedLastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long savedLastNotificationId = sentryStore.getMaxNotificationID(); assertEquals(lastNotificationId.longValue(), savedLastNotificationId.longValue()); @@ -3531,7 +3531,7 @@ public class TestSentryStore extends org.junit.Assert { localSentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), 0); // Add "db1.table1" authzObj - Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); + Long lastNotificationId = sentryStore.getMaxNotificationID(); UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1",1, false); addUpdate.newPathChange("db1.table"). addToAddPaths(Arrays.asList("db1", "tbl1")); @@ -3601,7 +3601,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = localSentryStore.getLastProcessedPathChangeID(); assertEquals(0, lastChangeID); - lastNotificationId = localSentryStore.getLastProcessedNotificationID(); + lastNotificationId = localSentryStore.getMaxNotificationID(); assertEquals(0, lastNotificationId.longValue()); // enable HDFS for other tests http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java index 83a1bec..3a74b70 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java +++ 
b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java @@ -18,17 +18,20 @@ package org.apache.sentry.service.thrift; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.Collections; import java.util.List; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.junit.Test; import org.mockito.Mockito; @@ -44,7 +47,7 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -63,7 +66,7 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -82,7 +85,7 @@ public class TestHiveNotificationFetcher { 
Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -110,9 +113,11 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { List<NotificationEvent> events; + // Updating the Cache in the notification + fetcher.updateCache(new NotificationEvent(1L, 0, "CREATE_DATABASE", "")); /* * Requesting an ID > 0 will request all notifications from 0 again but filter those * already seen notifications with ID = 1 @@ -134,13 +139,9 @@ public class TestHiveNotificationFetcher { ); for (NotificationEvent event : events) { - String hash = UniquePathsUpdate.sha1(event); - // We simulate that CREATE_DATABASE is already processed if (event.getEventType().equals("CREATE_DATABASE")) { - Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(true); - } else { - Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(false); + Mockito.when(store.isNotificationIdProcessed(1)).thenReturn(true); } if (filter.accept(event)) { @@ -153,6 +154,7 @@ public class TestHiveNotificationFetcher { }); events = fetcher.fetchNotifications(1); + verify(store, times(1)).isNotificationProcessed(Mockito.anyString()); assertEquals(2, events.size()); assertEquals(1, events.get(0).getEventId()); assertEquals("CREATE_TABLE", events.get(0).getEventType()); @@ -160,4 +162,200 @@ public class TestHiveNotificationFetcher { assertEquals("ALTER_TABLE", 
events.get(1).getEventType()); } } + + /** + * Test verifies that any out-of-sync notifications that are below the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT + * threshold will be fetched in subsequent fetches. + * @throws Exception + */ + @Test + public void testOutofSyncNotifications() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + List<NotificationEvent> events; + + // This mock will also test that the NotificationFilter works as expected + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(2L, 0, "CREATE_TABLE", ""), + new NotificationEvent(3L, 0, "CREATE_TABLE", ""), + new NotificationEvent(7L, 0, "CREATE_TABLE", ""), + new NotificationEvent(8L, 0, "CREATE_TABLE", ""), + new NotificationEvent(9L, 0, "CREATE_TABLE", "") + ); + NotificationEventResponse response = new NotificationEventResponse(events); + for (NotificationEvent event : events) { + fetcher.updateCache(event); + } + return response; + } + }); + + events = fetcher.fetchNotifications(0); + assertEquals(6, events.size()); + assertEquals(1, events.get(0).getEventId()); + assertEquals("CREATE_DATABASE", events.get(0).getEventType()); + assertEquals(2, events.get(1).getEventId()); + assertEquals("CREATE_TABLE", events.get(1).getEventType()); +
verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); + + reset(hmsClient); + // This mock will also test that the NotificationFilter works as expected + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; + NotificationEventResponse response = new NotificationEventResponse(); + + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(2L, 0, "CREATE_TABLE", ""), + new NotificationEvent(3L, 0, "CREATE_TABLE", ""), + new NotificationEvent(4L, 0, "CREATE_TABLE", ""), + new NotificationEvent(5L, 0, "CREATE_TABLE", ""), + new NotificationEvent(6L, 0, "CREATE_TABLE", ""), + new NotificationEvent(7L, 0, "CREATE_TABLE", ""), + new NotificationEvent(8L, 0, "CREATE_TABLE", ""), + new NotificationEvent(9L, 0, "CREATE_TABLE", "") + ); + + for (NotificationEvent event : events) { + // We simulate that event 9 (CREATE_TABLE) is already processed + if (event.getEventId() == 9) { + Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true); + } + if (filter.accept(event)) { + response.addToEvents(event); + } + } + + return response; + } + }); + + events = fetcher.fetchNotifications(9); + assertEquals(3, events.size()); + assertEquals(4, events.get(0).getEventId()); + verify(store, times(3)).isNotificationProcessed(Mockito.anyString()); + verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); + } + } + + /** + * Test verifies that any out-of-sync notifications that are above the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT + * threshold will be lost and will not be fetched.
+ * @throws Exception + */ + @Test + public void testMissingNotifications() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + Configuration conf = new Configuration(); + // Limit the look-back window to 3 ids: fetchNotifications(9) is expected to query HMS from id 6. + conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3"); + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf)) { + List<NotificationEvent> events; + + // First fetch (no NotificationFilter): HMS returns ids 1, 2, 8 and 9, leaving a gap at 3-7. + // Every returned event is also recorded in the fetcher cache. + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), + new NotificationEvent(2L, 0, "CREATE_TABLE", ""), + new NotificationEvent(8L, 0, "CREATE_TABLE", ""), + new NotificationEvent(9L, 0, "CREATE_TABLE", "") + ); + NotificationEventResponse response = new NotificationEventResponse(events); + for (NotificationEvent event : events) { + fetcher.updateCache(event); + } + return response; + } + }); + + events = fetcher.fetchNotifications(0); + assertEquals(4, events.size()); + assertEquals(1, events.get(0).getEventId()); + assertEquals("CREATE_DATABASE", events.get(0).getEventType()); + assertEquals(2, events.get(1).getEventId()); + assertEquals("CREATE_TABLE", events.get(1).getEventType()); + verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); + + reset(hmsClient);
+ // Second fetch: the look-back query starts at id 6 and must carry a NotificationFilter. + // Ids 8 (cached during the first fetch) and 9 (marked processed below) are expected to be + // filtered out, so only the late-arriving id 7, which fell into the earlier gap, should be returned. + Mockito.when(hmsClient.getNextNotification(Mockito.eq(6L), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; + NotificationEventResponse response = new NotificationEventResponse(); + + List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + new NotificationEvent(7L, 0, "CREATE_TABLE", ""), + new NotificationEvent(8L, 0, "CREATE_TABLE", ""), + new NotificationEvent(9L, 0, "CREATE_TABLE", "") + ); + + for (NotificationEvent event : events) { + // Simulate that event 9 (CREATE_TABLE) has already been processed by the store. + if (event.getEventId() == 9) { + Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true); + } + if (filter.accept(event)) { + response.addToEvents(event); + } + } + + return response; + } + }); + + events = fetcher.fetchNotifications(9); + assertEquals(1, events.size()); + assertEquals(7, events.get(0).getEventId()); + verify(store, times(1)).isNotificationProcessed(Mockito.anyString()); + verify(hmsClient, times(1)).getNextNotification(Mockito.eq(6L), Mockito.anyInt(), Mockito.anyObject()); + } + } + + /** + * Verifies that the fetcher's notification cache is bounded: after 15 insertions with a refetch + * count of 3, the cache reports 3 entries and the oldest ids have been evicted. + */ + @Test + public void verifyCache() throws Exception { + SentryStore store = Mockito.mock(SentryStore.class); + HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + Configuration conf = new Configuration();
+ // With a refetch count of 3, getCacheSize() is expected to report 3 entries. + conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3"); + // The fetcher is AutoCloseable; release it when the test finishes. + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf)) { + for (int i = 0; i < 15 ; i++) { + fetcher.updateCache(new NotificationEvent(i, 0, "CREATE_DATABASE", "")); + } + + assertEquals("Invalid Cache size", 3, fetcher.getCacheSize()); + // The oldest ids (below the bound derived from the refetch count and CACHE_BUFFER_FACTOR) + // must have been evicted. + for (int i = 0; i < (15 - (3 + (3/HiveNotificationFetcher.CACHE_BUFFER_FACTOR))) ; i++) { + assertFalse("Cache entry for " + i + " should have been evicted", fetcher.isFoundInCache(i)); + } + } + } } http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java new file mode 100644 index 0000000..a81fdf4 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java @@ -0,0 +1,203 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + <p> + http://www.apache.org/licenses/LICENSE-2.0 + <p> + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.security.alias.UserProvider; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * This class tests the notification metadata cache in HiveNotificationFetcher.
+ */ +public class TestHiveNotificationFetcherCache { + private static Configuration conf = null; + private static File dataDir; + private static File policyFilePath; + private static String[] adminGroups = {"adminGroup1"}; + private static char[] passwd = new char[]{'1', '2', '3'}; + private static SentryStore store; + private static HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); + private static HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); + private static List<NotificationEvent> unFilteredEvents = new ArrayList<NotificationEvent>(); + + + @BeforeClass + public static void setup() throws Exception { + conf = new Configuration(false); + final String ourUrl = UserProvider.SCHEME_NAME + ":///"; + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, ourUrl); + + // enable HDFS sync, so perm and path changes will be saved into DB + conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); + + // This should be a UserGroupInformation provider + CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0); + + + // The user credentials are stored as a static variable by the UserGroupInformation provider. + // We only need to set the password the first time; an attempt to set it a second + // time fails with an exception.
+ if (provider.getCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS) == null) { + provider.createCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, passwd); + provider.flush(); + } + + dataDir = new File(Files.createTempDir(), "sentry_policy_db"); + conf.set(ServiceConstants.ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_URL, + "jdbc:derby:;databaseName=" + dataDir.getPath() + ";create=true"); + conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy"); + conf.setStrings(ServiceConstants.ServerConfig.ADMIN_GROUPS, adminGroups); + conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING, + ServiceConstants.ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); + policyFilePath = new File(dataDir, "local_policy_file.ini"); + conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, + policyFilePath.getPath()); + + // These tests do not need to retry transactions, so setting to 1 to reduce testing time + conf.setInt(ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY, 1); + + // SentryStore should be initialized only once. The tables created by the test cases will + // be cleaned up during the @After method, and the store itself is stopped in @AfterClass.
+ store = new SentryStore(conf); + + boolean hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabled(conf); + store.setPersistUpdateDeltas(hdfsSyncEnabled); + Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); + + // Unfiltered fetch from id 0: return every event currently in unFilteredEvents. + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (IMetaStoreClient.NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + NotificationEventResponse response = new NotificationEventResponse(); + for (NotificationEvent event : unFilteredEvents) { + response.addToEvents(event); + } + + return response; + } + }); + + // Filtered fetch from id 0: only events accepted by the fetcher's NotificationFilter are + // returned, so this mock also exercises the filter. + Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), + (IMetaStoreClient.NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { + @Override + public NotificationEventResponse answer(InvocationOnMock invocation) + throws Throwable { + IMetaStoreClient.NotificationFilter filter = (IMetaStoreClient.NotificationFilter) invocation.getArguments()[2]; + NotificationEventResponse response = new NotificationEventResponse(); + for (NotificationEvent event : unFilteredEvents) { + if (filter.accept(event)) { + response.addToEvents(event); + } + } + + return response; + } + }); + } + + @After + public void after() { + store.clearAllTables(); + unFilteredEvents.clear(); + } + + // Release the SentryStore created once in setup(). + @AfterClass + public static void teardown() { + if (store != null) { + store.stop(); + } + } + + /** + * Verifies that the notification fetcher's extra look-back fetches add no load on the database: + * with neither gaps nor out-of-sequence notifications, there must be no cache misses.
+ * @throws Exception + */ + @Test + public void testWithNoGapsAndOutOfSequenceNotifications() throws Exception { + + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + List<NotificationEvent> filteredEvents; + + for (int count = 1; count <= 150; count++) { + unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); + } + + filteredEvents = fetcher.fetchNotifications(0); + assertEquals(150, filteredEvents.size()); + assertEquals(1, filteredEvents.get(0).getEventId()); + assertEquals("CREATE_DATABASE", filteredEvents.get(0).getEventType()); + + filteredEvents = fetcher.fetchNotifications(150); + assertEquals(0, filteredEvents.size()); + assertEquals(0, fetcher.getNotificationCacheMissCount()); + } + } + + /** + * Verifies that the notification fetcher's extra look-back fetches add no load on the database even + * when notifications arrive with gaps and out of sequence: there must still be no cache misses.
+ * @throws Exception + */ + @Test + public void testWithGapsAndOutOfSequenceNotifications() throws Exception { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + List<NotificationEvent> filteredEvents = new ArrayList<NotificationEvent>(); + int count = 1; + // Publish ids 1..100 in batches of 10, polling the fetcher after each batch. Ids divisible + // by 3 or 7 are held back (unless they are multiples of 10) to create gaps. + for (int fetchCount = 1; fetchCount <= 10; fetchCount++) { + for (; count <= (fetchCount * 10); count++) { + if (!(count % 3 == 0 || count % 7 == 0) || (count %10 == 0)) { + unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); + } + } + filteredEvents = fetcher.fetchNotifications(count - 1); + } + + assertTrue("Invalid notification count", (filteredEvents.size() < 100)); + assertEquals(0, fetcher.getNotificationCacheMissCount()); + + // Now publish exactly the held-back ids, out of sequence. This is the complement of the + // condition above, so no id that was already published is published a second time. + for (count = 1; count <= 100; count++) { + if ((count % 3 == 0 || count % 7 == 0) && (count % 10 != 0)) { + unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); + } + } + fetcher.fetchNotifications(count - 1); + assertEquals(0, fetcher.getNotificationCacheMissCount()); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/afcaa499/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java new file mode 100644 index 0000000..88ce7f9 --- /dev/null +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.dbprovider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.Statement; + +import org.apache.sentry.tests.e2e.hdfs.TestHDFSIntegrationBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This class covers the basic scenario of snapshot creation and makes sure that + * HMSFollower takes a full snapshot when the Sentry server comes up and not + * subsequently.
+ */ +public class TestSnapshotCreation extends TestHDFSIntegrationBase { + + private final static String tableName1 = "tb_1"; + private final static String tableName2 = "tb_2"; + private final static String tableName3 = "tb_3"; + private final static String tableName4 = "tb_4"; + + protected static final String DB1 = "db_1", + DB2 = "db_2"; + + // Upper bound on how long BasicSanity waits for HMSFollower to create the initial snapshot. + private static final int MAX_SNAPSHOT_WAIT_SECONDS = 60; + + private Connection connection; + private Statement statement; + + @Before + public void initialize() throws Exception { + super.setUpTempDir(); + admin = "hive"; + connection = hiveServer2.createConnection(admin, admin); + statement = connection.createStatement(); + statement.execute("create role admin_role"); + statement.execute("grant role admin_role to group hive"); + statement.execute("grant all on server server1 to role admin_role"); + } + + // Release the JDBC resources opened in initialize(). The method name is deliberately unique so it + // cannot override (and silently disable) a cleanup method declared by the base class. + @After + public void closeStatementAndConnection() throws Exception { + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } + + /** + * Waits for the initial snapshot, runs DDL in two rounds, and asserts that the snapshot id + * did not change, i.e. no further full snapshot was taken. + */ + @Test + public void BasicSanity() throws Exception { + long latestSnapshotId = 0; + // NOTE(review): dbNames/roles are TestHDFSIntegrationBase fields, presumably used for post-test + // cleanup; list every database this test creates (DB2 was missing). Only admin_role is created + // in this class - confirm the other role names are needed. + dbNames = new String[]{DB1, DB2}; + roles = new String[]{"admin_role", "all_db1", "all_tbl1", "all_tbl2"}; + // Poll once a second, bounded by MAX_SNAPSHOT_WAIT_SECONDS, until HMSFollower has created the + // initial full snapshot, so a missing snapshot fails the test instead of hanging it forever. + for (int waited = 0; latestSnapshotId == 0 && waited < MAX_SNAPSHOT_WAIT_SECONDS; waited++) { + Thread.sleep(1000); + latestSnapshotId = sentryServer.get(0).getCurrentAuthzPathsSnapshotID(); + } + assertTrue("HMSFollower did not create an initial snapshot within " + MAX_SNAPSHOT_WAIT_SECONDS + " seconds", latestSnapshotId != 0); + + statement.execute("CREATE DATABASE " + DB1); + statement.execute("CREATE DATABASE " + DB2); + statement.execute("create table " + DB1 + "." + tableName1 + + " (under_col int comment 'the under column', value string)"); + statement.execute("create table " + DB1 + "." + tableName2 + + " (under_col int comment 'the under column', value string)"); + + // Give HMSFollower time to process the notifications generated so far (presumably long + // enough for it to run at least once before the second round of DDL). + Thread.sleep(5000); + + statement.execute("create table " + DB2 + "." + tableName3 + + " (under_col int comment 'the under column', value string)"); + statement.execute("create table " + DB2 + "."
+ tableName4 + " (under_col int comment 'the under column', value string)"); + + assertEquals("Another snapshot is created, Snapshot ID: ", latestSnapshotId, sentryServer.get(0).getCurrentAuthzPathsSnapshotID()); + } +}